Reactive Streams in Scala: Akka Streams vs Monix - part 3
This is the third and final part of the Reactive Streams in Scala: Akka Streams vs Monix series. The previous installments can be found below:
So far we've learned what a typical processing pipeline looks like, and how to implement its building blocks. Now it's high time we put everything together and run the import process.
Combining multiple building blocks
Both Akka Streams and Monix provide pretty straightforward ways to chain several small building blocks together into a composite one.
Akka Streams
In Akka Streams, this can be achieved by using the `via` method on a `Flow`:
val processSingleFile: Flow[File, ValidReading, NotUsed] =
Flow[File].via(parseFile).via(computeAverage)
Monix
In Monix, there's the `transform` method on `Observable`:
val processSingleFile: Transformer[File, ValidReading] =
_.transform(parseFile).transform(computeAverage)
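For reference, `Transformer` is not a built-in Monix type - it's a plain type alias for a function between `Observable`s, assumed here to have been introduced in the earlier parts of this series roughly like so:

```scala
import java.io.File
import monix.reactive.Observable

// Assumed alias from earlier in the series: a processing stage is
// just a function from one Observable to another.
type Transformer[-A, +B] = Observable[A] => Observable[B]
```

Since a `Transformer` is an ordinary function, chaining with `transform` is just function application, so the composite stage above could equivalently be written as `parseFile andThen computeAverage`.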
Note that, in both cases, the chaining APIs are type-safe, i.e. you won't be able to connect a block with an output of type `A` to a block with an input of type `B`.
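For instance - assuming, as in the earlier parts of the series, that `computeAverage` consumes `ValidReading`s - trying to skip a stage and connect mismatched types is rejected at compile time rather than at runtime:

```scala
import java.io.File
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical mismatch: computeAverage expects ValidReading inputs,
// but Flow[File] emits Files, so this line does not compile.
val broken: Flow[File, ValidReading, NotUsed] =
  Flow[File].via(computeAverage) // compile error: type mismatch
```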
Executing the pipeline
To process our data using our pipeline, we need to create a producer of files. Then we will be able to connect the producer to `processSingleFile`, which would then be connected to the `storeReadings` consumer.
There's one more thing we'd like to do - balance the file processing across a number of workers determined by the `concurrentFiles` parameter.
Akka Streams
To implement load balancing in Akka Streams, we need to go a bit deeper and use the lower-level `GraphDSL` API to build a custom load-balancing building block:
val balancer = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val balance = builder.add(Balance[File](concurrentFiles)) // 1
  val merge = builder.add(Merge[ValidReading](concurrentFiles)) // 2

  (1 to concurrentFiles).foreach { _ =>
    balance ~> processSingleFile ~> merge // 3
  }

  FlowShape(balance.in, merge.out)
}
We can notice two built-in stages here:
- `Balance` - which has a single input and `concurrentFiles` outputs,
- `Merge` - which has `concurrentFiles` inputs and a single output.
It takes the following steps to create the balancer:
1. Add a `Balance` stage to split the work across a number of workers.
2. Add a `Merge` stage to collect the results from the workers.
3. Connect each output of the `Balance` stage to a worker, then connect the worker's output to one of the inputs of `Merge`; the connections are made using the `~>` operator from the `GraphDSL`.
Note that, again, the connections are type-safe. Moreover, when the graph is materialized, the entire structure is checked for correctness - if you have e.g. an unconnected input or output, you'll get a runtime error.
Having the load balancer ready, we can now put the pipeline together:
Source(importDirectory.listFiles.toList)
  .via(balancer)
  .runWith(storeReadings)
The return type of `runWith` is the same as that of the `Sink` we're using, so here it's going to be a `Future[Done]` - which lets you handle the completion or failure of the processing as you usually do with `Future`s.
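As a sketch of what that handling might look like - assuming an implicit `ExecutionContext` and the `ActorSystem` (here called `system`) are in scope:

```scala
import scala.util.{Failure, Success}
import akka.stream.scaladsl.Source

Source(importDirectory.listFiles.toList)
  .via(balancer)
  .runWith(storeReadings)
  .onComplete {
    case Success(_) =>
      println("Import finished successfully")
      system.terminate() // shut down the actor system once we're done
    case Failure(e) =>
      println(s"Import failed: $e")
      system.terminate()
  }
```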
Monix
In Monix, there's nothing similar to Akka Streams' `GraphDSL` - we can just manipulate the `Observable`s. Fortunately, this is still enough to implement load balancing:
Observable
  .fromIterable(files)
  .bufferTumbling(concurrentFiles) // 1
  .flatMap { fs =>
    Observable.merge(fs.map(f => Observable.now(f).transform(processSingleFile)): _*) // 2, 3
  }
  .consumeWith(storeReadings)
  .runAsync
The important steps here are:
1. Grouping the files into batches of size `concurrentFiles`.
2. Mapping every batch of files into a batch of `Observable`s (the `fs.map(...)` part) that do the actual processing of each file.
3. Merging the `Observable`s into a single one (with `Observable.merge`), which is then attached to the consumer.
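To see what `bufferTumbling` does on its own, here is a minimal sketch with plain numbers instead of files - it emits consecutive, non-overlapping ("tumbling") batches of the given size, with the final batch possibly smaller:

```scala
import monix.reactive.Observable

// Non-overlapping buffers of size 2 over the elements 1..5:
// the resulting Observable emits Seq(1, 2), Seq(3, 4), Seq(5).
val batches: Observable[Seq[Int]] =
  Observable.fromIterable(1 to 5).bufferTumbling(2)
```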
Note that we need to call two methods to actually run the processing: `consumeWith` returns a `Task`, which is lazy by nature and needs to be run explicitly with `runAsync`. The latter returns a `CancelableFuture` - a Monix extension of Scala's `Future`.
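A sketch of how the result could be handled - since `CancelableFuture` is a regular Scala `Future`, the usual callbacks apply, and on top of that you can cancel the running stream (this assumes an implicit Monix `Scheduler` is in scope, which plays the role of an `ExecutionContext`):

```scala
import scala.util.{Failure, Success}
import monix.execution.CancelableFuture

// `runAsync` from the pipeline above returns a CancelableFuture[Unit].
val running: CancelableFuture[Unit] = importTask.runAsync

running.onComplete {
  case Success(_) => println("Import finished successfully")
  case Failure(e) => println(s"Import failed: $e")
}

// Unlike a plain Future, the processing can also be stopped mid-flight:
// running.cancel()
```

Here `importTask` is assumed to be the `Task` produced by `consumeWith(storeReadings)` in the snippet above.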
Summary
In this last part of the Akka Streams vs Monix series you have learned how to put all the processing stages together, and how to combine them with producers and consumers of data to actually run the pipeline.
Overall, what you have seen in this series is that although Akka Streams and Monix are both implementations of the Reactive Streams concept, they were built with somewhat different approaches in mind.
Akka Streams has a strong focus on modularity, therefore its API encourages you to design self-contained, reusable building blocks in the form of `Flow`s. You also need to explicitly define how the entire processing graph is run, by providing a `Materializer`.
In Monix, on the other hand, there's no notion of an independent building block, but you can still achieve something similar - by defining and then combining plain functions that transform `Observable`s. Monix also hides the details of how the pipeline is run, since it doesn't expose anything like Akka Streams' materializer.
You may also have noticed that - at least in some areas - the Monix API is a bit more high-level. This is the case e.g. with I/O operations like streaming the contents of a file, where Monix gives you a single method, whereas in Akka Streams you need to go a bit deeper into the stream implementation and e.g. handle the delimiting yourself.
Last but not least, although in Akka Streams it's theoretically possible to use any materializer, and there is an alternative implementation based on Apache Gearpump, the built-in, actor-based materializer remains the de facto standard. So, despite the theoretical flexibility, the need to explicitly provide a materializer introduces some boilerplate.
If you'd like to experiment further on your own, the full working examples for both libraries are available on GitHub: the Akka Streams version and the Monix one. Both repositories include a Readme, which, apart from the running instructions, contains a guide to generating your own test data.
And, obviously, I'm eager to hear your comments.