
Exercises in MapReduce Style

In the last episode of Exercises in Programming Style, we solved the word frequency problem with the Hazelcast library. This time, we are going to use the MapReduce approach for that.

This is the 19th post in the Exercises in Programming Style focus series. Other posts include:

  1. Introducing Exercises in Programming Style
  2. Exercises in Programming Style, stacking things up
  3. Exercises in Programming Style, Kwisatz Haderach-style
  4. Exercises in Programming Style, recursion
  5. Exercises in Programming Style with higher-order functions
  6. Composing Exercises in Programming Style
  7. Exercises in Programming Style, back to Object-Oriented Programming
  8. Exercises in Programming Style: maps are objects too
  9. Exercises in Programming Style: Event-Driven Programming
  10. Exercises in Programming Style and the Event Bus
  11. Reflecting over Exercises in Programming Style
  12. Exercises in Aspect-Oriented Programming Style
  13. Exercises in Programming Style: FP & I/O
  14. Exercises in Relational Database Style
  15. Exercises in Programming Style: spreadsheets
  16. Exercises in Concurrent Programming Style
  17. Exercises in Programming Style: sharing data among threads
  18. Exercises in Programming Style with Hazelcast
  19. Exercises in MapReduce Style (this post)
  20. Conclusion of Exercises in Programming Style

MapReduce in a few words

MapReduce is a process that consists of two steps:

  1. Map: Performs transformations, filtering and sorting into different "queues"
  2. Reduce: Aggregates the content of the "queues" into a result

The biggest benefit of MapReduce is that both the map and the reduce steps can potentially be executed in parallel. This makes it a great fit for handling large data sets.
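
To make the two steps concrete, here’s a minimal, single-threaded sketch that counts words by length. The function names (mapStep, shuffle, reduceStep) and the sample problem are made up for illustration; they come neither from the reference Python code nor from the Kotlin port.

```kotlin
// Map step: emit one (key, value) pair per item. Here, the key is the word length.
fun mapStep(words: List<String>): List<Pair<Int, String>> =
  words.map { it.length to it }

// Route every pair to the "queue" that matches its key.
fun shuffle(pairs: List<Pair<Int, String>>): Map<Int, List<String>> =
  pairs.groupBy({ it.first }, { it.second })

// Reduce step: aggregate each queue into a single value, here its size.
fun reduceStep(queues: Map<Int, List<String>>): Map<Int, Int> =
  queues.mapValues { (_, queue) -> queue.size }

// Chains the three steps: word length -> number of words of that length.
fun wordLengthHistogram(words: List<String>): Map<Int, Int> =
  reduceStep(shuffle(mapStep(words)))
```

Since each queue is reduced independently of the others, the reduce step is the kind of work that could be spread over several threads or machines.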

The following diagram helps visualize the overall flow:

MapReduce activity diagram

While MapReduce makes parallelism possible, implementing it is not mandatory: parallelism is only an option. It’s used neither in the original Python code nor in the Kotlin port.

Migrating to Kotlin

The reference code uses the Python yield keyword: with it, functions don’t return simple collections of items, but streams instead. While the code looks much the same, what happens under the hood is different: items are produced lazily, one at a time, instead of being materialized into intermediate collections. The benefit over standard collections grows with the number of items flowing through the stream. In Java, streams are implemented by Stream; in Kotlin, by Sequence.

To create a Sequence from a collection, Kotlin provides the Iterable<T>.asSequence() extension function. For a more in-depth understanding of collections and sequences in Kotlin, and how they relate to Java streams, please check this earlier post.
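
The difference between a collection and a sequence is easier to see on a small example. The sketch below looks for the first word longer than four characters and records every word visited along the way. The function names and the visited parameter are hypothetical helpers, introduced only for this illustration.

```kotlin
// Eager version: every step runs on the whole list before the next one starts.
fun firstLongWordEager(words: List<String>, visited: MutableList<String>): String? =
  words
    .onEach { visited += it }
    .filter { it.length > 4 }
    .firstOrNull()

// Lazy version: each word goes through the whole pipeline before the next
// word is read, and processing stops at the first match.
fun firstLongWordLazy(words: List<String>, visited: MutableList<String>): String? =
  words
    .asSequence()
    .onEach { visited += it }
    .filter { it.length > 4 }
    .firstOrNull()
```

With the input map, reduce, fold, sequence, both functions return reduce. The eager version visits all four words, while the lazy one stops after the second.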

Applied to the problem at hand, the processing pipeline looks like the following:

Data processing pipeline, from filename to hash map

This translates into the following code:

fun run(filename: String): Map<String, Int> = read(filename)
  .asSequence()
  .chunked(200)
  .map(::splitWords)
  .reduce { acc, pair -> countWords(acc, pair) }
  .sortedBy { it.second }
  .takeLast(25)
  .toMap()
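
To get a feel for the less common operations in this pipeline, the sketch below isolates chunked() and the sortedBy()/takeLast() pair on tiny in-memory data. The helper names chunks() and topEntries() are hypothetical, not part of the post’s code base.

```kotlin
// Splits the lines into consecutive groups of at most `size` items.
// The last group may be smaller than the others.
fun chunks(lines: List<String>, size: Int): List<List<String>> =
  lines.asSequence().chunked(size).toList()

// Sorts by ascending count, then keeps the last n pairs, i.e. the highest counts.
fun topEntries(frequencies: List<Pair<String, Int>>, n: Int): Map<String, Int> =
  frequencies
    .sortedBy { it.second }
    .takeLast(n)
    .toMap()
```

Note that because the sort is ascending, the entries of the resulting map are ordered from the lowest count to the highest.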

An overview of reduction

The map() function has been amply used in previous posts of this series. Besides, I believe most developers are already familiar with it, in one way or another.

The reduce() function, on the other hand, is much less understood, even if it’s used as much. For example, in Java, many of the streams' terminal operations are reduction functions: those include sum(), average(), max() but also collect()! From the collect() Javadoc:

Performs a mutable reduction operation on the elements of this stream using a Collector.

Collect functions are only specializations of the more general reduce() functions provided by Stream:

Stream class diagram focusing on reduce

As seen in the diagram, reduction is made available in three "flavors":

  1. The first flavor accepts a single BinaryOperator parameter. A BinaryOperator<T> accepts two parameters of type T, and combines them together to return a T. Note that because the starting stream might be empty, the method returns an Optional<T>.

    For example, the following snippet sums the items in the stream:

    Stream.of(1, 2, 3, 4, 5)
          .reduce { a, b -> a + b }
  2. The second flavor is similar to the first one, but requires a starting value. For this reason, the return type is not an Optional<T> but a T. If the stream is empty, the result will be the starting value. If it isn’t, it should be the same as with the previous flavor, provided the starting value is the neutral element for the reduction function.

    For example, the following snippet sums the items in the stream, starting from 0:

    Stream.of(1, 2, 3, 4, 5)
          .reduce(0) { a, b -> a + b }
  3. The third and last flavor additionally allows changing the returned type, at the cost of more complexity.
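
As an illustration of the third flavor, here’s a minimal sketch that reduces a stream of String to an Int, the total number of characters. The totalLength() function is a made-up example. The SAM constructors are spelled out only to make the role of each parameter explicit.

```kotlin
import java.util.function.BiFunction
import java.util.function.BinaryOperator
import java.util.stream.Stream

// Reduces a Stream<String> to an Int: the returned type differs from the item type.
fun totalLength(words: Stream<String>): Int = words.reduce(
  0,                                                                      // starting value
  BiFunction<Int, String, Int> { length, word -> length + word.length }, // accumulator: folds one item into the result
  BinaryOperator<Int> { left, right -> left + right }                     // combiner: merges two partial results
)
```

The combiner is the price to pay for the type change: when the stream is processed in parallel, partial results of the new type need to be merged together.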

In Kotlin, reduction is quite similar to what exists in Java. There’s a slight difference, though: the function that accepts a starting value is called fold(), while the one that doesn’t is called reduce(). In addition, each of them comes with a variant that also passes the index of the current item.

Signature                                               Starting value  Indexed
<S, T : S> reduce(op: (S, T) → S): S                    No              No
<S, T : S> reduceIndexed(op: (Int, S, T) → S): S        No              Yes
<T, R> fold(initial: R, op: (R, T) → R): R              Yes             No
<T, R> foldIndexed(initial: R, op: (Int, R, T) → R): R  Yes             Yes

When applied to an empty sequence, the functions above that take no starting value throw an exception at runtime. This is one of the few places where I believe the Kotlin API pales compared to Java’s. I’ve raised the point on Kotlin’s Slack: a new xxxOrNull() function might come in a future version of the stdlib.
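
The sketch below puts the Kotlin functions side by side on a sequence of integers. The three function names are hypothetical and only serve the illustration.

```kotlin
// No starting value: throws on an empty sequence.
fun sumWithReduce(numbers: Sequence<Int>): Int =
  numbers.reduce { acc, n -> acc + n }

// Starting value of 0: returns 0 on an empty sequence.
fun sumWithFold(numbers: Sequence<Int>): Int =
  numbers.fold(0) { acc, n -> acc + n }

// Indexed variant: each item is multiplied by its position before being added.
fun weightedSum(numbers: Sequence<Int>): Int =
  numbers.foldIndexed(0) { index, acc, n -> acc + index * n }
```

Calling sumWithReduce(emptySequence()) fails with an UnsupportedOperationException, whereas sumWithFold(emptySequence()) returns the starting value. For the record, more recent versions of the stdlib (1.4 and later, as far as I know) do ship a reduceOrNull() function.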

The implemented functions in Kotlin

The mapping function just pairs each word with a count of 1, from a list of lines:

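fun splitWords(lines: Iterable<String>): Iterable<Pair<String, Int>> {
  // Lower-case each line, split it on non-word characters and underscores,
  // and keep only words of at least two characters
  fun Iterable<String>.scan() = this
    .flatMap { it.toLowerCase().split("\\W|_".toRegex()) }
    .filter { it.isNotBlank() && it.length >= 2 }
  // Drop the words listed in the comma-separated stop words file
  fun Iterable<String>.removeStopWords(): Iterable<String> {
    val stopWords = read("stop_words.txt")
      .flatMap { it.split(",") }
    return this - stopWords
  }
  // Emit a (word, 1) pair for each remaining word
  return lines.scan()
    .removeStopWords()
    .map { it to 1 }
}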

I believe this is pretty straightforward. The reducing function combines two iterables of word frequencies together to create a new iterable.

fun countWords(frequencies1: Iterable<Pair<String, Int>>,
               frequencies2: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> {
  // Accumulates the merged counts, keyed by word
  val results = mutableMapOf<String, Int>()
  // merge() stores the count of a new word, or adds it to the existing count
  frequencies1.forEach {
    results.merge(it.first, it.second) {
      count, value -> count + value
    }
  }
  frequencies2.forEach {
    results.merge(it.first, it.second) {
      count, value -> count + value
    }
  }
  return results.toList()
}

It’s crude, but it works. Sadly, I found no more elegant way to achieve merging. Proposals welcome!
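
As one possible proposal, the merge could be expressed with the stdlib’s groupingBy() and fold() functions. This is only a sketch of an alternative, not the code from the repository, and the mergeFrequencies() name is made up to avoid confusion with the function above.

```kotlin
// Concatenates both iterables, groups the pairs by word,
// then sums the counts of each group starting from 0.
fun mergeFrequencies(frequencies1: Iterable<Pair<String, Int>>,
                     frequencies2: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> =
  (frequencies1 + frequencies2)
    .groupingBy { it.first }
    .fold(0) { count, pair -> count + pair.second }
    .toList()
```

The trade-off is an intermediate list holding both inputs, in exchange for removing the duplicated forEach blocks and the mutable map.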

A little hiccup along the way

That could be the end, but unfortunately, it’s not. Most tests succeed: for the smallest data set, where the frequency is one for each word, and for the largest one, Pride and Prejudice. Yet, the test fails for the following sample:

White tigers live mostly in India
Wild lions live mostly in Africa

The reason behind this is that the mapping function assigns a default frequency of 1, while the lines read are grouped into chunks of 200 lines (see above). Since the sample fits in a single chunk, there is only one item to reduce: in that case, reduce() returns this item as is, and the countWords() reducing function is never applied. Hence, pairs of word/1 are not merged together, and the resulting map has a frequency of 1 for each word.

In order to return the correct result, one should first check the sample size before sending it through the MapReduce process.
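
Another possible way out, offered here as a suggestion and not as what the code in the repository does, is to replace reduce() with fold() and an empty starting value: the reducing function then runs at least once per chunk, whatever the sample size. To keep the sketch self-contained, splitWords() and countWords() are simplified stand-ins for the functions above, the stop words are hard-coded, and the frequencies() function is a made-up name.

```kotlin
// Hard-coded stop words: an assumption of this sketch, the post reads them from a file.
val stopWords = setOf("in")

// Simplified stand-in for the mapping function, without file access.
fun splitWords(lines: Iterable<String>): Iterable<Pair<String, Int>> = lines
  .flatMap { it.lowercase().split("\\W|_".toRegex()) }
  .filter { it.length >= 2 && it !in stopWords }
  .map { it to 1 }

// Simplified stand-in for the reducing function.
fun countWords(frequencies1: Iterable<Pair<String, Int>>,
               frequencies2: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> {
  val results = mutableMapOf<String, Int>()
  (frequencies1 + frequencies2).forEach { (word, count) ->
    results.merge(word, count) { a, b -> a + b }
  }
  return results.toList()
}

fun frequencies(lines: List<String>): Map<String, Int> {
  // Empty accumulator: even a single chunk gets merged with it through countWords().
  val empty: Iterable<Pair<String, Int>> = emptyList()
  return lines
    .asSequence()
    .chunked(200)
    .map(::splitWords)
    .fold(empty) { acc, pairs -> countWords(acc, pairs) }
    .toMap()
}
```

With the two-line sample above, frequencies() counts live and mostly twice, and every other word once.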

Conclusion

MapReduce is nothing mind-blowing. Most developers already use it without knowing it. The only challenge is to code the correct mapping and reducing functions.

Also, one should be extra careful about the input size, and how it’s processed, because if it’s too small, the reducing function won’t be necessary and its code won’t be executed. In most scenarios, however, this shouldn’t be an issue as MapReduce is targeted at large data sets.

The complete source code for this post can be found on GitHub.
Nicolas Fränkel


Nicolas Fränkel is a Developer Advocate with 15+ years of experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurance, large retail and the public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Currently working for Hazelcast. Also doubles as a teacher in universities and higher education schools and as a trainer, and triples as a book author.
