envelopmenuskypeburger-menulink-externalfacebooktwitterlinkedin2crossgithub-minilinkedin-minitwitter-miniarrow_rightarrow_leftphonegithubphone-receiverstack-overflow

Reactive Streams in Scala: Akka Streams vs Monix - part 3

This is the third and final part of the Reactive Streams in Scala: Akka Streams vs Monix series. Previous installments can be located under the following:

So far we've learned what a typical processing pipeline looks like, and how to implement its building blocks. Now it's high time we put everything together and run the import process.

Combining multiple building blocks

Both Akka Streams and Monix provide pretty straightforward ways to chain several small building blocks together into a composite one.

Akka Streams

In Akka Streams, this can be achieved by using the via method on a Flow:

val processSingleFile: Flow[File, ValidReading, NotUsed] =
  Flow[File].via(parseFile).via(computeAverage)

Monix

In Monix, there's the transform method in Observable:

val processSingleFile: Transformer[File, ValidReading] = 
  _.transform(parseFile).transform(computeAverage)

Note that, in both cases, the chaining APis are type-safe, i.e. you won't be able to connect a block with an output of type A to a block with an input type of B.

Executing the pipeline

To process our data using our pipeline, we need to create a producer of files. Then we will be able to connect the producer to the processSingleFile, which would then be connected to the storeReadings consumer.

There's one more thing we'd like to do - balance the file processing across a number of workers determined by the concurrentFiles parameter.

Akka Streams

To implement load balancing in Akka Streams, we would need to go a bit deeper and use the lower-level GraphDSL API to build a custom load-balancing building block:

val balancer = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val balance = builder.add(Balance[File](concurrentFiles)) // 1
  val merge = builder.add(Merge[ValidReading](concurrentFiles)) // 2

  (1 to concurrentFiles).foreach { _ =>
    balance ~> processSingleFile ~> merge // 3
  }

  FlowShape(balance.in, merge.out)
}

We can notice two built-in stages here:

  • Balance - which has a single input and concurrentFiles outputs,
  • Merge - which has concurrentFiles inputs and a single output.

It takes the following steps to create the balancer:

  1. Add a Balance stage to split the work across a number of workers.
  2. Add a Merge stage to collect the results from the workers.
  3. Connect each output of the Balance stage to the worker, the connect the worker output to on of the inputs of Merge; the connections are made using the ~> operator from the GraphDSL.

Note that, again, the the connections are type-safe. Moreover, when the graph is materialized, the entire structure is checked for correctness, so that you know that you have e.g. an unconnected input/output - which results in a runtime error.

Having the load balancer ready, we can now put the pipeline together:

Source(importDirectory.listFiles.toList)
  .via(balancer)
  .runWith(storeReadings)

The return type of runWith is the same as that of the Sink we're using, so here it's going to be a Future[Done] - which lets you handle the completion/failure of the processing as you usually do with Futures.

Monix

In Monix, there's nothing similar to Akka Streams' GraphDSL - we can just manipulate the Observables. Fortunately, this is still enough to implement load balancing:

Observable
  .fromIterable(files)
  .bufferTumbling(concurrentFiles) // 1
  .flatMap { fs =>
    Observable.merge(fs.map(f => Observable.now(f).transform(processSingleFile)): _*) // 2, 3
  }
  .consumeWith(storeReadings)
  .runAsync

The important steps here are:

  1. Grouping the files into batches of size concurrentFiles.
  2. Mapping every batch of files into a batch of Observables (the fs.map(...) part) that do the actual processing of each file.
  3. Merging the Observables into a single one (with Observable.merge), which is then attached to the consumer.

Note that we need to call two methods to actually run the processing, since consumeWith returns a Task, which is lazy by nature and needs to be run explicitly with runAsync. The latter returns a CancellableFuture - a Monix extension of the Scala's Future.

Summary

In this last part of the Akka Streams vs Monix series you have learned how to put all the processing stages together, and how to combine them with producers and consumers of data to actually run the pipeline.

Overall, what you have seen in this series is that although both Akka Streams and Monix are both an implementation of the Reactive Streams concept, they were built with somewhat different approaches in mind.

Akka Streams has a strong focus on modularity, therefore the API encourages to design self-contained, reusable building blocks in the form of Flows. You also need to explicitly provide a way to run the entire processing graph by providing a Materializer.

In Monix, on the other hand, there's no notion of and independent building block, but you can still achieve something similar - by defining and then combining plain functions that transform the Observables. Monix also hides the details of how the pipeline is run, since it doesn't expose anything like the Akka Streams' materializer.

You could also have noticed that - at least in some areas - the Monix API is a bit more high-level. This is the case e.g. with I/O operations like streaming the contents of a file, where you have a single method to do it, contrary to Akka Streams, where you need to go a bit deeper into the stream implementation and e.g. implement the delimiting yourself.

Last but not least, although in Akka Streams it's theoretically possible to use any materializer and although there is an alternative implementation base on Apache Gearpump, the built-in, actor-based materializer remains the de facto standard. So, despite the theoretical flexibility, the need to explicitly provide a materializer introduces unnecessary boilerplate.

If you'd like to experiment further on your own, the full working examples for both libraries are available on GitHub: the Akka Streams version and the Monix one. Both repositories include a Readme, which, apart from the running instructions, contains a guide to generating your own test data.

And, obviously, I'm eager to hear your comments.

scala times
Interested in Scala news?

Subscribe to Scala Times Newspaper delivered weekly by SoftwareMill straight to your inbox.