Last week, we had a look at what is stream processing. In this post, I’d like to address one possible use-case for stream processing: the computation of mathematical approximations.

This is the 2^{nd} post in the Stream Processing focus series.Other posts include:

- Introduction to stream processing
*Stream processing for computing approximations*(this post)- Stream processing: sources and sinks

## Approximations with a computer

In mathematics, some computations don’t return an exact result: they constantly compute over the previous computed result to get closer to the final value. Series belong to this category. For example, the alternating harmonic series is computed as the following:

S(n) = 1 - 1/2 + 1/3 - 1/4 + 1/5 - 1/6 ...

When `n`

is big enough, `S`

gets closer to `ln 2`

. One can prove it, but that means one already knows the model the data conforms to.

With a computer and basic programming skills, it’s trivial to code this computation:

```
var s = 0.0
(1..200)
.forEach {
if (it % 2 == 0) s -= 1 / it.toDouble()
else s += 1 / it.toDouble()
}
println(s)
```

Or if one prefers an immutable version:

```
fun main() {
(1..200)
.map {
if (it % 2 == 0) - 1 / it.toDouble()
else 1 / it.toDouble()
}
.sum()
.also { println(it) }
}
```

This works well, but there’s an issue. If the precision is not accurate enough for one’s need, one need to change the upper bound from 200 to a higher value and execute the code again. On the other side, one might waste precious computing power in unnecessary computations.

## The streaming model to the rescue?

If would be great if the code could run until the desired precision is reached.

That’s what the following code does:

```
var s = 0.0
var i = 1
while(true) {
if (i % 2 == 0) s -= 1 / i.toDouble()
else s += 1 / i.toDouble()
i++
println(s)
}
```

If the mutable version didn’t suit you before, then I admit this snippet is even worse.

Let’s walk the immutable way. It’s possible to generate an infinite sequence of numbers to simulate an infinite loop:

```
generateSequence(1 to 0.0) {
(it.first + 1) to
if (it.first % 2 == 0) (it.second - 1 / it.first.toDouble())
else it.second + 1 / it.first.toDouble()
}.map {
println(it.second)
it.second
}.sum()
```

Obviously, the `map()`

function is used in a non-standard way: not only to peek the current value, but also to print it, which doesn’t conform to basic FP principles.

Some capabilities are missing:

- How to compute the intermediate sum at each step?
- How to peek the computed sum in order to decide when to stop?
- How to map-reduce algorithms that are compatible with such processing?

## Hazelcast Jet

Hazelcast Jet is a stream processing engine, based on Hazelcast In-Memory Data Grid. It provides a distributed in-memory model, as well as an improved API.

### In a few words

Jet **draws** items from a source, and **drains** them to a sink. Between the two, a number of standard operations can happen: transforming, filtering, anything one can do with Java Streams, and much more.

Jet offers two base models: a **batch** model for the processing of a bounded number of items, and a **stream** model for the processing of an infinite number of them. Going from one model to the other is quite straightforward.

This is a simplified class diagram of Jet:

This is a sample of Jet’s code:

```
fun main(args: Array<String>) {
with(Pipeline.create()) {
```**(1)**
drawFrom(Sources.files("/opt/data/hazelcast")) **(2)**
.drainTo(Sinks.logger()) **(3)**
Jet.newJetInstance().newJob(this) **(4)**
}
}

1 | Create the pipeline |

2 | Configure it to read all files from a folder |

3 | Configure it to write to the console |

4 | Launch a new job with the pipeline |

### A sample problem

To see how Jet works, let’s model the Buffon’s needle problem:

Suppose we have a floor made of parallel strips of wood, each the same width, and we drop a needle onto the floor. What is the probability that the needle will lie across a line between two strips?

The solution, in the case where the needle length is not greater than the width of the strips, can be used to design a Monte Carlo method for approximating the number π, although that was not the original motivation for de Buffon’s question.

While the board has two dimensions, only one is interesting to us. Besides, it’s enough to just have one lower and one upper strip.

`class Board(val size: Int)`

The needle should have a size as well:

`class Needle(private val size: Int)`

Now, we drop the needle on the board, and check whether is intersects a strip:

```
class Needle(private val size: Int) {
fun drop(board: Board): Boolean {
val angle = Math.toRadians(Random.nextDouble(90.0))
```**(1)**
val position = Random.nextDouble(board.size.toDouble()) **(2)**
val normalizedSize = (size / 2.toDouble()) * sin(angle) **(3)**
val bottom = position - normalizedSize
val top = position + normalizedSize
return bottom <= 0 || top >= board.size **(4)**
}
}

1 | Randomly choose the angle of the dropped needle between 0° and 90° |

2 | Randomly choose the position of the dropped needle between 0 and the size of the board |

3 | Normalize the length of the needle using basic trigonometric |

4 | Checks whether one extremity of the needle intersects the lower bound - 0, or the upper bound - the size of the board |

Now is time to create the pipeline:

```
with(Pipeline.create()) {
val board = Board(args[0].toInt())
```**(1)**
drawFrom(needles(board)) **(2)**
.withoutTimestamps() **(3)**
.map { it.drop(board) } **(4)**
.rollingAggregate(pi()) **(5)**
.drainTo(Sinks.logger())
Jet.newJetInstance().newJob(this)
}

1 | Create the board |

2 | Use a custom source builder to generate an infinite stream of needles |

3 | Timestamps are not necessary, but it should be explicit |

4 | Transform each needle into a drop i.e. boolean values |

5 | See below |

The `rollingAggregate()`

requires an aggregate operation. The `reducing()`

functions returns such an object. It requires the following:

- An accumulator
- A function that puts a streamed item in the accumulator
- Another function that merges two accumulators together

`open class Aggregator( `**(1)**
private val drops: Long,
private val intersections: Long) : Serializable {
operator fun plus(aggregator: Aggregator) = **(2)**
Aggregator(drops + aggregator.drops,
intersections + aggregator.intersections)
private val pi: BigDecimal
get() = if (intersections == 0L) BigDecimal.ZERO **(3)**
else BigDecimal(drops).divide(BigDecimal(intersections), context)
override fun toString() = "Aggregator(pi= $pi, drops= $drops)"
}
object StartAggregator : Aggregator(0L, 0L) **(4)**
object IntersectAggregator : Aggregator(1L, 1L) **(5)**
object NonIntersectAggregator : Aggregator(1L, 0L) **(6)**

1 | Accumulator that keeps track of the number of drops and intersections |

2 | Combine two accumulators together by summing the their drops and intersections |

3 | The more the drops, the closer to the value of π |

4 | Empty accumulator, no drops and no intersections |

5 | Accumulator with one drop and one intersection |

6 | Accumulator with one drop but no intersection |

## Conclusion

Computing an approximated result is a great use-case for stream processing. In this post, we had a look at Hazelcast Jet. We also used its API to model the Buffon’s needle problem. In the next post, we will describe in details some common sources and sinks, as well as how to create custom ones.