Contents

Direct-style concurrent streaming

Direct-style concurrent streaming webp image

Ox, a Scala library for safe direct-style concurrency and resiliency on the JVM, gained a new implementation of concurrent streaming. It allows data processing pipelines to be defined using a functional API, imperative API, or both at the same time.

Streaming functionality isn't new in Ox: the previous implementation was based entirely on channels. While this worked, it had some drawbacks: each transformation stage introduced an asynchronous boundary. This might be inefficient: if you have a number of non-blocking and non-CPU-intensive stages, such as .filter, .mapStateful or .interleave, the asynchronous boundaries are simply not necessary. Hence, this approach caused too much concurrency.

To improve this situation, we're introducing the flow API. Channels remain the basic building block and continue offering an imperative way of defining data transformations. On top of channels, flows offer a functional API, with a number of high-level combinators well-known from other streaming libraries, such as Akka Streams or fs2.

Let's take a closer look at how flows work and interoperate with channels!

Just as the channels in Ox are based on an algorithm from Kotlin, asynchronous flows are also implemented there.

An example flow

Here's a short example of the flow API in action, runnable with Scala CLI:

//> using dep "com.softwaremill.ox::core:0.5.0"

import ox.flow.Flow
import java.net.URI

@main def capitals(): Unit =
  Flow
    .fromValues("Poland", "Argentina", "Italy", "Germany", 
      "Republic of India",  "France", "Japan", "United Kingdom", 
      "Australia", "USA")
    .map(_.toLowerCase().replace(" ", "%20"))
    .mapPar(3)(country =>
      val getCapital = new URI(
        s"https://restcountries.com/v3.1/name/$country?fields=capital")
      Flow
        .fromInputStream(getCapital.toURL().openStream())
        .linesUtf8
        .take(1)
        .runToList()
    )
    .runForeach(println)

The first characteristic of flows is that they are lazily evaluated. By invoking the .map, .filter, etc. methods, we only describe how the data should be transformed. Nothing happens until the flow is run.

The second important point is that concurrency is hidden. You describe what you need using the provided high-level methods; there's no need to use channels directly, not to mention other concurrency primitives.

Finally, we can use effectful blocking operations without any loss of expressivity of performance. That's thanks to Java's virtual threads and structured concurrency, as implemented in Java 21+ and Ox.

Dissecting the flow

To get a better understanding of how flows work, let's take a look at the various combinators used in the example above. First, we've got Flow.fromValues(...). This creates a Flow[String]: a description of a flow which, when run, will emit a number of strings. Here, this is a finite list of countries, but potentially flows might also be infinite.

Flow
  .fromValues(...)
  .map(country => ...)

Then, we .map this flow, lowercasing the names and doing a poor man's URL-encoding (don't do this in production ;) ). Once again, no data is processed, as we haven't run the flow yet. We've simply obtained another instance of Flow[String], which contains two stages: one producing the values and the other mapping them. If we were to run the flow at this point, both stages would run on the calling thread with no concurrency.

Flow
  .fromValues(...)
  .map(country => ...)
  .mapPar(3)(country =>
      ...
  )

It gets more interesting by the time we reach .mapPar. This combinator maps over each element of the incoming flow, running up to 3 mapping function invocations concurrently. This is useful if the mapping function performs effects or is otherwise blocking. And that's the case here: we perform an HTTP GET request for each country to convert the country name to the capital name.

When run, .mapPar runs the previously defined pipeline (which emits capitals) in a background fork. Any elements emitted by the flow are sent to a channel, which by default has a capacity of 16 (configurable). Another background fork receives elements from this channel (lowercased capitals) and creates more forks that perform the HTTP requests. Finally, in the calling thread, we gather the responses, taking care to keep the original order.

This entire process is (luckily) hidden from the end-user, and uses Ox's concurrency scopes & forks. That means that if anything goes wrong—e.g., the HTTP request throws an IOException—everything will be cleaned up, and only after that happens will the exception be propagated. "Cleaning up" means interrupting and waiting for completion of any forks that are still running.

Flow
  .fromValues(...)
  .map(_.toLowerCase().replace(" ", "%20"))
  .mapPar(3)(country =>
      ...
  )
  .runForeach(println)

We'll go back to the implementation of the mapping function in a second, but let's quickly cover the last operation on the main flow: .runForeach(println). This method actually runs the flow, blocking the calling thread until all elements are processed. Only when this method is called, data processing starts and any effects that are part of the stream happen.

Finally, let's take a look at the implementation of the country-to-capital mapping function:

val getCapital = new URI(s"https://restcountries.com/v3.1/name/$country?fields=capital")
Flow
  .fromInputStream(getCapital.toURL().openStream())
  .linesUtf8
  .take(1)
  .runToList()

More for demonstration purposes than real need, it also uses a Flow. We first open an InputStream (using Java's URL—again, don't do this in production; use sttp client instead!). This is used to create a Flow[Chunk[Byte]] with Flow.fromInputStream. This method emits chunks of bytes as read from the input stream and ensures that the InputStream is closed, whether the flow is completed successfully or with an error.

We then use the .linesUtf8 helper method, which parses byte chunks into lines. We're only interested in the first line (as there's usually one capital), and we run the flow, collecting all emitted elements into a list using .runToList(). What's missing here is JSON parsing, as that's the format of the responses, but we'll leave it as an exercise for the reader :).

Available combinators

The flow API contains a number of combinators well-known to any user of the Scala collections library—mapping, filtering, collecting, etc. There are also some stream-specific combinators, such as throttling, interleaving/interspersing, grouping elements based on element count or time, stateful mapping, and more.

Similarly, the Flow companion object contains a number of ways to conveniently create flows, such as iterating over provided values, unfolding over functions, or reading from an InputStream as above. If these are not enough, the .usingEmit method might be useful to define flows flexibly:

import ox.flow.Flow

def isNoon(): Boolean = ???

val intFlow = Flow.usingEmit: emit =>
  emit(1)
  for i <- 4 to 50 do emit(i)
  if isNoon() then emit(42)

It's also possible to introduce explicit asynchronous boundaries using .async(). This might be useful if producing the next element to emit and consuming the previous should run concurrently, or if the consumer's processing times vary, and the producer should buffer up elements. Under the hood, channels are used.

Imperative API

As mentioned in the beginning, the functional API is only one side of the streaming support in Ox. Channels offer an imperative API, which provides the ultimate flexibility but might be more verbose. With this approach, you can use the .send() and .receive() blocking operations to obtain, transform and emit data to subsequent pipeline stages.

Flows and channels are fully interoperable: you can create a flow from a channel and run a flow to a channel. As described above, concurrency within flow stages is implemented using channels.

As a short example, here's a method that transforms a channel of Strings into a channel of Ints:

import ox.Ox
import ox.channels.Source
import ox.flow.Flow

def transformChannel(ch: Source[String])(using Ox): Source[Int] =
  Flow.fromSource(ch)
    .mapConcat(_.split(" "))
    .mapConcat(_.toIntOption)
    .filter(_ % 2 == 0)
    .runToChannel()

Note that we have to run this method within a concurrency scope (using Ox), as running a flow to a channel creates a background fork, which runs the flow and sends all emitted elements to the returned channel. This concurrency scope is also the scope within which errors are handled.

Extending the flow API

You can create your own flow processing stages, by implementing a custom FlowStage:

trait FlowStage[+T]:
  def run(emit: FlowEmit[T]): Unit

trait FlowEmit[-T]:
  /** Emit a value to be processed downstream. */
  def apply(t: T): Unit

The run method should implement your data processing logic using the previously defined pipeline (available as a FlowStage instance). How the previous pipeline is run—whether synchronously or asynchronously—if fully up to the flow stage's implementation.

Ox contains one extension of the flow's API, tailored towards Kafka's producers and consumers, available as part of the kafka module. We hope to see many more combinators available both as part of the core API and integrations within the Ox project and external ones! If you'd like to participate, the community forum is a great place to start.

Try Ox today!

Ox is licensed under Apache2; the binaries are available on Maven Central. All you need to start experimenting with structured concurrency and blocking concurrent data streaming pipelines is adding the following dependency to your build:

// sbt dependency
"com.softwaremill.ox" %% "core" % "0.5.0"

// scala-cli dependency
//> using dep com.softwaremill.ox::core:0.5.0

We'd appreciate any feedback you have—both regarding the streaming feature and Ox's feature set in general. What are you missing to write direct-style Scala applications? The community forum and GitHub issues are great places to leave your comments/bug reports.

Finally, give Ox a star! :)

1

Blog Comments powered by Disqus.