Reactive Streams in Scala: Akka Streams vs Monix - part 2
This is the second part of the Reactive Streams in Scala: Akka Streams vs Monix series. The previous part is General concepts, example use case; the next one is Putting the building blocks together.
In this article we're going to see how we can define the various building blocks needed to process our files. You will then be able to see the similarities and differences in the approaches, as well as in the APIs exposed by Akka Streams and Monix.
Streaming lines from a gzipped file
The first thing we'd like to implement is the conversion of a gzipped file into a stream of lines, emitted as Strings, including the possibility to skip an arbitrary number of initial lines.
Akka Streams
In Akka Streams, we first need to manually define how the stream of bytes (represented as Akka's ByteStrings) is going to be divided into lines - using a built-in Framing helper:
val lineDelimiter: Flow[ByteString, ByteString, NotUsed] =
  Framing.delimiter(ByteString("\n"), 128, allowTruncation = true)
The above step accepts and emits sequences of bytes (ByteStrings), with the emitted ones representing subsequent lines, having a maximum length of 128 bytes. The allowTruncation flag lets us also accept the last line in the file when it doesn't end with \n - without the flag, the stream would fail when no delimiter is present at the end.
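As a quick illustration of the framing behavior (a sketch only - running it requires an implicit ActorSystem and materializer in scope):
// Sketch: a single ByteString containing three lines is split into three elements;
// thanks to allowTruncation, the last line is emitted even without a trailing \n
Source.single(ByteString("line1\nline2\nline3"))
  .via(lineDelimiter)
  .map(_.utf8String)
  .runForeach(println) // prints line1, line2, line3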
To stream a file from a gzip archive, we're going to use the following code for each file value (defined later):
val gzipInputStream = new GZIPInputStream(new FileInputStream(file))

StreamConverters.fromInputStream(() => gzipInputStream)
  .via(lineDelimiter)
  .drop(linesToSkip)
  .map(_.utf8String)
  .mapAsync(nonIOParallelism)(parseLine)
which builds a Source that emits Readings. This code creates a new Source for every file, so with multiple files we're going to end up with multiple Sources. However, our goal is to concatenate the readings from all of the files - this can be achieved by wrapping the above code with a flatMapConcat stage. Its role is to transform every input element into a Source, and then flatten the Sources into a single stream of elements (by consuming one Source after the other):
val parseFile: Flow[File, Reading, NotUsed] =
  Flow[File].flatMapConcat { file =>
    val gzipInputStream = new GZIPInputStream(new FileInputStream(file))

    StreamConverters.fromInputStream(() => gzipInputStream)
      .via(lineDelimiter)
      .drop(linesToSkip)
      .map(_.utf8String)
      .mapAsync(nonIOParallelism)(parseLine)
  }
Please note the use of mapAsync in the last line. It's crucial that, when parallel asynchronous computations are executed for each element, the results are emitted downstream in the same order in which the elements arrived. This may often mean waiting for slower computations to complete if they were started earlier than faster ones that have already finished.
In case you don't care about the order in which the results reach the downstream, you can use the mapAsyncUnordered counterpart, which emits the results as soon as they are ready, and can thus be faster in some cases.
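To make the distinction concrete, here is a sketch of the two variants side by side (assuming, as elsewhere in this article, that parseLine has the signature String => Future[Reading]):
// Sketch: both run up to nonIOParallelism parseLine calls concurrently;
// mapAsync emits the results in the input order, mapAsyncUnordered emits
// them as soon as they complete
val parseOrdered: Flow[String, Reading, NotUsed] =
  Flow[String].mapAsync(nonIOParallelism)(parseLine)

val parseUnordered: Flow[String, Reading, NotUsed] =
  Flow[String].mapAsyncUnordered(nonIOParallelism)(parseLine)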
Monix
In Monix, on the other hand, there's a built-in factory method to create an Observable from a BufferedReader. Thus, creating the file-parsing transformer is a bit simpler than in Akka Streams. We start with creating an Observable of parsed lines from each file:
val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(
  new FileInputStream(file)), "UTF-8"))

Observable.fromLinesReader(reader)
  .drop(linesToSkip)
  .transform(mapAsyncOrdered(nonIOParallelism)(parseLine))
Since the code above creates an Observable for every file, we eventually need to flatten and concatenate the Observables - similarly to Akka Streams:
val parseFile: Transformer[File, Reading] = _.concatMap { file =>
  val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(
    new FileInputStream(file)), "UTF-8"))

  Observable.fromLinesReader(reader)
    .drop(linesToSkip)
    .transform(mapAsyncOrdered(nonIOParallelism)(parseLine))
}
Unfortunately, contrary to Akka Streams, Monix does not have built-in support for ordered parallelization (see the GitHub issue) - the Observable only provides the mapAsync method, which, as opposed to its Akka Streams counterpart, doesn't preserve the order of the values passed downstream.
To achieve the ordered behavior, we need to create a helper method ourselves, using other methods that the Observable provides. A possible implementation can look like this:
def mapAsyncOrdered[A, B](parallelism: Int)(f: A => Task[B]): Transformer[A, B] =
  _.map(f).bufferTumbling(parallelism).flatMap { tasks =>
    val gathered = Task.gather(tasks)
    Observable.fromTask(gathered).concatMap(Observable.fromIterable)
  }
What this method does is it:
- creates a Task for every computation (note that Monix's Tasks are lazy, so nothing gets executed yet),
- groups the tasks into batches of size determined by parallelism, using bufferTumbling,
- waits for each entire batch to complete using Task.gather, which returns a Task[Seq[B]],
- creates an Observable from the above Task and flattens the Seq[B], so that the Bs are emitted downstream one by one.
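A hypothetical usage of the helper, detached from the file-parsing code (the numbers and the doubling task are made up purely for illustration):
// Sketch: run up to 4 tasks in parallel while preserving the input order
Observable.fromIterable(1 to 10)
  .transform(mapAsyncOrdered(4)(n => Task(n * 2)))
  .toListL // a Task[List[Int]] yielding the doubled values in the original order
Note that, since the helper waits for a whole batch with Task.gather, a single slow task delays the entire batch - a simple trade-off compared to Akka Streams' built-in mapAsync, which keeps a sliding window of in-flight computations.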
Computation of an Average
The common logic for computing an average of a dataset is pretty straightforward - we group the Readings in pairs (with each element of a given pair sharing a common id), then try to compute the average of the valid readings in each pair. If none of the readings are valid, we assume the average to be a dummy value of -1:
val validReadings = readings.collect { case r: ValidReading => r }

val average = if (validReadings.nonEmpty)
  validReadings.map(_.value).sum / validReadings.size
else -1

ValidReading(readings.head.id, average)
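For reference, the snippet assumes the domain model introduced in the previous part of the series - roughly along these lines (a sketch; the exact definitions live in the example project):
// Assumed domain model (sketch): a reading either carries a measured value or is invalid
sealed trait Reading {
  def id: Int
}

case class ValidReading(id: Int, value: Double) extends Reading

case class InvalidReading(id: Int) extends Reading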
The building block definitions are also pretty similar across both libraries, as shown below.
Akka Streams
In Akka Streams, we're going to define the logic as a Flow[Reading, ValidReading, NotUsed] with the following definition:
val computeAverage: Flow[Reading, ValidReading, NotUsed] =
  Flow[Reading].grouped(2).mapAsyncUnordered(nonIOParallelism) { readings =>
    Future {
      // average computation logic
    }
  }
Monix
In Monix, the building block is going to be a Transformer[Reading, ValidReading]:
val computeAverage: Transformer[Reading, ValidReading] =
  _.bufferTumbling(2).mapAsync(nonIOParallelism) { readings =>
    Task {
      // average computation logic
    }
  }
As a reminder: mapAsync in Monix is similar to mapAsyncUnordered in Akka Streams, i.e. both do not preserve the order in which the input elements arrived in the general case.
Storing the average readings
Let's assume we have some ReadingRepository that provides an asynchronous way of writing a Reading to Cassandra, with the following signature:
def save(reading: ValidReading)
The return type is a Future[Unit] in the Akka Streams version, and a Task[Unit] in the Monix world.
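For completeness, a minimal sketch of what such a repository could look like in the Akka Streams variant (the Cassandra-backed implementation itself is out of scope here; the Monix variant would return a Task[Unit] instead):
// A sketch of the assumed repository interface
trait ReadingRepository {
  def save(reading: ValidReading): Future[Unit]
}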
Now, we'd like the storing building block to be the terminal part of the pipeline - an Akka Streams Sink or a Monix Consumer. Here is where the different approaches to defining a processing pipeline come into play.
Akka Streams
In Akka Streams, we're going to build a Sink[ValidReading, Future[Done]]. The less obvious Future[Done], which is the type of the materialized value, results from using a special Sink.ignore that just ignores the incoming elements, but waits for the upstream to complete and only then completes the Future. Done is another unification of Scala's Unit and Java's Void (as was the case with NotUsed). The Sink definition is as follows:
val storeReadings: Sink[ValidReading, Future[Done]] =
  Flow[ValidReading]
    .mapAsyncUnordered(concurrentWrites)(readingRepository.save)
    .toMat(Sink.ignore)(Keep.right)
You already know what mapAsyncUnordered does, but I'm sure you're now wondering what the toMat part actually does, so let's have a look. mapAsyncUnordered returns a Flow, but we want a Sink, so we use toMat to connect the Flow to Sink.ignore. Now, since both the Flow and the Sink can potentially produce a materialized value, we need to explicitly choose which of those values we're interested in - see Figure 1.
When you imagine what the pipeline looks like, there's the Flow to the left and the Sink to the right. That's why we use Keep.right to get the materialized value from the Sink - using Keep.left would give us the materialized value from the Flow, as in Figure 2.
Similarly, you could use Keep.both to capture both materialized values (as an (M1, M2) tuple), and Keep.none to discard both of them (in which case the materialized value type will be NotUsed) - see Figure 3.
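To make the difference concrete, here is a sketch of the same pipeline built with the alternative strategies - only the type of the materialized value changes:
// Sketch: the Flow materializes NotUsed, Sink.ignore materializes Future[Done]
val keepLeft: Sink[ValidReading, NotUsed] =
  Flow[ValidReading]
    .mapAsyncUnordered(concurrentWrites)(readingRepository.save)
    .toMat(Sink.ignore)(Keep.left)

val keepBoth: Sink[ValidReading, (NotUsed, Future[Done])] =
  Flow[ValidReading]
    .mapAsyncUnordered(concurrentWrites)(readingRepository.save)
    .toMat(Sink.ignore)(Keep.both)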
Monix
In Monix, due to the lack of materialized values, the implementation of a Consumer[ValidReading, Unit] is a bit more straightforward:
val storeReadings: Consumer[ValidReading, Unit] =
  Consumer.foreachParallelAsync(concurrentWrites)(readingRepository.save)
Summary
In this part of the Akka Streams vs Monix series you have seen how to define the building blocks of our file processing pipeline using the two APIs. You already know that they are similar to some extent, but also significantly different in many areas, e.g. due to the concept of materialized values used in Akka Streams.
In the upcoming last part of the series, you are going to see how to put the building blocks together, and actually execute the pipeline against some test data. Stay tuned!