/ FUTURE, KOTLIN, COROUTINE

Concurrency: Java Futures and Kotlin Coroutines

A long time ago, one had to manually start new threads when wanting to run code concurrently in Java. Not only was this hard to write, it was easy to introduce bugs that were hard to find. Testing, reading and maintaining such code was no walk in the park either. Since that time - and with a little incentive coming from multi-core machines, the Java API has evolved to make developing concurrent code easier. Meanwhile, alternative JVM languages also have their opinion about helping developers write such code. In this post, I’ll compare how it’s implemented in Java and Kotlin.

To keep the post focused, I deliberately left out performance to write about code readability.

About the use-case

The use-case is not very original. We need to call different web services. The naive solution would be to call them sequentially, one after the other, and collect the result of each of them. In that case, the overall call time would be the sum of the call time of each service. An easy improvement is to call them in parallel, and wait for the last one to finish. Thus, performance improves from linear to constant - or for the more mathematically inclined, from o(n) to o(1).

To simulate calling a web service with a delay, let’s use the following code (in Kotlin because this is so much less verbose):

class DummyService(private val name: String) {

    private val random = SecureRandom()

    val content: ContentDuration
        get() {
            val duration = random.nextInt(5000)
            Thread.sleep(duration.toLong())
            return ContentDuration(name, duration)
        }
}

data class ContentDuration(val content: String, val duration: Int)

The Java Future API

Java offers a whole class hierarchy to handle concurrent calls. It’s based on the following classes:

Callable

A Callable is a "task that returns a result". From another view point, it’s similar to a function that takes no parameter and returns this result.

Future

A Future is "the result of an asynchronous computation". Also, "The result can only be retrieved using method get when the computation has completed, blocking if necessary until it is ready". In other words, it represents a wrapper around a value, where this value is the outcome of a calculation.

Executor Service

An ExecutorService "provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks". It is the entry point into concurrent handling code in Java. Implementations of this interface - as well are more specialized ones, can be obtained through static methods in the Executors class.

This is summarized in the following class diagram:

java util concurrent

Calling our services using the concurrent package is a 2-steps process.

Creating a collection of callables

First, there need to be a collection of Callable to pass to the executor service. This is how it might go:

  1. From a stream of service names
  2. For each service name, create a new dummy service initialized with the string
  3. For every service, return the service’s getContent() method reference as a Callable. This works because the method signature, matches Callable.call() and Callable is a functional interface.

This is the preparation phase. It translates into the following code:

List<Callable<ContentDuration>> callables = Stream.of("Service A", "Service B", "Service C")
      .map(DummyService::new)
      .map(service -> (Callable<ContentDuration>) service::getContent)
      .collect(Collectors.toList());

Processing the callables

Once the list has been prepared, it’s time for the ExecutorService to process it aka the "real work".

  1. Create a new executor service - any will do
  2. Pass the list of Callable to the executor service, and stream the resulting list of Future
  3. For every future,
  4. Either return the result
  5. Or handle the exception

The following snippet is a possible implementation:

ExecutorService executor = Executors.newWorkStealingPool();
List<ContentDuration> results = executor.invokeAll(callables).stream()
    .map(future -> {
         try { return future.get(); }
         catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); }
     }).collect(Collectors.toList());

The Future API, but in Kotlin

Let’s face it, while Java makes it possible to write concurrent code, reading and maintaining it is not that easy, mainly due to:

  • Going back and forth between collections and streams
  • Handling checked exception in lambdas
  • Casting explicitly

Just porting the above code to Kotlin removes those limitations and makes it more straightforward:

var callables: List<Callable<ContentDuration>> = arrayOf("Service A", "Service B", "Service C")
    .map { DummyService(it) }
    .map { Callable<ContentDuration> { it.content } }

val executor = Executors.newWorkStealingPool()
val results = executor.invokeAll(callables).map { it.get() }

Kotlin Coroutines

With version 1.1 of Kotlin comes a new experimental feature called coroutines.

Basically, coroutines are computations that can be suspended without blocking a thread. Blocking threads is often expensive, especially under high load […​]. Coroutine suspension is almost free, on the other hand. No context switch or any other involvement of the OS is required.

The leading design principle behind coroutines is that they must feel like sequential code but run like concurrent code. They are based on the following (simplified) class diagram:

coroutines

Nothing beats the code itself though. Let’s implement the same as above, but with coroutines in Kotlin instead of Java futures.

As a pre-step, let’s just extend the service to ease further processing by adding a new computed property wrapped around content, of type Deferred:

val DummyService.asyncContent: Deferred<ContentDuration>
    get() = async(CommonPool) { content }

This is standard Kotlin extension property code, but notice the CommonPool parameter. This is the magic that makes the code run concurrent. It’s a companion object (i.e. a singleton) that uses a multi-fallback algorithm to get an ExecutorService instance.

Now, on to the code flow proper:

  1. Coroutines are handled inside a block. Declare a variable list outside the block to be assigned inside it
  2. Open the synchronization block
  3. Create the array of service names
  4. For each name, create a service and return it
  5. For each service, get its async content (declared above) and return it
  6. For each deferred, get the result and return it
// Variable must be initialized or the compiler complains
// And the variable cannot be used afterwards
var results = runBlocking {
    arrayOf("Service A", "Service B", "Service C")
        .map { DummyService(it) }
        .map { it.asyncContent }
        .map { it.await() }
}

Takeaways

The Future API is not so much a problem than the Java language itself. As soon as the code is translated into Kotlin, readability improves a lot. Yet, having to create a collection to pass to the executor service breaks the nice functional pipeline.

On the coroutines side, remember that they are still experimental. Despite that, code does look sequential - and is thus more readable, and behaves parallel.

The complete source code for this post can be found on Github in Maven format.
Nicolas Fränkel

Nicolas Fränkel

Developer Advocate with 15+ years experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurances, large retail and public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Also double as a trainer and triples as a book author.

Read More
Concurrency: Java Futures and Kotlin Coroutines
Share this