Kotlin Coroutines vs RxJava – spoiled for choice
One of the basic requirements that most mobile app developers must meet is handling asynchronous tasks. Simply put, these are all the operations performed “in the background”. Downloading data from the Internet, reading from and writing to the database, or compressing files: these usually take the program longer to process.
For this reason, parts of an application require “special treatment”. Programmers need to move the execution of these processes off the main thread of the app. This keeps the interface responsive and gives the user the impression of a smoothly running app, even though the program itself has more work to do.
Android has a number of tools to help you implement this, but the majority of them do not solve all of the issues a programmer may encounter. Direct manipulation of threads or the use of AsyncTask quickly leads to hard-to-read and overcomplicated code. Let’s take a look at two libraries that facilitate the writing of asynchronous programs: the popular RxJava and Kotlin Coroutines.
Table of contents
- ReactiveX
– How do we solve this using streams?
– Combining functions into a sequence
– How to manage threads?
– Receiving the result
– ReactiveX – what does it offer besides handling asynchronous tasks?
- Kotlin Coroutines
– Where is my coroutine located?
– Managing Threads
– Where do the errors go?
– Job object – remember what is happening in the program
- Kotlin Coroutines – summary
- Choices, choices…
ReactiveX
RxJava is an implementation of the ReactiveX concept, an API for writing asynchronous programs using streams. The stream is the basic concept in Rx, represented by the Observable: an abstract data producer that pushes data down to the recipient watching the stream. For this reason, an Observable can be called an “inverted” iterator: the subscriber does not pull values from the source, but receives each one when it becomes available. Thanks to this, the stream can deliver values either synchronously or asynchronously.
This feature brings to mind the observer pattern. In fact, the developers themselves describe ReactiveX as “the Observer pattern done right”, because it enriches the pattern with the ability to signal the end of the stream or the occurrence of an error. In addition, an Observable can be processed simply by using operators: higher-order functions such as zip, map or flatMap, which can be combined almost freely and which modify the data they process.
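As a rough illustration of how operators compose, the sketch below pairs two streams with zipWith and transforms each pair with map. It assumes RxJava 3 on the classpath (package io.reactivex.rxjava3; RxJava 2 uses io.reactivex instead), and the file names and sizes are made up for the demo.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Pairs two streams item by item, then transforms every pair into a label.
fun labelledSizes(): List<String> {
    val names = Observable.just("a.png", "b.png", "c.png")   // demo data, not from the article
    val sizesKb = Observable.just(120, 340, 56)              // demo data, not from the article

    return names
        .zipWith(sizesKb) { name, size -> name to size }     // combine items by position
        .map { (name, size) -> "$name: $size kB" }           // transform each pair
        .toList()                                            // collect into a Single<List<String>>
        .blockingGet()                                       // blocking is acceptable only in a demo
}

fun main() {
    println(labelledSizes())
}
```

The subscriber never asks for the next element; each value is pushed through zipWith and map as soon as both sources have produced one.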
Suppose you want to load a photo from memory, compress it, and send it. Each of these operations will take a while and may potentially end with an error. The functions should not “be aware” of the thread on which they are being executed and should be simple to reuse in other places in the program. We should also be able to cancel the task safely.
How do we solve this using streams?
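// Each step declares a reactive return type instead of a plain value.
fun getPicFromMemory(name: String): Single<Picture>
fun resizePic(picture: Picture): Single<Picture>
fun mapPicToNetModel(picture: Picture): PictureDto
fun uploadPic(picture: PictureDto): Completable

getPicFromMemory(name)
    .flatMap(::resizePic)                      // Single -> Single
    .map(::mapPicToNetModel)                   // plain, synchronous mapping
    .flatMapCompletable(::uploadPic)           // Single -> Completable
    .subscribeOn(Schedulers.computation())     // where the work runs
    .observeOn(Schedulers.single())            // where the result is delivered
    .subscribe(::onSuccess, ::onError)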
The first two functions return the Single<> type. This is a special kind of Observable that emits exactly one value and then completes, or signals an error instead. Thanks to this, it is suitable for modeling tasks such as server queries, image processing, or reading from a database. The last function returns a Completable, which emits only the information that an operation succeeded or failed. It is useful when we are not interested in the result of the operation, only in the signal of its successful completion or failure.
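A minimal sketch of how such functions might be written, assuming RxJava 3. The Picture class and both function bodies are hypothetical stand-ins, not the article's actual implementation.

```kotlin
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.core.Single

// Hypothetical stand-in for the article's Picture type.
data class Picture(val name: String, val bytes: Int)

// A Single emits exactly one value or one error.
fun loadPicture(name: String): Single<Picture> =
    Single.fromCallable {
        // An exception thrown here is delivered to the subscriber as an error.
        require(name.isNotBlank()) { "name must not be blank" }
        Picture(name, bytes = 2_048)
    }

// A Completable emits no value, only completion or an error.
fun deletePicture(picture: Picture): Completable =
    Completable.fromAction { println("deleted ${picture.name}") }

fun main() {
    loadPicture("cat.png").subscribe(
        { pic -> println("got $pic") },
        { err -> println("failed: $err") }
    )
    deletePicture(Picture("cat.png", 2_048)).subscribe(
        { println("done") },
        { err -> println("failed: $err") }
    )
}
```

Nothing runs until subscribe() is called, which is what allows the caller to decide later on which thread the work should happen.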
Combining functions into a sequence
In this case, we use the flatMap operator to combine the tasks. It takes the value emitted by the previous Single as its argument and returns a SingleSource<T>, i.e. the next Single. In this way, we can combine our operations into any sequence. Then, in the same way, we attach a function that returns a Completable, another variant of the Observable that signals only the end of the task or an error. Thanks to this, we achieve one of our goals: composing our use case out of small functions that only accept and return data.
How to manage threads?
Next, we call the subscribeOn() and observeOn() methods. The first specifies the thread on which the declared operations will be performed; the second switches the thread for everything downstream of it, here the delivery of the final result. These two operators are enough to control on which threads the individual steps of the task run. Behind the scenes, the library takes care of the switching; the programmer only has to indicate where it should happen. In this way, we fulfill another goal: handing the responsibility for asynchrony over to the stream itself, without “bothering” the individual elements.
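The effect of the two operators can be observed by recording thread names, as in the sketch below. It assumes RxJava 3; the choice of Schedulers.io() and Schedulers.single() is arbitrary and only serves to make the two threads differ.

```kotlin
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers

// Returns the name of the thread that produced the value
// and the name of the thread that received it.
fun threadsUsed(): Pair<String, String> =
    Single.fromCallable { Thread.currentThread().name }   // runs on the subscribeOn scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())
        // everything below observeOn runs on the observeOn scheduler
        .map { producer -> producer to Thread.currentThread().name }
        .blockingGet()                                     // blocking is acceptable only in a demo

fun main() {
    val (producer, consumer) = threadsUsed()
    println("produced on $producer, consumed on $consumer")
}
```

The function that produces the value contains no threading code at all; the switch is declared once, on the stream.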
Receiving the result
Finally, the subscribe function is called so that we can receive the result of our operation. It also accepts the onError function, which is called when an error occurs. This is another advantage of ReactiveX: all errors that may occur in the stream flow down to this point, just like data. By default, an error automatically ends the stream.
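The following sketch shows an exception thrown in the middle of a chain arriving in the error callback, with the later steps skipped. It assumes RxJava 3; the process function is purely illustrative.

```kotlin
import io.reactivex.rxjava3.core.Single

// Runs a two-step chain and reports which subscribe callback was invoked.
fun process(input: String): String {
    val log = StringBuilder()
    Single.just(input)
        .map { it.toInt() }   // throws NumberFormatException for non-numeric input
        .map { it * 2 }       // skipped once an error is travelling downstream
        .subscribe(
            { value -> log.append("success: $value") },
            { error -> log.append("error: ${error.javaClass.simpleName}") }
        )
    return log.toString()
}

fun main() {
    println(process("21"))    // success: 42
    println(process("abc"))   // error: NumberFormatException
}
```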
On Android, it is important to tie the life cycle of an operation to the life cycle of the activity. An Activity can be stopped or destroyed at any time, and the programmer must remember to end all of its related tasks accordingly. The subscribe() function returns a Disposable object, which lets us cancel the stream and check whether it has already been cancelled.
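// Container that collects the Disposables of every running stream.
val compositeDisposable = CompositeDisposable()
val actionDisposable = Observable.create(::action).subscribe()
// addTo is an extension function from the RxKotlin library.
actionDisposable.addTo(compositeDisposable)
// Cancels all collected streams at once.
compositeDisposable.dispose()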
In the listing, I used the CompositeDisposable class, which allows you to collect Disposables in one place and cancel them all at once. It is a convenient solution that spares us from manually keeping a reference to each operation separately. CompositeDisposable also facilitates scoping: we can, for example, assign it to a specific controller and clearly declare which streams are associated with it. In this way, ReactiveX solves our last problem, the ability to cancel our tasks.
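Scoping streams to a controller could look roughly like the sketch below. It assumes RxJava 3 and uses the plain add() method rather than the RxKotlin addTo extension; PictureController is a hypothetical class.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import java.util.concurrent.TimeUnit

// Hypothetical controller that owns every stream it starts.
class PictureController {
    private val disposables = CompositeDisposable()

    fun start() {
        // An endless ticker, standing in for any long-running stream.
        val ticker = Observable.interval(100, TimeUnit.MILLISECONDS)
            .subscribe { tick -> println("tick $tick") }
        disposables.add(ticker)
    }

    fun activeStreams(): Int = disposables.size()

    // clear() disposes every stream but keeps the container reusable;
    // dispose() would additionally make the container itself unusable.
    fun stop() = disposables.clear()
}

fun main() {
    val controller = PictureController()
    controller.start()
    Thread.sleep(350)
    controller.stop()
    println("active streams after stop: ${controller.activeStreams()}")
}
```

Using clear() rather than dispose() is a design choice that suits components which can be stopped and started again.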
ReactiveX – what does it offer besides handling asynchronous tasks?
The above is just a small example of the possibilities of reactive streams. Our case involved the processing of a single value passing linearly through several stages. However, the library also gives control over streams running in parallel: switching between them, synchronizing them so that they wait for each other, or, on the contrary, letting them compete with each other. Another important element is the back-pressure mechanism, which manages a stream of data flowing faster than the consumer is able to process it. You can also base a large part of an application’s architecture on reactive programming. We can emit more than the results of longer calculations or backend queries: clicks and other user interface events, or any change in the database or SharedPreferences, can all be observed.
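Waiting for each other and competing with each other can be sketched with zipWith and ambWith. The example assumes RxJava 3; the two sources and their delays are invented for the demo.

```kotlin
import io.reactivex.rxjava3.core.Single
import java.util.concurrent.TimeUnit

// Two demo sources that finish after different delays.
fun slow(): Single<String> = Single.just("slow").delay(300, TimeUnit.MILLISECONDS)
fun fast(): Single<String> = Single.just("fast").delay(50, TimeUnit.MILLISECONDS)

// zipWith waits until both sources have emitted, then combines the values.
fun both(): String = slow().zipWith(fast()) { a, b -> "$a+$b" }.blockingGet()

// ambWith lets the sources compete: the first one to emit wins, the other is disposed.
fun winner(): String = slow().ambWith(fast()).blockingGet()

fun main() {
    println(both())     // slow+fast
    println(winner())   // fast
}
```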
Kotlin Coroutines
Coroutines may be new to Android developers accustomed to Java. They are designed to turn long-running, callback-based operations into sequential code. Their essential element is the suspending function, a function that can be suspended without blocking the thread. Similar constructs can be found in JavaScript, where async functions return a Promise. In Kotlin, however, functions marked with suspend return values directly, just like ordinary functions. So let’s see how we can solve our sample problem using coroutines.
// Receives every exception that is not caught inside the coroutine.
val handler = CoroutineExceptionHandler { coroutineContext, throwable ->
    println(throwable)
}

suspend fun getPicFromMemory(): Picture
suspend fun resizePic(p: Picture): Picture
suspend fun mapPicToNetModel(p: Picture): PictureDto
suspend fun uploadPic(p: PictureDto)

val job = controllerContext.launch(handler) {
    // Run the whole pipeline on the I/O dispatcher.
    withContext(Dispatchers.IO) {
        try {
            val pic = getPicFromMemory()
            val resizedPic = resizePic(pic)
            val netModel = mapPicToNetModel(resizedPic)
            uploadPic(netModel)
        } catch (e: SpecialException) {
            onSpecialError(e)
        }
    }
    // Back in the launching context once withContext returns.
    onSuccess()
}
The first thing that strikes you is the signatures of the functions for the individual steps. As I mentioned earlier, they return the result type directly instead of a type representing a long-running operation, and they are marked as suspend. Thanks to this, we can write the entire algorithm 100% sequentially.
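How a callback-based operation might be turned into a suspending function that returns its result directly is sketched below. It assumes the kotlinx.coroutines library; loadAsync is a hypothetical legacy API invented for the example.

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.concurrent.thread
import kotlin.coroutines.resume

// Hypothetical callback-based API, standing in for a legacy loader.
fun loadAsync(name: String, callback: (String) -> Unit) {
    thread {
        Thread.sleep(50)                 // simulate slow work on another thread
        callback("contents of $name")
    }
}

// Wraps the callback in a suspending function that returns the value directly.
suspend fun load(name: String): String =
    suspendCancellableCoroutine { continuation ->
        loadAsync(name) { result -> continuation.resume(result) }
    }

fun main() = runBlocking {
    val text = load("cat.png")           // reads like a normal, sequential call
    println(text)
}
```

The caller is suspended until the callback fires, but the calling thread is free to do other work in the meantime.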
Where is my coroutine located?
controllerContext.launch(handler)
For a change, let’s discuss the nesting from the outside in. The controllerContext determines the context in which the task will be started. In this case, the name indicates that it will work within the controller’s scope. Scopes are important because they create a parent-child hierarchy and allow tasks to be grouped. The launch function is a so-called coroutine builder: its purpose is to create a coroutine whose context inherits from the parent context, in this case controllerContext. A parent always waits for all of its children to finish. If it is cancelled, all of its children are cancelled as well. Exceptions that have not been caught in a child are passed up to the parent. Scopes prove particularly useful in components with a life cycle, such as Android Activities and Fragments. Kotlin does not limit us to holding the scope as a property of the component, either: thanks to delegation (by), the component itself can become our scope.
class Activity : CoroutineScope by CoroutineScope(Dispatchers.Main)
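The cancellation rule can be checked with a small sketch, assuming the kotlinx.coroutines library. The scope name and the use of SupervisorJob and Dispatchers.Default are choices made for the demo, not taken from the article.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Cancels a scope and reports whether both of its children were cancelled with it.
fun cancelScopeCancelsChildren(): Boolean = runBlocking {
    val controllerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // Two long-running children that would otherwise take ten seconds each.
    val first = controllerScope.launch { delay(10_000) }
    val second = controllerScope.launch { delay(10_000) }

    controllerScope.cancel()             // e.g. called when the controller is destroyed
    joinAll(first, second)               // wait until both children have finished

    first.isCancelled && second.isCancelled
}

fun main() {
    println(cancelScopeCancelsChildren())   // true
}
```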
Managing Threads
withContext(Dispatchers.IO)
Next, we encounter the function withContext(), which executes a block of instructions on the specified dispatcher and therefore on one of its threads. It should be noted that executing and suspending a function inside a coroutine does not by itself move the work off the thread the coroutine is assigned to! The operations defined in it are still executed as if they were called in the standard way. From an Android developer’s point of view this may be a bit counterintuitive: for example, it is possible to call non-blocking, suspending network code on the main thread without stuttering the interface and without a NetworkOnMainThreadException. Network operations, however, mostly just wait for a response, whereas tasks such as writing to a database or processing photos are more computationally demanding. Left on the main thread, they would take up the resources the UI thread needs to display the interface smoothly.
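A small sketch of moving demanding work off the calling thread with withContext, assuming the kotlinx.coroutines library. The checksum function is a made-up stand-in for photo processing.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// The function itself moves its CPU-heavy work to a background dispatcher,
// so it is safe to call from any thread, including a UI thread.
suspend fun checksum(data: ByteArray): Int = withContext(Dispatchers.Default) {
    data.fold(0) { acc, byte -> acc + byte.toInt() }
}

fun main() = runBlocking {
    val caller = Thread.currentThread().name
    val worker = withContext(Dispatchers.Default) { Thread.currentThread().name }
    println("caller=$caller worker=$worker")    // two different threads
    println(checksum(byteArrayOf(1, 2, 3)))     // 6
}
```

Because withContext returns the value of its block, the caller keeps a plain sequential structure even though the work ran elsewhere.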
Where do the errors go?
try {
    val pic = getPicFromMemory()
    val resizedPic = resizePic(pic)
    val netModel = mapPicToNetModel(resizedPic)
    uploadPic(netModel)
} catch (e: SpecialException) {
    onSpecialError(e)
}
Next, we handle errors, which we catch with a traditional try / catch block. But what if we want to define a general error-handling procedure that we can reuse in other places? CoroutineExceptionHandler is used for this, passed as one of the builder’s parameters. It implements CoroutineContext.Element, so we can simply add it as one of the elements of the context in which we launch the task. It takes over the handling of uncaught errors. Thanks to that, we do not have to wrap every operation in try / catch. As a result, we limit repetition in the code and improve its readability.
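The sketch below shows an uncaught exception reaching a handler installed in the scope's context. It assumes the kotlinx.coroutines library; the scope setup and the error message are invented for the demo.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicReference

// Launches a failing task and returns the message seen by the shared handler.
fun runFailingTask(): String? {
    val handled = AtomicReference<String?>(null)

    // One handler for every coroutine launched in this scope.
    val handler = CoroutineExceptionHandler { _, throwable ->
        handled.set(throwable.message)
    }
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default + handler)

    runBlocking {
        // No try / catch here: the exception travels to the handler.
        scope.launch { throw IllegalStateException("upload failed") }.join()
    }
    return handled.get()
}

fun main() {
    println(runFailingTask())   // upload failed
}
```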
Job object – remember what is happening in the program
The launch function returns a Job object representing the launched coroutine. It has a life cycle that defines which phase of its work it is in. By default, a newly created Job starts immediately, but its start can be postponed. Thanks to objects of this type, we can track the status of individual tasks, as well as cancel them manually. A launched Job is automatically attached to its parent context.
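The life cycle of a Job with a postponed start can be inspected as in the sketch below, which assumes the kotlinx.coroutines library and uses CoroutineStart.LAZY to delay the start.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Records the state of a lazily started Job at three points in its life cycle.
fun lazyJobStates(): List<Boolean> = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) { delay(10) }

    val activeBeforeStart = job.isActive     // false: created but not yet started
    job.start()
    val activeAfterStart = job.isActive      // true: now running
    job.join()
    val completedAfterJoin = job.isCompleted // true: finished

    listOf(activeBeforeStart, activeAfterStart, completedAfterJoin)
}

fun main() {
    println(lazyJobStates())   // [false, true, true]
}
```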
Kotlin Coroutines – summary
Kotlin Coroutines meet all the requirements set at the beginning. All steps of the algorithm are functions that can be suspended thanks to the “suspend” keyword. This allows them to be composed freely, much like writing synchronous, sequential code.
Scopes make it easier to control the launch and cancellation of both individual tasks and entire sets of them. In addition, we can tie them to the life cycle of components. Coroutines also allow you to assign whole blocks of instructions to a specific thread. Such a block returns its result, so we keep the sequential structure of the code without having to introduce extra state. Like ReactiveX, coroutines make it easier to synchronize tasks performed in parallel.
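Synchronizing two parallel tasks might look like the sketch below, which assumes the kotlinx.coroutines library. The two tasks and their delays are placeholders.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Starts two tasks concurrently and suspends until both results are available.
suspend fun loadBoth(): String = coroutineScope {
    val thumbnail = async { delay(100); "thumb" }   // placeholder work
    val fullSize = async { delay(100); "full" }     // placeholder work
    "${thumbnail.await()}+${fullSize.await()}"
}

fun main() = runBlocking {
    println(loadBoth())   // thumb+full
}
```

Because both async blocks start before either await is called, the two delays overlap instead of adding up.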
Choices, choices …
The two technologies offer very different approaches to writing asynchronous tasks. RxJava is a very popular solution that does much more than manage threads and synchronize tasks. It has become the basis of many projects, providing mechanisms for observing resources and streaming interface events. In addition, back-pressure support and the concepts of cold and hot observables add to its versatility.
Kotlin Coroutines, in turn, is a fairly new library, oriented primarily towards a different style of writing asynchronous code that was previously not very popular on Android. It offers Channels, which allow streams of data to be passed between coroutines, although at the time of writing they remained experimental. Coroutines are nevertheless easy to use and blend in well with the Android architecture.
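Passing a stream of values between two coroutines through a Channel can be sketched as follows, assuming the kotlinx.coroutines library. The squares are arbitrary demo data.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// One coroutine sends three values, another receives them in order.
fun squaresViaChannel(): List<Int> = runBlocking {
    val channel = Channel<Int>()

    // Producer coroutine: send() suspends until the receiver is ready.
    launch {
        for (i in 1..3) channel.send(i * i)
        channel.close()                       // signals that no more values will come
    }

    // Consumer: the loop ends when the channel is closed.
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(squaresViaChannel())   // [1, 4, 9]
}
```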
The decision belongs to the programmer. For me, learning about Kotlin Coroutines was an interesting escape from my usual programming style. It’s worth getting familiar with them to learn the async / await pattern, especially since it is widely used in Flutter, a new cross-platform framework from Google. I hope you liked the post and the methods and solutions it applies. If you need extra support, check out our blog.