
Exercises in Programming Style with Hazelcast

Last week, we shared data among threads to solve the now well-known word frequencies problem. The very next day, I joined Hazelcast as a Developer Advocate. Therefore, I thought it would be extremely interesting to use the Hazelcast In-Memory Data Grid to improve on the previous solution.

This is the 18th post in the Exercises in Programming Style focus series. Other posts include:

  1. Introducing Exercises in Programming Style
  2. Exercises in Programming Style, stacking things up
  3. Exercises in Programming Style, Kwisatz Haderach-style
  4. Exercises in Programming Style, recursion
  5. Exercises in Programming Style with higher-order functions
  6. Composing Exercises in Programming Style
  7. Exercises in Programming Style, back to Object-Oriented Programming
  8. Exercises in Programming Style: maps are objects too
  9. Exercises in Programming Style: Event-Driven Programming
  10. Exercises in Programming Style and the Event Bus
  11. Reflecting over Exercises in Programming Style
  12. Exercises in Aspect-Oriented Programming Style
  13. Exercises in Programming Style: FP & I/O
  14. Exercises in Relational Database Style
  15. Exercises in Programming Style: spreadsheets
  16. Exercises in Concurrent Programming Style
  17. Exercises in Programming Style: sharing data among threads
  18. Exercises in Programming Style with Hazelcast (this post)
  19. Exercises in MapReduce Style
  20. Conclusion of Exercises in Programming Style

What’s an In-Memory Data Grid?

According to Wikipedia, a Data Grid is:

[…] an architecture or set of services that gives individuals or groups of users the ability to access, modify and transfer extremely large amounts of geographically distributed data for research purposes.

Let’s drop the research purposes. The important bits of the above definition are that a Data Grid contains large amounts of data, and that this data is distributed among nodes.

The main issue with such architectures is that they require the following steps:

  1. Read data from disk
  2. Load it into memory (RAM)
  3. Process it there
  4. Finally, write it back to disk

While the processing part is quite fast because it happens in memory, the reading and writing parts are not. Hence, a data grid’s overall performance is bounded by the time it takes to read from and write to storage. To better understand the issue, here are the commonly accepted figures at the time of this writing:

Storage                      Access time   Read/write speed
Random Access Memory (RAM)   ~5 ns         > 10 GB/s
Solid-State Drive (SSD)      < 1 ms        200 MB/s to 3.5+ GB/s
Hard Disk Drive (HDD)        3-10 ms       < 200 MB/s

If the disk parts could be removed, the data grid would be as fast as the memory it runs in. That’s the whole point of an IMDG: trading off persistence for high speed and low latency.
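To put those throughput figures in perspective, here is a back-of-the-envelope calculation of how long it takes to read 1 GB at a few of the speeds listed in the table. It’s a minimal Kotlin sketch: transferSeconds() is a hypothetical helper, and it only accounts for sustained throughput, ignoring access time.

```kotlin
// Hypothetical helper: seconds needed to move `bytes` at a sustained `bytesPerSecond`
fun transferSeconds(bytes: Long, bytesPerSecond: Long): Double =
    bytes.toDouble() / bytesPerSecond

fun main() {
    val oneGigabyte = 1_000_000_000L
    // Bounds from the table: RAM is faster than 10 GB/s, an HDD slower than 200 MB/s,
    // and an SSD somewhere between 200 MB/s and 3.5 GB/s
    val speeds = linkedMapOf(
        "RAM, 10 GB/s" to 10_000_000_000L,
        "SSD, 3.5 GB/s" to 3_500_000_000L,
        "SSD/HDD, 200 MB/s" to 200_000_000L
    )
    for ((label, speed) in speeds) {
        println("%-18s %.2f s".format(label, transferSeconds(oneGigabyte, speed)))
    }
}
```

With these assumptions, reading 1 GB takes about 0.1 s from RAM versus 5 s from a disk sustaining 200 MB/s, a fifty-fold gap before any processing even starts.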

Setting up Hazelcast IMDG

Hazelcast is able to run in two different modes:

  1. Client-server
  2. Embedded

The embedded mode is a breeze to set up. Just adding the Hazelcast core JAR to the application’s classpath makes it possible to start an embedded instance that automatically discovers and joins other Hazelcast members on the network. Note that while multicast discovery is the default, it’s possible to be more selective by configuring which clusters can be joined.

With Maven, starting Hazelcast embedded is just a matter of adding a single dependency to the POM:

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project>
  <!-- ... -->
  <dependencies>
    <!-- Other dependencies -->
    <dependency>
      <groupId>com.hazelcast</groupId>
      <artifactId>hazelcast</artifactId>
      <version>3.12.2</version>
    </dependency>
  </dependencies>
</project>

At that point, it’s possible to create the Hazelcast instance and the related data structures.

Distributed data structures and IMap

Hazelcast is all about distributed data structures: lists, sets, queues, maps, etc.

Among them, the IMap interface is of particular interest. In essence, one can think of IMap as a distributed version of Java’s ConcurrentMap. In fact, IMap inherits from ConcurrentMap.

IMap class diagram
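Since IMap extends ConcurrentMap, code written against the ConcurrentMap interface doesn’t need to know which implementation it gets. The following sketch uses a hypothetical countWords() function and a local ConcurrentHashMap; because an IMap is a ConcurrentMap too, one obtained from a Hazelcast instance could be passed in its place.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

// Hypothetical helper: increments the count of each word in any ConcurrentMap
fun countWords(frequencies: ConcurrentMap<String, Int>, words: List<String>) {
    words.forEach { word ->
        // merge() stores 1 for a new key, or adds 1 to the existing count
        frequencies.merge(word, 1) { count, increment -> count + increment }
    }
}

fun main() {
    val frequencies = ConcurrentHashMap<String, Int>()
    countWords(frequencies, listOf("white", "whale", "white"))
    println(frequencies["white"]) // 2
}
```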

In addition to the standard concurrent map’s features, IMap offers the following:

  • Transformation of all or only specific entries
  • A rich event-listener model around all standard operations
  • Statistics about the map
  • A locking feature on specific entries
  • And much more…

Hazelcast’s API is designed so that one never instantiates distributed data structures such as IMap directly. Instead, one gets a handle on them through a Hazelcast instance. Here’s a simplified summary of the API:

Hazelcast instance class diagram

With that, it’s straightforward to create the Hazelcast instance and get handles on the map:

import com.hazelcast.core.Hazelcast

val hazelcastInstance = Hazelcast.newHazelcastInstance()        // (1)
val freqSpace = hazelcastInstance.getMap<String, Int>("map")    // (2)
(1) Create a new instance
(2) Get the handle on a distributed map

Migrating the code to use Hazelcast

There’s only one other thing that needs to be updated to migrate to Hazelcast: the executor service must come from the Hazelcast instance instead of the standard Java API. This translates into the following code:

val executorService = hazelcastInstance.getExecutorService("executorService")
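The swap is painless because Hazelcast’s IExecutorService extends java.util.concurrent.ExecutorService. As a rough illustration, the hypothetical runAll() function below only depends on the ExecutorService interface. Here it runs on a local thread pool; the Hazelcast executor could be passed instead, provided the tasks are serializable.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical helper: submits all tasks, blocks until they complete, then sums their results
fun runAll(executor: ExecutorService, tasks: List<Callable<Int>>): Int =
    executor.invokeAll(tasks).map { it.get() }.sum()

fun main() {
    val executor = Executors.newFixedThreadPool(4)
    try {
        // Each task returns the number of words in its chunk
        val tasks = listOf(listOf("a", "b"), listOf("c")).map { chunk -> Callable { chunk.size } }
        println(runAll(executor, tasks)) // 3
    } finally {
        executor.shutdown()
    }
}
```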

There’s still one not-so-minor caveat. Since members might run in different JVMs (and that’s the whole point of distributed systems), objects sent across the cluster, whether stored in the map or submitted as tasks, need to be serializable: in Java, the simplest way is to implement the Serializable interface. Given the current state of the code, that’s not the case: neither the top-level Kotlin function nor the Callable wrapper around it is serializable. Hence, a full-fledged class implementing the necessary interfaces is required:

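import com.hazelcast.core.HazelcastInstance
import com.hazelcast.core.HazelcastInstanceAware
import java.io.Serializable
import java.util.concurrent.Callable

class ProcessWords(val words: List<String>) :
                         Callable<Unit>, Serializable, HazelcastInstanceAware { // (1) (2)

  // Injected on the member that runs the task; @Transient keeps it out of the serialized form
  @Transient
  private lateinit var hazelcastInstance: HazelcastInstance

  override fun setHazelcastInstance(hazelcastInstance: HazelcastInstance) {     // (1)
    this.hazelcastInstance = hazelcastInstance
  }

  override fun call() {                                                         // (2)
    val frequencies = hazelcastInstance.getMap<String, Int>("map")              // (3)
    val stopWords = read("stop_words.txt").flatMap { it.split(",") }
    words.forEach {
      if (!stopWords.contains(it))
        frequencies.merge(it, 1) { count, value ->
            count + value
        }
    }
  }
}

// At most 4 chunks: ceiling division avoids a 5th chunk, and maxOf() a chunk size of 0
val callables = words.chunked(maxOf(1, (words.size + 3) / 4)).map { ProcessWords(it) } // (2)
executorService.invokeAll(callables)                                            // (4)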
(1) HazelcastInstanceAware allows injecting… a Hazelcast instance. The IExecutorService injects it when executing the task.
(2) The previous version used a Runnable. As explained above, it cannot simply be wrapped in a Callable using the standard Executors class, since the resulting wrapper isn’t serializable. The Runnable.run() function needs to be changed to Callable.call(), but since there’s no need to return a result, the type argument can safely be Unit.
(3) Instead of using a shared ConcurrentHashMap, get the distributed map from the injected Hazelcast instance.
(4) There’s no need to change anything! As in the previous version, invokeAll() blocks until all tasks have completed.
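To get a feel for the serializability requirement, the sketch below round-trips a task through plain Java serialization, roughly what happens when a task travels to another JVM. CountWords and roundTrip() are hypothetical names; the task is deliberately simpler than ProcessWords and has no Hazelcast dependency. Hazelcast also provides its own serialization mechanisms, which this sketch doesn’t cover.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable
import java.util.concurrent.Callable

// Hypothetical task: its only state is a List<String>, and lists built by listOf() are serializable on the JVM
class CountWords(private val words: List<String>) : Callable<Int>, Serializable {
    override fun call(): Int = words.size
}

// Hypothetical helper: serializes an object to bytes, then deserializes a copy from those bytes
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream().use { buffer ->
        ObjectOutputStream(buffer).use { it.writeObject(value) }
        buffer.toByteArray()
    }
    return ObjectInputStream(ByteArrayInputStream(bytes)).use { it.readObject() as T }
}

fun main() {
    val copy = roundTrip(CountWords(listOf("call", "me", "ishmael")))
    println(copy.call()) // 3
}
```

Adding a non-null field whose type isn’t serializable would make writeObject() throw a NotSerializableException, which is why such dependencies are better injected on the executing side.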

At this point, the code can scale by running on multiple JVMs. Granted, the word input still needs to be handled properly; that would just require using an IList.

Forewarned is forearmed

As with other posts in this series, the above code is just an exercise that aims to showcase how easy it is to:

  1. Use Hazelcast embedded in one’s application
  2. Migrate from the standard java.util.concurrent API to the Hazelcast API

This is by no means a showcase of the performance one can gain by using Hazelcast, especially given the limited size of the test data sets. To begin with, if you run the tests, you’ll notice that Hazelcast takes some time to initialize when the JVM starts. Then, the merge() implementation is not Hazelcast-specific: IMap inherits the default one from ConcurrentMap, which isn’t distributed-friendly.
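To see why merge() isn’t distributed-friendly, here is a simplified Kotlin transcription of the default merge() that the JDK’s ConcurrentMap interface provides (the branch that removes an entry when the remapping function returns null is left out). It’s an optimistic loop of get(), putIfAbsent() and replace() calls: on a distributed map, each of those calls may be a network round-trip, and the loop starts over whenever another member changed the entry in between. The function name mergeLikeConcurrentMap is hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

// Simplified transcription of the JDK's default ConcurrentMap.merge():
// an optimistic retry loop built from get(), putIfAbsent() and replace()
fun <K : Any> mergeLikeConcurrentMap(
    map: ConcurrentMap<K, Int>,
    key: K,
    value: Int,
    remap: (Int, Int) -> Int
): Int {
    var oldValue: Int? = map[key]                 // read the current value
    while (true) {
        if (oldValue == null) {
            // No mapping yet: try to insert; a null result means the insert succeeded
            oldValue = map.putIfAbsent(key, value) ?: return value
        } else {
            val newValue = remap(oldValue, value)
            // Compare-and-set: only succeeds if nobody changed the entry meanwhile
            if (map.replace(key, oldValue, newValue)) return newValue
            oldValue = map[key]                   // lost the race: re-read and retry
        }
    }
}

fun main() {
    val counts = ConcurrentHashMap<String, Int>()
    repeat(3) { mergeLikeConcurrentMap(counts, "whale", 1) { a, b -> a + b } }
    println(counts["whale"]) // 3
}
```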

Perhaps such an optimization will be the subject of a future post.

Conclusion

Once a program correctly handles multi-threading, it’s very straightforward to migrate to Hazelcast. Its API maps exactly to the java.util.concurrent one!

That the abstraction is the same doesn’t mean the implementation is: one needs to understand the constraints that come with using a distributed data structure.

The complete source code for this post can be found on GitHub.
Nicolas Fränkel

Nicolas Fränkel is a Developer Advocate with 15+ years of experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurance, large retail and the public sector). He usually works on Java/Java EE and Spring technologies, with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. He currently works for Hazelcast. He also doubles as a teacher in universities and higher education schools and as a trainer, and triples as a book author.
