/ RX, TEST, THREAD

Creative Rx usage in testing setup

I don’t know if events became part of software engineering since Graphical-User Interface interactions, but for sure they are a very convenient way of modeling them. With more and more interconnected systems, asynchronous event management has become an important issue to tackle. With Functional-Programming also on the raise, this gave birth to libraries such as RxJava. However, modeling a problem as the handling of a stream of events shouldn’t be restricted to system events handling. It can also be used in testing in many different ways.

One common use-case for testing setup is to launch a program, for example an external dependency such as a mock server. In this case, we need to wait until the program has been successfully launched. On the contrary, the test should stop as soon as the external program launch fails. If the program has a Java API, that’s easy. However, this is rarely the case and the more basics API are generally used, such as ProcessBuilder or Runtime.getRuntime().exec():

ProcessBuilder builder;

@BeforeMethod
protected void setUp() throws IOException {
    builder = new ProcessBuilder().command("script.sh");
    process = builder.start();
}

@AfterMethod
protected void tearDown() {
    process.destroy();

}

The traditional way to handle this problem was to put a big Thread.sleep() just after the launch. Not only was it system dependent as the launch time changed from system to system, it didn’t tackle the case where the launch failed. In this later case, precious computing time as well as manual relaunch time were lost. Better solutions exist, but they involve a lot of lines of code as much at some (or a high) degree of complexity. Wouldn’t it be nice if we could have a simple and reliable way to start the program and depending on the output, either continue the setup or fail the test? Rx to the rescue!

The first step is to create an Observable around the launched process’s input stream:

Observable<String> observable = Observable.create(subscriber -> {

    InputStream stream = process.getInputStream();

    try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
        String line;
        while ((line = reader.readLine()) != null) {
            subscriber.onNext(line);
        }
        subscriber.onCompleted();

    } catch (Exception e) {
        subscriber.onError(e);
    }
});

In the above snippet:

  • Each time the process writes on the output, a next event is sent
  • When there is no more output, a complete event is sent
  • Finally, if an exception happens, it’s an error event

By Rx definition, any of the last two events mark the end of the sequence.

Once the observable has been created, it just needs to be observed for events. A simple script that emits a single event can be listened to with the following snippet:

BlockingObservable<String> blocking = observable.toBlocking();

blocking.first();

The thing of interest here is the wrapping of the Observable instance in a BlockingObservable. While the former can be combined together, the later adds methods to manage events. At this point, the first() method will listen to the first (and single) event.

For a more complex script that emits a random number of regular events terminated by a single end event, a code snippet could be:

BlockingObservable<String> blocking = observable
    .filter("Done"::equals)
    .toBlocking();

blocking.first();

In this case, whatever the number of regular events, the filter() method provides the way to listen to the only event we are interested in.

Previous cases do not reflect the reality, however. Most of the time, setup scripts should start before and run in parallel to the tests i.e. the end event is never sent - at least until after tests have finished. In this case, there are some threading involved. Rx let it handle that quite easily:

BlockingObservable<String> blocking = observable
    .subscribeOn(Schedulers.newThread())
    .take(5)
    .toBlocking();

blocking.first();

There is a simple difference there: the subscriber will listen on a new thread thanks to the subscribeOn() method. Alternatively, events could have been emitted on another thread with the observeOn() method. Note I replaced the filter() method with take() to pretend to be interested only in the first 5 events.

At this point, the test setup is finished. Happy testing!

Sources for this article can be found here in IDEA/Maven format.

Nicolas Fränkel

Nicolas Fränkel

Developer Advocate with 15+ years experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurances, large retail and public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Also double as a trainer and triples as a book author.

Read More
Creative Rx usage in testing setup
Share this