Using Kotlin, RxJava 2 and Retrofit to consume REST API on Android

Jakub Dziworski

11 May 2017. 6 minutes read

Recently I decided to create a pet project which would help me and my girlfriend split house chores.
The idea was to define a set of chores and assign them to users periodically.
Every night, chores for the upcoming day are distributed to users (based on what they have done in the past).

Tech Stack

I am a backend developer and my go-to language is Scala, so the choice for the server side was quite clear.
For the client side I decided to go with Android, since doing house chores is a highly mobile activity.
The remaining decision was the language and stack for it.

Kotlin

Kotlin is a JVM language, which makes it compatible with existing Java libraries (RxJava and Retrofit being some of them).
Apart from that, Kotlin is more functional than Java, which helps when dealing with the style imposed by RxJava.
It is already stable and has excellent IDE support.
Kotlin also surprised me with its compile times, which almost match Java's.

RxJava 2

Almost every mobile app has at least two asynchronous sources of data: user input and IO (network/storage).
The UI needs to be responsive at all times, which is why IO has to be handled on a different thread than the UI.
We still need a way to glue these sources together with the app.
Traditionally this is done by listening to user input/IO and then changing state as a reaction to these events.
It usually leads to callback hell and complex state management.
RxJava is a solution to this problem. It provides a way to chain and combine asynchronous flows of data.

Retrofit

This is a type-safe HTTP client that handles all serialization and deserialization into user-defined classes behind the scenes.
I've used Retrofit before and had a very positive experience with it.
Apart from that, it integrates nicely with RxJava.

The Process

Consuming a REST API with Retrofit

The very first step was adding chores and getting them from the server. Using Retrofit, we can define HTTP endpoints in a type-safe way:

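import retrofit2.Call
import retrofit2.http.Body
import retrofit2.http.GET
import retrofit2.http.POST

interface BackendService {
  @GET("chores")
  fun getChores(): Call<GetChoresDto>

  @POST("chores")
  fun addChore(@Body addChoreDto: AddChoreDto): Call<ChoreId>
}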

In Kotlin, DTOs can be defined as immutable data classes:

data class ChoreId(val choreId: Long)
data class GetChoreDto(val id: Long, val name: String, val points: Int, val interval: Int?)
data class GetChoresDto(val chores: List<GetChoreDto>)
data class AddChoreDto(val name: String, val points: Int, val interval: Int?)

That's it. No hand-written equals, hashCode, constructors or toString. The compiler generates them all under the hood.
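To make that concrete, here is a small sketch that exercises the generated members on one of the DTOs above. It only needs the Kotlin standard library; the sample values ("dishes", 3 points) are made up for illustration.

```kotlin
data class AddChoreDto(val name: String, val points: Int, val interval: Int?)

fun main() {
    val first = AddChoreDto("dishes", 3, null)
    val second = AddChoreDto("dishes", 3, null)

    // Generated equals() and hashCode() compare property values, not references
    println(first == second)                        // true
    println(first.hashCode() == second.hashCode())  // true

    // Generated toString() lists all properties
    println(first)  // AddChoreDto(name=dishes, points=3, interval=null)

    // copy() creates a modified instance while the original stays untouched
    val weekly = first.copy(interval = 7)
    println(weekly) // AddChoreDto(name=dishes, points=3, interval=7)

    // Destructuring uses the generated componentN() functions
    val (name, points) = weekly
    println("$name is worth $points points") // dishes is worth 3 points
}
```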

Using the shiny new interface, it's possible to add chores and fetch them from the server.

var chores = listOf<GetChoreDto>() //type is inferred, no need to declare it on the left side

backend.addChore(chore).enqueue(object : Callback<ChoreId> { //anonymous object. No lambda because there are two methods in Callback interface
    override fun onResponse(call: Call<ChoreId>, response: Response<ChoreId>) {
        backend.getChores().enqueue(object : Callback<GetChoresDto> {
            override fun onResponse(call: Call<GetChoresDto>, response: Response<GetChoresDto>) {
                chores = response.body()?.chores ?: chores //body() is null for unsuccessful responses
            }
            override fun onFailure(call: Call<GetChoresDto>, t: Throwable) {}
        })
    }
    override fun onFailure(call: Call<ChoreId>, t: Throwable) {}
})

Wow, this sucks. It's the very first example and we've already run into nested callbacks.
Apart from that, what would happen if some other part of the app also required updated chores? We would have to make another request.
Ideally all interested components should be notified about new chores, regardless of what initiated them.
Let's go ahead and make this code better.

Introducing endpoints as RxJava Flowables

This is where RxJava becomes useful.
There is an adapter for Retrofit that allows endpoints to be defined as Flowables (Flowable is RxJava 2's backpressure-aware counterpart to Observable, which still exists):

interface BackendService {
  @GET("chores")
  fun getChores(): Flowable<GetChoresDto>

  @POST("chores")
  fun addChore(@Body addChoreDto: AddChoreDto): Flowable<ChoreId>
}
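For completeness, this is roughly how the backend instance used throughout this post could be created. Treat it as a sketch under assumptions: the createBackend helper and the base URL are hypothetical, and I am assuming Gson as the JSON converter (Retrofit supports others). The adapter and converter live in the separate adapter-rxjava2 and converter-gson artifacts.

```kotlin
import io.reactivex.Flowable
import retrofit2.Retrofit
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory
import retrofit2.converter.gson.GsonConverterFactory
import retrofit2.http.Body
import retrofit2.http.GET
import retrofit2.http.POST

data class ChoreId(val choreId: Long)
data class GetChoreDto(val id: Long, val name: String, val points: Int, val interval: Int?)
data class GetChoresDto(val chores: List<GetChoreDto>)
data class AddChoreDto(val name: String, val points: Int, val interval: Int?)

interface BackendService {
    @GET("chores")
    fun getChores(): Flowable<GetChoresDto>

    @POST("chores")
    fun addChore(@Body addChoreDto: AddChoreDto): Flowable<ChoreId>
}

// Hypothetical helper: wires the JSON converter and the RxJava 2 call adapter together.
// Retrofit requires the base URL to end with a slash.
fun createBackend(baseUrl: String): BackendService =
    Retrofit.Builder()
        .baseUrl(baseUrl)
        .addConverterFactory(GsonConverterFactory.create())        // JSON <-> DTOs (assumed: Gson)
        .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // enables Flowable return types
        .build()
        .create(BackendService::class.java)

// Usage, with a placeholder URL:
// val backend = createBackend("http://localhost:8080/")
```

Without the call adapter factory, Retrofit would not know how to produce a Flowable and would reject the interface methods when they are first invoked.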

Once the endpoints are defined, it's a good idea to specify which scheduler should be used for doing the IO and which for receiving the result:

val choresFlowable = backend.getChores() //val makes reference final
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
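If the difference between subscribeOn and observeOn feels fuzzy, the following sketch may help. It runs on a plain JVM, so I swapped AndroidSchedulers.mainThread() for Schedulers.single(); the threadsUsed function and the "chores" placeholder value are made up for the demonstration.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers

// Returns the name of the thread that produced the item
// and the name of the thread that consumed it.
fun threadsUsed(): Pair<String, String> {
    var producerThread = ""
    val consumerThread: String = Flowable
        .fromCallable {
            // Stands in for the blocking HTTP call
            producerThread = Thread.currentThread().name
            "chores"
        }
        .subscribeOn(Schedulers.io())      // where the source does its work
        .observeOn(Schedulers.single())    // where everything below receives items
        .map { Thread.currentThread().name }
        .blockingFirst()                   // block only so the demo can return a result
    return producerThread to consumerThread
}

fun main() {
    val (producer, consumer) = threadsUsed()
    println("produced on $producer, consumed on $consumer")
}
```

The two names differ from each other and from the calling thread: subscribeOn moves the work of the source, while observeOn moves the delivery of results to whatever comes after it in the chain.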

It's now possible for anyone to subscribe to choresFlowable and get chores upon subscription:

choresFlowable.subscribe { response ->
    chores = response.chores
}

It would also be great if all subscribers of choresFlowable received the newest chores whenever a new chore is added.
Currently the data is pulled only once, upon subscription.
RxJava is all about chaining and composing, so achieving this becomes really easy:

val choresChanged = PublishProcessor.create<Unit>() //Think of Unit as void in Java

val choresFlowable = choresChanged
            .flatMap { backend.getChores() }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())

fun addChore(chore: AddChoreDto) {
    backend.addChore(chore)
            .subscribeOn(Schedulers.io())
            .subscribe { choresChanged.onNext(Unit) }
}

choresChanged is a PublishProcessor. This is an RxJava component that lets us emit items downstream manually.

Calling choresChanged.onNext sends an item (Unit) down the stream. This signals that the chores need to be updated.

choresChanged.flatMap { backend.getChores() } - after choresChanged emits an empty item, we need to map it into chores.
We do that by calling the getChores endpoint, which also returns a Flowable.
To avoid nested Flowables, flattening is required (flatMap = map + flatten).

What happens is that when addChore is invoked, the POST chores HTTP endpoint is called.
Once the response comes back from the server, the choresChanged PublishProcessor emits a new empty (Unit) item.
This triggers the GET chores HTTP call, whose result is passed to the subscribers of choresFlowable.
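The flow described above can be tried out without a server. In this sketch FakeBackend is a hypothetical in-memory stand-in for the Retrofit service, chores are reduced to plain strings, and schedulers are left out so that everything runs synchronously and deterministically.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor

// Hypothetical in-memory replacement for BackendService (not part of the real app)
class FakeBackend {
    private val names = mutableListOf<String>()

    // "POST chores": stores the chore and returns its id
    fun addChore(name: String): Flowable<Int> = Flowable.fromCallable {
        names.add(name)
        names.size
    }

    // "GET chores": returns a snapshot of all chores
    fun getChores(): Flowable<List<String>> = Flowable.fromCallable { names.toList() }
}

fun refreshDemo(): List<List<String>> {
    val backend = FakeBackend()
    val choresChanged = PublishProcessor.create<Unit>()
    val choresFlowable = choresChanged.flatMap { backend.getChores() }

    // A subscriber that records every chore list it receives
    val received = mutableListOf<List<String>>()
    choresFlowable.subscribe { received.add(it) }

    // Each successful "POST" signals that the chores changed, which triggers a "GET"
    backend.addChore("dishes").subscribe { choresChanged.onNext(Unit) }
    backend.addChore("laundry").subscribe { choresChanged.onNext(Unit) }
    return received
}

fun main() {
    println(refreshDemo()) // [[dishes], [dishes, laundry]]
}
```

The subscriber never asks for data itself. It simply receives a fresh list after every addition.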

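We have no nested callbacks. Anyone can subscribe to choresFlowable and receive fresh chores each time someone adds a new one.
This is great! One caveat: a PublishProcessor only relays items emitted after a subscriber arrives, so with this version nothing is fetched at subscription time. Starting the trigger stream with an initial item, for example choresChanged.startWith(Unit), brings the initial fetch back.
By the way, a good place to subscribe to a Flowable is the Presenter, if you are using MVP.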

Error handling

What about error handling?
It'd be nice to have one global error handler for all HTTP errors.
In such a handler we could log errors, display a toast and suppress the throwable so the app doesn't crash.
Flowable has an onErrorResumeNext method, which lets us do exactly that.
All we have to do is create a new PublishProcessor and emit the throwable to it from onErrorResumeNext:

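val errorOccurred = PublishProcessor.create<Throwable>()

//Subscribe once to show every reported error as a toast.
//Keep the processor and the subscription separate, otherwise errorOccurred would be a Disposable.
val errorSubscription = errorOccurred
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe { throwable ->
                              Toast.makeText(
                                applicationContext,
                                "NETWORK ERROR $throwable", //String interpolation. No more ugly concatenation
                                Toast.LENGTH_SHORT
                              ).show()
                        }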

val choresFlowable = choresChanged
            .flatMap { backend.getChores() }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .onErrorResumeNext { throwable: Throwable ->
                  errorOccurred.onNext(throwable)
                  Flowable.empty<GetChoresDto>() //swallow the error and complete without emitting
            }
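To see what a subscriber actually observes when a call fails, here is a small self-contained sketch. The failing Flowable is a made-up stand-in for backend.getChores(), and errors are collected in a list instead of being shown as a toast so the example runs outside Android.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor

// Returns (items received, error messages reported, whether the stream completed)
fun errorDemo(): Triple<List<String>, List<String>, Boolean> {
    val errorOccurred = PublishProcessor.create<Throwable>()

    // Stand-in for the toast: remember the message of every reported error
    val reported = mutableListOf<String>()
    errorOccurred.subscribe { reported.add(it.message ?: "unknown") }

    // Hypothetical failing call in place of backend.getChores()
    val failing: Flowable<String> = Flowable.error<String>(RuntimeException("timeout"))

    val received = mutableListOf<String>()
    var completed = false
    failing
        .onErrorResumeNext { throwable: Throwable ->
            errorOccurred.onNext(throwable) // forward to the global handler
            Flowable.empty<String>()        // replace the error with an empty stream
        }
        .subscribe(
            { received.add(it) },  // onNext
            { },                   // onError: not reached, the error was replaced upstream
            { completed = true }   // onComplete
        )
    return Triple(received, reported, completed)
}

fun main() {
    println(errorDemo()) // ([], [timeout], true)
}
```

The subscriber gets no items and no error, only a normal completion, while the global handler receives the throwable.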

Polishing it with extension methods

As you can see, the lambda argument for onErrorResumeNext is a bit verbose.
In Java we could write a method that wraps the existing Flowable and adds error handling:

  withErrorHandling(choresChanged
                        .flatMap { backend.getChores() }
                        //...
                   )

There is, however, a better way to do this in Kotlin: extension functions.
Basically, they let you define a function for an existing type and call it as if it were a member of that type.
You could use them to extend the existing collections API or even to write yet another String utils library.
There is already the RxKotlin library, which uses this concept extensively.
In our case we can use this feature to define a withErrorHandling function on the Flowable type:

val choresFlowable = choresChanged
            .flatMap { backend.getChores() }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .withErrorHandling() //<---- extension function call

fun <T> Flowable<T>.withErrorHandling(): Flowable<T> {
    return this.onErrorResumeNext { throwable: Throwable ->
        errorOccurred.onNext(throwable)
        Flowable.empty<T>()
    }
}

The best part is that the function can be used on any Flowable, so we can reuse it for other endpoints too.
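One thing worth knowing when reusing it: onErrorResumeNext ends the stream it is applied to, so putting it at the very end of a long-lived chain means subscribers stop receiving updates after the first failure. A possible variation, which is my own suggestion rather than what the code above does, is to apply the extension to the inner call inside flatMap. The sketch below assumes a hypothetical fetchChores function that fails on the first attempt.

```kotlin
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor

val errorOccurred: PublishProcessor<Throwable> = PublishProcessor.create<Throwable>()

fun <T> Flowable<T>.withErrorHandling(): Flowable<T> =
    onErrorResumeNext { throwable: Throwable ->
        errorOccurred.onNext(throwable)
        Flowable.empty<T>()
    }

// Hypothetical endpoint: the first attempt fails, later attempts succeed
fun fetchChores(attempt: Int): Flowable<String> =
    if (attempt == 1) Flowable.error<String>(RuntimeException("offline"))
    else Flowable.just("chores #$attempt")

// Returns (items received, number of errors reported)
fun keepAliveDemo(): Pair<List<String>, Int> {
    var reportedErrors = 0
    val errorSubscription = errorOccurred.subscribe { reportedErrors += 1 }

    val trigger = PublishProcessor.create<Int>()
    val received = mutableListOf<String>()
    trigger
        // Only the failed inner call is replaced; the outer stream keeps running
        .flatMap { attempt -> fetchChores(attempt).withErrorHandling() }
        .subscribe { received.add(it) }

    trigger.onNext(1) // fails: reported to errorOccurred, nothing delivered
    trigger.onNext(2) // succeeds: delivered to the subscriber

    errorSubscription.dispose()
    return received to reportedErrors
}

fun main() {
    println(keepAliveDemo()) // ([chores #2], 1)
}
```

Whether the stream should survive an error depends on the app, so treat this as an option rather than a rule.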

Conclusion

A few years ago Android development was really painful.
We had Java 7 forcing us to create an endless number of anonymous classes, weird APIs, a slow emulator, Eclipse and a sluggish edit->compile->run cycle.
Today, with many libraries, Android Studio and Kotlin, I must say the process is way more enjoyable.
Given my short experiment with Kotlin, I honestly see no reason why new projects should still use Java.
I experienced absolutely no issues with Kotlin regarding bugs, compilation times or IDE support.
Coming from Scala I missed a few features here and there (I prefer the Option monad to nullable values, for instance), but that's just a style preference.
The learning curve is also gentle, so I believe any Java developer could pick it up in a matter of days.
