
Stream processing: sources and sinks

Last week, we had a look at how stream processing could help us compute mathematical approximations using Hazelcast Jet. We described the Pipeline API, and mentioned that it "read" from a source and "wrote" to a sink. This week, I’d like to detail those concepts, which out-of-the-box sinks and sources are available in Jet, and how to create your own should the need arise.

This is the 3rd post in the Stream Processing focus series. Other posts include:

  1. Introduction to stream processing
  2. Stream processing for computing approximations
  3. Stream processing: sources and sinks (this post)

Sources and sinks

I think that most of us are familiar with the batch model. In big companies, it’s been used for ages to read from a database, transform the data, and write to another database: this is known as ETL (Extract, Transform, Load).

However, databases are not the only place to read from and write to. Another legacy but widespread use-case is to offer an FTP server for other parties to store files on. A daemon process then watches whether files matching a specific pattern are present: if at least one is found, it reads and processes the data, then renames, moves or deletes the file.
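To make the file-watching scenario concrete, here is a minimal sketch of a single polling pass, using only the JDK. The function `pollOnce`, its parameters and the file names are hypothetical, chosen for illustration; a real daemon would call it in a loop and handle I/O errors.

```kotlin
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption

// Hypothetical helper: processes every file in `inbox` matching `glob`,
// then moves it to `done`. Returns the names of the files handled in this pass.
fun pollOnce(inbox: Path, done: Path, glob: String, process: (List<String>) -> Unit): List<String> {
    Files.createDirectories(done)
    val handled = mutableListOf<String>()
    Files.newDirectoryStream(inbox, glob).use { matches ->
        // Materialize the matches first, so that moving files does not disturb the iteration
        for (file in matches.sortedBy { it.fileName.toString() }) {
            process(Files.readAllLines(file))
            Files.move(file, done.resolve(file.fileName), StandardCopyOption.REPLACE_EXISTING)
            handled += file.fileName.toString()
        }
    }
    return handled
}

fun main() {
    val inbox = Files.createTempDirectory("inbox")
    val done = Files.createTempDirectory("done")
    Files.write(inbox.resolve("orders-1.csv"), listOf("a;1", "b;2"))
    Files.write(inbox.resolve("notes.txt"), listOf("does not match the pattern"))

    // A real daemon would repeat this call, sleeping between passes
    val handled = pollOnce(inbox, done, "*.csv") { lines -> println("Read ${lines.size} lines") }
    println(handled) // prints [orders-1.csv]
}
```

Moving the file once it has been processed is what prevents the next pass from reading it again.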

In Java EE environments, data can also be read from/written to JMS queues. In that regard, Hazelcast IMDG is also an option.

Source and sink are terms that are broad enough to encompass the diversity of components possible. Those terms apply to both the batching model and the streaming model.
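As a rough illustration of how general those two terms are, the sketch below models a source as anything that can be iterated over and a sink as anything that accepts an item. The `Source` and `Sink` type aliases and the `pipe` function are hypothetical names for this example only; they are not Jet’s API.

```kotlin
// Hypothetical types for illustration only; they are not Jet's API
typealias Source<T> = Sequence<T>
typealias Sink<T> = (T) -> Unit

// Reads every item from the source, transforms it, and writes it to the sink
fun <A, B> pipe(source: Source<A>, transform: (A) -> B, sink: Sink<B>) {
    for (item in source) sink(transform(item))
}

fun main() {
    val out = mutableListOf<String>()

    // Batch model: the source is bounded, so the loop ends by itself
    pipe(sequenceOf(1, 2, 3), { "item-$it" }, { out.add(it) })

    // Streaming model: the source is unbounded;
    // take() caps it here only so that the example terminates
    val ticks = generateSequence(0) { it + 1 }
    pipe(ticks.take(2), { "tick-$it" }, { out.add(it) })

    println(out) // prints [item-1, item-2, item-3, tick-0, tick-1]
}
```

The same `pipe` function serves both models: only the nature of the source changes.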

Jet sources and sinks

Out-of-the-box, Jet provides a set of sources and sinks. They are available respectively in the com.hazelcast.jet.pipeline.Sources and com.hazelcast.jet.pipeline.Sinks classes.

Some of them are related to Hazelcast IMDG, most are not:

                           Source   Sink
JMS
  Queue                    yes      yes
  Topic                    yes      yes
SQL Database
  Query                    yes      no
  Statement                no       yes
Hazelcast
  Local/remote Cache       yes      yes
  Local/remote IList       yes      yes
  Local/remote IMap        yes      yes
Folder                     yes      yes
Socket                     yes      yes
Standard out               no       yes

Custom sources

One is not limited to the sources and sinks provided out-of-the-box. If the desired source/sink is missing, it’s possible to create one’s own using the provided abstractions.

Here’s the model relevant to designing a custom source:

[Figure: Jet’s custom source builder class diagram]

batch() is for a bounded collection of items, stream() for an unbounded one.

A sample custom source

The web has a lot of data, mostly available through web services. A scenario where one repeatedly sends requests to the same endpoint to get data is very frequent. Let’s create such a custom Source, using Fuel, a Kotlin library for REST calls.

REST call

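import com.github.kittinunf.fuel.Fuel
import com.github.kittinunf.result.Result
import com.hazelcast.jet.core.Processor
import com.hazelcast.jet.function.BiConsumerEx   // Jet 3.x package; it may differ in other versions
import com.hazelcast.jet.function.FunctionEx     // Jet 3.x package; it may differ in other versions
import com.hazelcast.jet.pipeline.SourceBuilder
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer

fun remoteService(url: String) = SourceBuilder
  .stream("http-source", DummyContext())                                 // (1)
  .fillBufferFn(RestSource(url))
  .build()

class RestSource(private val url: String) : BiConsumerEx<Unit, SourceBuffer<String>> {
  override fun acceptEx(unit: Unit, buffer: SourceBuffer<String>) {      // (2)
    val (_, _, result) = Fuel.get(url).header().responseString()         // (3)
    when (result) {
      is Result.Failure -> println(result.getException())
      is Result.Success -> buffer.add(result.get())                      // (4)
    }
  }
}

class DummyContext : FunctionEx<Processor.Context, Unit> {               // (1)
  override fun applyEx(ctx: Processor.Context) {}
}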
1 Create a stream using a context. The context can be mutable, so as to store state between iterations. Here, we don’t need to pass anything, so we use a dummy context
2 The function accepts two parameters: the first is the context, the second is a SourceBuffer to append to. Jet reads the data from this buffer
3 Execute the request and assign the result to multiple variables - this is Kotlin’s destructuring declaration in action
4 If the call executed successfully, add the result to the buffer

Using a delay

A lot of publicly available web services enforce some kind of access restriction: it can relate to authentication, rate limiting, etc.

Imagine we need to restrict calls to comply with a rate limiting policy: to avoid getting 429 HTTP status responses, we should wait until a certain time has elapsed before executing the next request. This is where an object storing the state becomes necessary. Because the Java 8 Date/Time API only provides immutable objects, let’s design a mutable object around it:

import java.io.Serializable
import java.time.Instant

class TimeHolder : Serializable {
  var value : Instant = Instant.now()                    // (1)
    private set                                          // (2)
  fun reset() {
    value = Instant.now()                                // (3)
  }
}
1 The initial instant is set to now
2 Enforce encapsulation so that only the class itself can set value
3 Reset value to now

The context creation function needs to be updated accordingly:

class CreateContext : FunctionEx<Processor.Context, TimeHolder> {
  override fun applyEx(ctx: Processor.Context) = TimeHolder()
}

At this point, the wait logic can be used in the RestSource:

class RestSource(private val url: String) :
    BiConsumerEx<TimeHolder, SourceBuilder.SourceBuffer<String>> {
  override fun acceptEx(time: TimeHolder, buffer: SourceBuilder.SourceBuffer<String>) {
    if (Instant.now().isAfter(time.value.plusSeconds(30))) {          // (1)
      val (_, _, result) = Fuel.get(url).header().responseString()
      when (result) {
        is Result.Failure -> println(result.getException())
        is Result.Success -> buffer.add(result.get())
      }
      time.reset()                                                    // (2)
    }
  }
}
1 Guard the call: the request is executed only if more than 30 seconds have elapsed since the previous one
2 Store the current time to be available for the next check
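Because the guard depends on the wall clock, it is awkward to test as written. One possible variation, sketched below with JDK classes only, injects a java.time.Clock so that time can be advanced by hand. `RateGate` and `ManualClock` are hypothetical names and are not part of Jet; unlike TimeHolder, they are not Serializable, which this sketch ignores.

```kotlin
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import java.time.ZoneOffset

// Hypothetical helper, not part of Jet: the same guard logic, with the clock injected
class RateGate(private val minDelay: Duration, private val clock: Clock = Clock.systemUTC()) {
    // As with TimeHolder, the starting point is the creation time,
    // so the very first call also has to wait for minDelay
    private var last: Instant = clock.instant()

    // Runs `action` and returns true only if more than `minDelay` has elapsed since the last run
    fun tryRun(action: () -> Unit): Boolean {
        val now = clock.instant()
        if (!now.isAfter(last.plus(minDelay))) return false
        action()
        last = now
        return true
    }
}

// Clock for tests: time only moves when advance() is called
class ManualClock(private var now: Instant) : Clock() {
    override fun instant(): Instant = now
    override fun getZone(): ZoneId = ZoneOffset.UTC
    override fun withZone(zone: ZoneId): Clock = this
    fun advance(duration: Duration) { now = now.plus(duration) }
}

fun main() {
    val clock = ManualClock(Instant.parse("2020-01-01T00:00:00Z"))
    val gate = RateGate(Duration.ofSeconds(30), clock)
    var calls = 0

    println(gate.tryRun { calls++ })   // false: no time has elapsed yet
    clock.advance(Duration.ofSeconds(31))
    println(gate.tryRun { calls++ })   // true: more than 30 seconds have elapsed
    println(gate.tryRun { calls++ })   // false: the delay starts over
    println(calls)                     // 1
}
```

In a Jet source, the action passed to `tryRun` would be the REST call followed by `buffer.add()`.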

Conclusion

Hazelcast Jet provides sources and sinks out-of-the-box for the most common use-cases. In addition, it offers a low-level API to create one’s own in case none fits one’s needs.

Nicolas Fränkel

