
Exercises in Programming Style: sharing data among threads

Last week, we solved the word count problem using the Actor model: objects running on different threads and communicating through messages. This week, we will drop objects and use data structures shared among the threads: such a shared structure is called a data space in the book.

This is the 17th post in the Exercises in Programming Style focus series. Other posts include:

  1. Introducing Exercises in Programming Style
  2. Exercises in Programming Style, stacking things up
  3. Exercises in Programming Style, Kwisatz Haderach-style
  4. Exercises in Programming Style, recursion
  5. Exercises in Programming Style with higher-order functions
  6. Composing Exercises in Programming Style
  7. Exercises in Programming Style, back to Object-Oriented Programming
  8. Exercises in Programming Style: maps are objects too
  9. Exercises in Programming Style: Event-Driven Programming
  10. Exercises in Programming Style and the Event Bus
  11. Reflecting over Exercises in Programming Style
  12. Exercises in Aspect-Oriented Programming Style
  13. Exercises in Programming Style: FP & I/O
  14. Exercises in Relational Database Style
  15. Exercises in Programming Style: spreadsheets
  16. Exercises in Concurrent Programming Style
  17. Exercises in Programming Style: sharing data among threads (this post)
  18. Exercises in Programming Style with Hazelcast
  19. Exercises in MapReduce Style
  20. Conclusion of Exercises in Programming Style

Modelling the data space

The original Python code uses two dedicated data spaces:

  1. To store the words read from the source file,
  2. To store the word frequencies

The Python code uses queues to model the data spaces. Hence, it makes sense to do the same in Kotlin. However, the Java API offers lots of choices, as this abridged diagram shows:

Java Queue class diagram

Let’s describe the queue and the blocking queue.

Queue

In Java, the Queue interface has no definite semantics regarding the ordering of elements: it can be FIFO, LIFO, or something completely different, such as an order based on a priority attribute.

Queues offer two different ways to add, check, and remove elements: one throws an exception if the operation fails, the other returns a special value. For example, removing an element might fail if the queue is empty.

| Feature            | Exception          | Special value        |
|--------------------|--------------------|----------------------|
| Checks an element  | element(): E       | peek(): E            |
| Adds an element    | add(e: E): boolean | offer(e: E): boolean |
| Removes an element | remove(): E        | poll(): E            |
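To make the difference concrete, here is a minimal sketch using LinkedList, a non-thread-safe Queue implementation:

```kotlin
import java.util.LinkedList
import java.util.NoSuchElementException
import java.util.Queue

// LinkedList implements Queue with FIFO semantics
val queue: Queue<String> = LinkedList()

// Special-value flavor: poll() returns null on an empty queue
println(queue.poll())                  // null

// Exception flavor: remove() throws on an empty queue
try {
    queue.remove()
} catch (e: NoSuchElementException) {
    println("remove() throws when the queue is empty")
}

queue.offer("hello")
println(queue.element())               // hello: checks the head without removing it
```

Which flavor to prefer depends on whether an empty (or full) queue is an error or a normal condition in the calling code.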

BlockingQueue

A blocking queue is:

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
— JavaDoc
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

It adds two different ways to achieve the above operations: a blocking one and one with a timeout.

| Feature | Blocking  | Timeout                                             |
|---------|-----------|-----------------------------------------------------|
| Adds    | put(e: E) | offer(e: E, timeout: Long, unit: TimeUnit): boolean |
| Removes | take(): E | poll(timeout: Long, unit: TimeUnit): E              |
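A minimal sketch of the timeout flavor, assuming an ArrayBlockingQueue with a capacity of one:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

val queue = ArrayBlockingQueue<Int>(1)    // bounded: capacity of 1

queue.put(1)                              // blocks only if the queue is full

// Queue is full: offer() with a timeout gives up after 10 ms and returns false
val added = queue.offer(2, 10, TimeUnit.MILLISECONDS)
println(added)                            // false

val head = queue.take()                   // blocks only if the queue is empty
println(head)                             // 1

// Queue is now empty: poll() with a timeout returns null after 10 ms
val polled = queue.poll(10, TimeUnit.MILLISECONDS)
println(polled)                           // null
```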

There are two out-of-the-box implementations of BlockingQueue of interest:

  1. ArrayBlockingQueue, backed by a simple array
  2. LinkedBlockingQueue, using nodes linked together
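The main practical difference is bounding: ArrayBlockingQueue requires a fixed capacity at construction time, while LinkedBlockingQueue is optionally bounded and defaults to a capacity of Integer.MAX_VALUE. A quick sketch:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// ArrayBlockingQueue is always bounded: the capacity is fixed at construction time
val bounded = ArrayBlockingQueue<String>(2)
bounded.offer("a")
bounded.offer("b")
println(bounded.offer("c"))               // false: the queue is full

// LinkedBlockingQueue without an explicit capacity is effectively unbounded
val unbounded = LinkedBlockingQueue<String>()
println(unbounded.offer("c"))             // true
```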

Porting the Python code

The straightforward porting of the original Python code relies on the same design: mutable data structures and timeout.

The implementation of the main function goes like this:

fun run(filename: String): Map<String, Int> {
    val freqSpace = LinkedBlockingQueue<Map<String, Int>>()
    val wordSpace = read(filename)
        .flatMap { it.toLowerCase().split("\\W|_".toRegex()) }
        .filter { it.isNotBlank() && it.length >= 2 }
        .toBlockingQueue()
    val count = 4
    val executorService = Executors.newFixedThreadPool(count)
    val callables = IntRange(1, count).map { _ ->
        { processWords(wordSpace, freqSpace) }                   (1)
    }.map { Executors.callable(it) }                             (2)
    executorService.invokeAll(callables)                         (3)
    val frequencies = mutableMapOf<String, Int>()                (4)
    while (freqSpace.isNotEmpty()) {                             (5)
        val partial = freqSpace.poll(1, TimeUnit.SECONDS)        (5) (6)
        partial?.entries?.forEach {                              (5)
            frequencies.merge(it.key, it.value) {                (5)
              count, value -> count + value                      (5)
            }
        }
    }
    return frequencies
        .toList()
        .sortedByDescending { it.second }
        .take(25)
        .toMap()
}

fun <E> List<E>.toBlockingQueue() = LinkedBlockingDeque<E>(this)

  1. Wrap the processWords() function in a lambda
  2. Transform each Runnable-compatible lambda into a Callable
  3. Start running the threads, waiting for all of them to finish
  4. Mutable map to store the final results
  5. Empty the queue and collect results in the map
  6. Remove elements from the queue using the timeout flavor

fun processWords(words: BlockingQueue<String>,
                 frequencies: BlockingQueue<Map<String, Int>>) {
  val stopWords = read("stop_words.txt")
    .flatMap { it.split(",") }
  val wordFreq = mutableMapOf<String, Int>()
  while (words.isNotEmpty()) {                                   (1)
    val word = words.poll(1, TimeUnit.SECONDS)                   (1) (2)
    if (word != null && !stopWords.contains(word))               (1)
      wordFreq.merge(word, 1) {                                  (1)
        count, value -> count + value                            (1)
      }
  }
  frequencies.put(wordFreq)
}

  1. As above, empty the queue and collect the results in a map
  2. Likewise, remove elements using the timeout flavor

Introducing concurrent hash maps

The above code, just like the original Python solution, has an issue: it uses a queue to store maps of partial word frequencies. Those partial results then need to be combined to get the final word frequencies. Why not compute the count directly in each thread?

We need a thread-safe map: for this, the Java API offers ConcurrentHashMap, which adds or overrides methods to make them thread-safe.

Java Concurrent Map class diagram
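In particular, ConcurrentHashMap's merge() updates a key atomically, so several threads can increment counts without external synchronization. A minimal single-threaded sketch of the counting idiom:

```kotlin
import java.util.concurrent.ConcurrentHashMap

val frequencies = ConcurrentHashMap<String, Int>()

// merge() stores the value if the key is absent; otherwise it applies the
// remapping function to the existing and new values, atomically
listOf("live", "long", "live").forEach { word ->
    frequencies.merge(word, 1) { count, value -> count + value }
}

println(frequencies["live"])   // 2
println(frequencies["long"])   // 1
```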

With concurrent hash maps, the processWords() function can be updated to:

fun processWords(words: BlockingQueue<String>,
                 frequencies: ConcurrentMap<String, Int>) {
  val stopWords = read("stop_words.txt")
    .flatMap { it.split(",") }
  while (words.isNotEmpty()) {
    val word = words.poll(1, TimeUnit.SECONDS)
    if (word != null && !stopWords.contains(word))
      frequencies.merge(word, 1) {
        count, value -> count + value
      }
  }
}

Since frequencies are now combined in the previous function, the run() function can be simplified:

val executorService = Executors.newFixedThreadPool(count)
val callables = IntRange(1, count).map { _ ->
  { processWords(wordSpace, freqSpace) }
}.map { Executors.callable(it) }
executorService.invokeAll(callables)
return freqSpace
  .toList()
  .sortedByDescending { it.second }
  .take(25)
  .toMap()

The share-nothing benefit

Sharing data among threads requires synchronizing access to it. In turn, this requires locks and impacts performance. Concurrency experts know this: the best way to avoid concurrency issues is to avoid working on shared data when possible. In our case, the word space is shared among all threads for no good reason.

To avoid locks, we could split the words into lists, and send a list to each thread to be processed separately:

val count = 4
val callables = words.chunked(words.size / count)           (1)
  .map { Runnable { processWords(it, freqSpace) }}          (2)
  .map { Executors.callable(it) }                           (3)
val executorService = Executors.newFixedThreadPool(count)   (3)
executorService.invokeAll(callables)                        (3)

  1. Create chunks of nearly-equal size out of the overall word list
  2. Map each chunk to a Runnable. Explicitly assigning the lambda to a Runnable is not strictly necessary, but it improves readability
  3. The rest of the code is as before
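For reference, chunked() splits the receiver list into sublists of the given size, with only the last chunk possibly being smaller. Note that when the list size is not an exact multiple of the thread count, this yields more chunks than threads; invokeAll() simply queues the extra callables. A quick sketch:

```kotlin
// 7 words split into chunks of size 7 / 2 = 3
val words = listOf("a", "b", "c", "d", "e", "f", "g")
val chunks = words.chunked(words.size / 2)

println(chunks)   // [[a, b, c], [d, e, f], [g]]
```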

Conclusion

Concurrent code is hard to write, hard to reason about, and bugs are more likely to occur. Java provides dedicated thread-safe data structures to lessen the burden, if only a bit.

Finally, the best way to completely avoid those issues is to completely avoid sharing data. Be sure to make that your first option when possible.

The complete source code for this post can be found on GitHub.

Nicolas Fränkel

Nicolas Fränkel is a Developer Advocate with 15+ years of experience consulting for many different customers in a wide range of contexts (such as telecoms, banking, insurance, large retail, and the public sector). He usually works on Java/Java EE and Spring technologies, with focused interests like Rich Internet Applications, testing, CI/CD, and DevOps. He currently works for Hazelcast, doubles as a teacher in universities and higher-education schools and as a trainer, and triples as a book author.
