Exercises in Concurrent Programming Style

In the Exercises in Programming Style book, this chapter is called Actors. In software, the actor model is a very specific way to design code:

The actor model in computer science is a mathematical model of concurrent computation that treats "actors" as the universal primitives of concurrent computation. In response to a message that it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (obviating lock-based synchronization).

This is the 16th post in the Exercises in Programming Style focus series. Other posts include:

  1. Introducing Exercises in Programming Style
  2. Exercises in Programming Style, stacking things up
  3. Exercises in Programming Style, Kwisatz Haderach-style
  4. Exercises in Programming Style, recursion
  5. Exercises in Programming Style with higher-order functions
  6. Composing Exercises in Programming Style
  7. Exercises in Programming Style, back to Object-Oriented Programming
  8. Exercises in Programming Style: maps are objects too
  9. Exercises in Programming Style: Event-Driven Programming
  10. Exercises in Programming Style and the Event Bus
  11. Reflecting over Exercises in Programming Style
  12. Exercises in Aspect-Oriented Programming Style
  13. Exercises in Programming Style: FP & I/O
  14. Exercises in Relational Database Style
  15. Exercises in Programming Style: spreadsheets
  16. Exercises in Concurrent Programming Style (this post)
  17. Exercises in Programming Style: sharing data among threads
  18. Exercises in Programming Style with Hazelcast
  19. Exercises in MapReduce Style
  20. Conclusion of Exercises in Programming Style

The original Python code

As mentioned above, the original Python code is based on the actor model. Every class is an actor that inherits from Thread and communicates with other actors through messages. Here’s the class diagram:

Original Python class diagram

Actors communicate with each other via messages, as seen in the previous post on event-driven programming. The event handling is implemented in the _dispatch() function.

The code of ActiveWFObject is the following:

def run(self):
    while not self._stopMe:
        message = self.queue.get()
        self._dispatch(message)
        if message[0] == 'die':
            self._stopMe = True

In essence, the run() function is a loop that:

  1. reads the next message from the queue, blocking until one is available
  2. calls the _dispatch() function with the received message as a parameter
  3. stops once the die message has been dispatched

Porting to Kotlin

The first step to porting the code to Kotlin is just a matter of mapping the Python classes one-to-one with Kotlin classes:

Kotlin class diagram

Python is a dynamically-typed language. Hence, in the original Python code, a message is a list whose first element is, by convention, a string that defines the message’s "type". An improvement that benefits from Kotlin’s static typing is to create a message type hierarchy.

Message class diagram

Note that just like in the event-driven programming post, message types are defined in the class that spawns them.
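
To make the idea concrete, here is a minimal sketch of what such a hierarchy could look like. Only Message and Die appear in this post; the nested message types, their payloads, and the describe() helper are hypothetical names chosen for illustration, not the ones from the actual repository.

```kotlin
// Marker type for everything an actor can receive.
interface Message

// Shared message asking an actor to stop (the 'die' message of the Python code).
object Die : Message

// Simplified stand-ins for the real classes: each one declares the messages it sends.
class DataStorageManager {
    // Hypothetical payload: one word read from the input.
    data class WordMessage(val word: String) : Message
}

class WordFrequencyManager {
    // Hypothetical payload: the final word counts.
    data class Top25Message(val frequencies: Map<String, Int>) : Message
}

// With types instead of a leading string, dispatching becomes a `when` on the type.
fun describe(message: Message): String = when (message) {
    is DataStorageManager.WordMessage -> "word: ${message.word}"
    is WordFrequencyManager.Top25Message -> "top: ${message.frequencies.size} entries"
    Die -> "die"
    else -> "unknown"
}
```

Compared to the Python list, the compiler now checks the payload of each message, and a typo in a message name becomes a compilation error instead of a silently ignored message.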

Toward a more flexible design

The code of the Actor class that has been ported looks like the following:

import java.util.concurrent.ConcurrentLinkedQueue

abstract class Actor : Runnable {

  // Thread-safe queue: send() is called from other actors' threads
  private val queue = ConcurrentLinkedQueue<Message>()
  private var stop = false
  internal var thread = Thread(this, this::class.simpleName).apply {    (1)
    start()
  }

  final override fun run() {
    while (!stop) {
      // poll() returns null when the queue is empty
      val message: Message? = queue.poll()
      if (message != null) {
        dispatch(message)
        if (message == Die) stop = true
      }
    }
  }

  abstract fun dispatch(message: Message)
  fun send(message: Message) = queue.add(message)
}
1 The thread starts when the object is instantiated
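
The loop above polls the queue without pausing, so an idle actor keeps a core busy, whereas Python’s queue.get() blocks. As a hedged alternative, the following sketch uses a LinkedBlockingQueue from the JDK to get the blocking behavior back. BlockingActor, Text, Collector and runCollector() are hypothetical names for this illustration only, and the thread is created by the caller rather than by the actor.

```kotlin
import java.util.concurrent.LinkedBlockingQueue

interface Message
object Die : Message

abstract class BlockingActor : Runnable {
    // Thread-safe queue; take() parks the thread until a message arrives.
    private val queue = LinkedBlockingQueue<Message>()

    final override fun run() {
        while (true) {
            val message = queue.take()
            dispatch(message)
            if (message == Die) break
        }
    }

    abstract fun dispatch(message: Message)

    fun send(message: Message) {
        queue.put(message)
    }
}

// Hypothetical message and actor, used only to exercise the loop.
data class Text(val value: String) : Message

class Collector : BlockingActor() {
    val received = mutableListOf<String>()

    override fun dispatch(message: Message) {
        if (message is Text) received += message.value
    }
}

fun runCollector(): List<String> {
    val collector = Collector()
    val thread = Thread(collector, "collector").apply { start() }
    collector.send(Text("hello"))
    collector.send(Text("world"))
    collector.send(Die)
    thread.join() // after join(), reading `received` from this thread is safe
    return collector.received
}
```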

IMHO, binding the lifecycle of the object to the lifecycle of the thread is an issue. It treats the thread as an implementation detail, while it’s definitely not. Moreover, it makes it harder to migrate to other concurrency models.

Hence, we should remove the thread property from the Actor class, and move the threading logic to the calling code:

val wordFrequencyController = WordFrequencyController()

// createThread() only takes the actor, so the thread is configured with apply()
createThread(wordFrequencyController).apply {
    start()
    join()
}

fun <T: Actor> createThread(actor: T) = Thread(actor, actor::class.simpleName)

Further refinements

At this point, it’s possible to use more advanced APIs, such as those provided by more recent Java versions.

Introducing the Executor Service

To migrate to more recent APIs, the first step is to stop using threads directly in favor of the executor service, available since Java 5. Submitting a task to an executor service returns a Future, which allows keeping track of the task’s progress.

Moreover, Java 5 also provides the Executors class, a factory of different executor service types. For example, Executors.newFixedThreadPool() creates an executor service that, as the name implies, makes a fixed number of threads available:

val dataStorageManager = DataStorageManager()
val stopWordManager = StopWordManager()
val wordFrequencyManager = WordFrequencyManager()
val wordFrequencyController = WordFrequencyController()

val executorService = Executors.newFixedThreadPool(4)
listOf(dataStorageManager, stopWordManager,
       wordFrequencyManager, wordFrequencyController)
  .map { executorService.submit(it) }[3]
  .get()

return wordFrequencyController.getResult()
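
The snippet above never shuts the executor service down, and the threads of a fixed pool can keep the JVM alive once the work is done. Here is a small self-contained sketch of the same submit-then-wait pattern with an explicit shutdown. The runOnPool() function and the sleeping task are placeholders for illustration, not code from the repository.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit

fun runOnPool(): Boolean {
    val executorService = Executors.newFixedThreadPool(2)
    try {
        // Placeholder task standing in for an actor's run() loop.
        val task = Runnable { Thread.sleep(50) }
        // submit(Runnable) returns a Future that completes when the task ends.
        val future: Future<*> = executorService.submit(task)
        // Wait for completion; get() throws TimeoutException if the task takes too long.
        future.get(1, TimeUnit.SECONDS)
        return future.isDone
    } finally {
        // Let the pool threads terminate once submitted tasks have finished.
        executorService.shutdown()
    }
}
```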

Introducing Callable

The next step is to realize that to get the result from the code in its current state, one needs to read a property:

class WordFrequencyController : Actor() {

  private lateinit var result: Map<String, Int>

  // Somehow fill in the result property

  fun getResult(): Map<String, Int> {
    return result
  }
}

However, this design is not optimal, as there’s an interface in the JDK that upholds this contract: a java.util.concurrent.Callable returns a result when its task has finished. Hence, we could redesign our current class hierarchy to benefit from Callable:

Callable class diagram

The equivalent code is as follows:

class WordFrequencyController : Actor(), Callable<Map<String, Int>> {

  private lateinit var result: Map<String, Int>

  override fun call(): Map<String, Int> {
    loop()                                   (1)
    return result
  }
}
1 Calls dispatch() in turn, which fills in result
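
The benefit shows on the calling side: submitting a Callable returns a typed Future, so there is no separate property to read. The sketch below illustrates this with a hypothetical WordCounter class that computes its result directly instead of exchanging messages. It is an assumption-laden stand-in for the controller, not the actual implementation.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for the controller: counts words without any messaging.
class WordCounter(private val words: List<String>) : Callable<Map<String, Int>> {
    override fun call(): Map<String, Int> =
        words.groupingBy { it }.eachCount()
}

fun countOnPool(words: List<String>): Map<String, Int> {
    val executorService = Executors.newSingleThreadExecutor()
    try {
        // submit(Callable<T>) returns a Future<T>; get() yields what call() returned.
        return executorService.submit(WordCounter(words)).get()
    } finally {
        executorService.shutdown()
    }
}
```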

Introducing Completable Futures

The final step uses CompletableFuture, a specialized Future "that may be explicitly completed". Its static factory functions, such as runAsync() and supplyAsync(), can accept an additional Executor parameter. It’s been available since Java 8.

Callable class diagram

To take advantage of CompletableFuture, one needs to migrate from Callable to Supplier, since supplyAsync() accepts a Supplier. Then, the code can be updated as follows:

with (Executors.newFixedThreadPool(4)) {
  CompletableFuture.runAsync(dataStorageManager, this)
  CompletableFuture.runAsync(stopWordManager, this)
  CompletableFuture.runAsync(wordFrequencyManager, this)
  return CompletableFuture.supplyAsync(wordFrequencyController, this).get()
}
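
For readers who want to try the pattern in isolation, here is a self-contained sketch under the same assumptions as before: FrequencySupplier and supplyOnPool() are hypothetical names, and the Supplier computes its result directly rather than through messages.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.function.Supplier

// Hypothetical stand-in for the controller, now exposed as a Supplier.
class FrequencySupplier(private val words: List<String>) : Supplier<Map<String, Int>> {
    override fun get(): Map<String, Int> = words.groupingBy { it }.eachCount()
}

fun supplyOnPool(words: List<String>): Map<String, Int> {
    val executor = Executors.newFixedThreadPool(2)
    try {
        // runAsync() takes a Runnable and yields no value.
        val helper = CompletableFuture.runAsync(Runnable { /* side work would go here */ }, executor)
        // supplyAsync() takes a Supplier and completes with the value it returns.
        val result = CompletableFuture.supplyAsync(FrequencySupplier(words), executor)
        helper.join()
        return result.get()
    } finally {
        executor.shutdown()
    }
}
```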

Conclusion

The actor model is an important design based on messages and offering resiliency. In some technology stacks, it’s provided as a library, such as Akka for Scala. In Elixir, it seems to be a central part of the language API itself - I definitely need to dig deeper into it.

In other contexts, actors need to be implemented manually. In such cases, one needs to be aware of some of their important features. For example, one of the major benefits of the actor model is to let individual actors crash, and automatically spawn new ones, providing resiliency through self-healing. That was not considered necessary in the above code.
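
To give an idea of what restarting a crashed actor involves, here is a deliberately simplified restart-on-crash loop. It is only a sketch of the principle: supervise() and flakyDemo() are hypothetical names, and real actor libraries offer far richer supervision strategies than this.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Runs `task` on a fresh thread and starts a new one each time it crashes,
// up to `maxRestarts` restarts. Returns the number of restarts performed.
fun supervise(maxRestarts: Int, task: () -> Unit): Int {
    var restarts = 0
    while (true) {
        var failure: Throwable? = null
        val worker = Thread {
            try {
                task()
            } catch (e: Exception) {
                failure = e // remember the crash so the supervisor can react
            }
        }
        worker.start()
        worker.join() // after join(), `failure` is visible to this thread
        if (failure == null || restarts == maxRestarts) return restarts
        restarts++
    }
}

// Hypothetical task that crashes on its first two runs, then succeeds.
fun flakyDemo(): Pair<Int, Int> {
    val attempts = AtomicInteger()
    val restarts = supervise(maxRestarts = 5) {
        if (attempts.incrementAndGet() < 3) throw IllegalStateException("boom")
    }
    return restarts to attempts.get()
}
```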

When the language doesn’t provide actors out-of-the-box, there are several ways to implement them. Some ways are easier than others, depending on the tech stack: it’s beneficial to check the new concurrency APIs that each language version brings.

The complete source code for this post can be found on GitHub in Maven format.
Nicolas Fränkel

Developer Advocate with 15+ years of experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurance, large retail and public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Also doubles as a trainer and triples as a book author.