Even smarter async with coroutine actors

In my previous post on coroutines for Android, I described how to create a small and simple DSL for performing a single asynchronous task…

In my previous post on coroutines for Android, I described how to create a small and simple DSL for performing a single asynchronous task and getting the response on the main thread.

While this works well for single events, it doesn’t suit the cases where we have a continuous stream of events, like click listeners and other continuous event sources.

In this post I will describe how to use an actor in coroutines to solve this.


If you’ve been doing Kotlin for a while you’ve probably seen the use of its functional style with Collections and Sequences. Instead of using a regular for-loop to work on the items in a collection, you can use a streams API for reacting to each item.

So instead of doing this;

fun convertToCats(kittens: List<Kitten>): List<Cat> {
    val cats = ArrayList<Cat>()
    for (kitten in kittens) {
        cats += Cat.fromKitten(kitten)
    }
    return cats
}

You can do the following;

fun convertToCats(kittens: List<Kitten>): List<Cat> {
   return kittens.map { Cat.fromKitten(it) }
}

This is great for making our code slimmer and often more readable. You might have already used the same pattern when you’ve worked with RxJava. The additional bonus from something like RxJava is that you also gain the possibility of switching to a different thread for each operation, effectively letting you perform heavy operations on a background thread and then moving back to the main thread to display the result.

Let’s see how this works with Kotlin Coroutines.

Channels

One of the utilities built on top of coroutines are channels. The easiest way to describe them is as a regular queue that works inside coroutines. Both the operation to write to and read from a channel are suspending functions, and will thus not block the thread they will be running on. All you need to do is to make sure you call them inside a coroutine.

val channel: ReceiveChannel<Int> = produce<Int>(CommonPool) {
    for (i in 0 .. 100) {
        delay(1000)
        channel.send(i)
    }
}

launch(UI) {
    for (number in channel) {
        textView.text = "Latest number is $number"
    }
}

The code above show a simple use of channels with coroutines. The lamba that we pass to produce will loop from 0 to 100, delay 1 second on each iteration, and then send the current value of i to the channel.

produce is another coroutine builder and works similar to launch or async, but will also provide you with a SendChannel inside its scope. The function send on a SendChannel will add the number to the channel, effectively queuing it up for the consuming side.

The return value from produce is a ReceiveChannel, which we use inside the second coroutine (running on the UI thread) where we effectively loop through each number, or event, that is queued.

Note that all of this is running inside two different coroutines. What we effectively have here is a channel to communicate from the first coroutine (running on the background thread) to the second one (running on the main thread).

A related coroutine builder is actor, which basically does the opposite of produce. It launches a coroutine that wraps a ReceiveChannel and returns a SendChannel. Let’s see if we can use that to handle click events on an Android View.

val channel: SendChannel<View> = actor(UI) {
    for (number in channel) {
        textView.text = "A new click happend!"
    }
}

button.setOnClickListener {
    launch(CommonPool) {
        channel.send(it)
    }
}

Now we’re basically doing the same thing as before, but in a different order. We call launch from the click listener on our button, which will send an event (containing the View itself) to the channel. In the coroutine created by actor, we simply iterate over each event and update the UI.

Sequence of events

What if we would like to perform an action on each event before updating the UI? We could launch another coroutine inside the for-loop that runs on the background thread and performs and expensive operations there, and then call await on the deferred result. However, not only will that require a new coroutine for each event, it will also look a bit messy. Let’s see if we can fix that.

We start with the following two functions as an example;

fun loadData(): Thing {
    // loading things from network
}

fun showResult(thing: Thing) {
    textView.text = thing.toString()
}

They are both synchronous functions, the first must run on a worker thread while the second must run on the main/UI thread.

val channel: SendChannel<View> = actor(UI) {
    channel.map(CommonPool) { loadData() }
           .consumeEach { showResult(it) }
}

button.setOnClickListener {
    launch { channel.send(it) }
}

Now we’ve updated the lambda inside the call to actor. We’re now using the same API as with a regular sequence. map can take an optional parameter in the form or a CoroutineContext, which basically lets us switch to a worker thread for that operation.

The nice thing here is that we don’t need to switch back to the UI thread (as you need to do with RxJava), as that is the default for this context already (i.e, the CoroutineContext in map is only applied to that operation).

However, we’re still creating a new coroutine for each click (the call to launch in the click listener). With one small change we can solve that, as well as gain an additional bonus.

button.setOnClickListener { channel.offer(it) }

By switching the call from send to offer, we no longer need to wrap it in a coroutine. This function works outside coroutines and will also drop that event if the receiving channel (inside the actor lambda) is busy processing the previous event. This means we no longer need to create a new coroutine for each click AND we can ensure that we won’t be queuing up additional calls to loadData before the latest data has been loaded and shown.

Another DSL

While the code above is simple enough, we can make it much easier to use. Let’s expand on the DSL I made in the previous post and see if we can get something like this;

whenClicking(button) {
    loadData()
} then {
    showResult(it)
}

We start by creating a class called OnClickLoader that will wrap this for us.

class OnClickLoader<out T>(val lifecycle: Lifecycle,
                           val view: View,
                           val loadFunction: () -> T) {
    infix fun then(uiFunction: (T) -> Unit) {
        val job = Job()
        val actor = actor<Unit>(context = UI, parent = job) {
            channel.map(CommonPool) { loadFunction() }
                    .consumeEach { uiFunction(it) }
        }

        lifecycle.addObserver(LifecycleListener(job))

        view.setOnClickListener { actor.offer(Unit) }
    }
}

This class takes a Lifecycle, a View, and a lambda that will load our data. It contains a single function named then, that will setup the click listener and map it to a coroutine for us.

We also create a single Job object that we will use for cancelling. The LifecycleListener class, that will listen to the lifecycle of the Activity or Fragment we’re calling it on, will make sure our coroutine is cancelled in onStop.

class LifecycleListener(val job: Job) : LifecycleObserver {
    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    fun stop() {
        if (!job.isCancelled) {
            job.cancel()
        }
    }
}

Finally, we create an extension function on LifecycleOwner, which let’s us call this inside any Fragment or Activity.

fun <T> LifecycleOwner.whenClicking(view: View,
                                    loadFunction: () -> T): 
                                    OnClickLoader<T> {
    return OnClickLoader(lifecycle = lifecycle, 
                         view = view, 
                         loadFunction = loadFunction)
}

Since lifecycle is a property on any class implementing the LifecycleOwner interface, we don’t need to pass that as a parameter to the extension function whenClicking. That means we can simply call it like this;

whenClicking(button) {
    loadData()
}

The lambda after whenClicking will be the loadFunction passed to the constructor of OnClickLoader. Since this class has then declared as an infix function, we now get a similar DSL as in my earlier post;

whenClicking(button) { loadData() } then { showResult(it) }

And there we go. We now have a simple way of wrapping clicks, or any type of event stream, using coroutines and we also have a nice DSL for dealing with threads and cancellations. Since we’re calling offer instead of send on the SendChannel, we don’t need to create another coroutine for each click and we prevent accidental, quick double taps on the button as bonus.

The source code for the code above can be found on GitHub. It also contains a more cleaned up version of the DSL I described in my earlier post.