
Supervision, Kafka and Java 21: what’s new in Ox

Adam Warski

30 Aug 2023 · 6 minutes read


Ox is a JVM library that facilitates structured concurrency. It integrates the capabilities introduced in Java 21 (such as virtual threads) with Go-style channels and features from the Scala programming language.

Although it's currently experimental, I'd like to introduce a few new features and would appreciate your feedback!

Supervision

Let's begin by discussing supervision. At the core of Ox is the scoped method, which initiates a new scope. Inside this scope, you can start multiple forks that run concurrently. When the code block given to scoped is done, any forks that are still running get interrupted. The entire scope completes only when all its forks have completed. Essentially, the code's structure determines the threads' lifetimes: this is how Ox achieves structured concurrency.

For example, the following runs two forks concurrently:

scoped {
  val f1 = fork {
    Thread.sleep(2000)
    println("Fork 1 is done")
  }

  val f2 = fork {
    Thread.sleep(1000)
    println("Fork 2 is done")
  }

  // without the joins, the scope would end immediately, 
  // thus cancelling f1 and f2, and we wouldn't see any output
  f1.join()
  f2.join()
}

This approach is effective, but it necessitates careful error handling. If a fork encounters an error and throws an exception, the only way to detect this fact is by using the .join method on that fork. Doing so will rethrow the exception, which can terminate the scope or be managed differently.
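
For instance, here's a minimal sketch (using the same scoped/fork API as above) showing that a failure only surfaces at the join call:

scoped {
  val f = fork {
    Thread.sleep(500)
    throw new RuntimeException("boom")
  }

  // the exception becomes visible only here: .join() rethrows it,
  // which ends the scope and cancels any other running forks
  f.join()
}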

However, there's a risk: a fork that's never joined might "fail silently". To address this, the newest version of ox introduces a default scoping method called supervised, building on the already mentioned scoped method.

Within the supervised scope, if any fork—including the main code block—fails, the whole scope ends, canceling any running forks. Moreover, the scope will also end when all the forks, including the main code block, run to completion without errors. To illustrate, consider the modified example below:

supervised {
  fork {
    Thread.sleep(2000)
    println("Fork 1 is done")
  }

  fork {
    Thread.sleep(1000)
    println("Fork 2 is done")
  }
}

We get the same behavior but with additional safety guarantees in case any fork fails (and a slightly smaller amount of code).
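
To sketch the failure path with the same API: if one fork throws, supervised interrupts the other forks and rethrows the exception to the caller:

supervised {
  fork {
    Thread.sleep(1000)
    println("Fork 1 is done") // never printed: the scope ends earlier
  }

  fork {
    Thread.sleep(500)
    throw new RuntimeException("boom")
  }
}
// supervised rethrows the exception, but only after the first fork
// has been interrupted and awaited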

Summarising, there are now two scoping methods in ox:

  • scoped, which ends the scope once the main code block completes; fork failures can be discovered only using fork.join()
  • supervised, which ends the scope when all forks have completed successfully, or at least one has failed.

In both cases, ending the scope means that any running forks are canceled (interrupted and awaited until they complete). Also, in both cases, the scoping method returns only when all forks have completed, maintaining the structured concurrency property.

Two helper methods have also been introduced to make supervision more flexible. forkDaemon still ends the scope when the fork fails, but isn't required to complete for the scope to end in the successful scenario. forkUnsupervised, when used within supervised, works the same way as fork within scoped.
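
As a sketch of the difference (assuming forkDaemon behaves as described above):

supervised {
  // a background "heartbeat": if it fails, the scope ends with its
  // exception, but the scope doesn't wait for it in the success case
  forkDaemon {
    while true do
      println("alive")
      Thread.sleep(1000)
  }

  // the scope ends once this (supervised) fork completes,
  // interrupting the daemon fork above
  fork {
    Thread.sleep(3000)
    println("Work done")
  }
}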

supervised is a bit more expensive than scoped (it starts an extra supervising fork), but otherwise, I think it is a much safer alternative to scoped.

Also, a side note on how this impacts channels and streaming: all of the stream-processing methods, such as .map, .merge, or stream-producing methods, such as Source.tick or Source.fromIterable, start daemon forks. That way, errors are handled, but the scope's lifetime is defined by user-provided logic.
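
For example, in a small pipeline like the one below (a sketch; the exact Source.tick signature and its default element are assumptions), the stages run in daemon forks, so the scope's lifetime is determined by the user code, not by the infinite stream:

import scala.concurrent.duration.*

supervised {
  // both stages below run in daemon forks
  val ticks = Source.tick(100.millis)
  val labeled = ticks.map(_ => "tick")

  // the scope's lifetime is defined by this (user-provided) code,
  // not by the daemon stages above
  Thread.sleep(1000)
}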

Kafka

We added basic Kafka integration to the streaming component to evaluate the ox approach in practical situations. Now, you can create sources that read data from a Kafka topic and establish drains to send data to a topic. Additionally, there's an option to commit offsets of messages used to create the published data.

As an example, here's a Kafka stream processor, which consumes data from one topic, assumes it's all numbers, and sends their doubled values to another topic; source offsets are committed only once the publish succeeds, thus implementing at-least-once delivery:

supervised {
  KafkaSource
    .subscribe(consumerSettings, sourceTopic)
    .map(in => (in.value.toLong * 2, in))
    .map((value, original) => 
      SendPacket(ProducerRecord(destTopic, value.toString), original))
    .applied(KafkaDrain.publishAndCommit(producerSettings))
}

Taking this apart: KafkaSource.subscribe creates a daemon fork, which reads data from the Kafka topic and publishes it to a newly created channel; that channel is then returned to the user. consumerSettings contains configuration such as the Kafka bootstrap servers or the consumer group id.
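
For reference, such settings could be constructed roughly like this (a sketch: the ConsumerSettings / ProducerSettings builder methods and values shown here are assumptions, so check the readme for the exact API):

// illustrative, builder-style settings; see the ox-kafka readme
// for the exact method names and defaults
val consumerSettings = ConsumerSettings
  .default("my-consumer-group")        // consumer group id
  .bootstrapServers("localhost:9092")

val producerSettings = ProducerSettings
  .default
  .bootstrapServers("localhost:9092")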

The two .map invocations each start a daemon fork, which performs the mapping operation. Such an operation can be blocking, although here, we do simple in-memory transformations.

Finally, KafkaDrain.publishAndCommit returns a function, which consumes a source of SendPackets in a blocking way. Each send packet contains records to publish, along with records which should be committed once the publish succeeds.

We get a running Kafka pipeline by applying this function to the previously constructed source. In case of an exception in any stage, the pipeline is interrupted, and the exception is propagated to the user. However, the propagation will happen only once everything is cleaned up (no dangling forks are left behind).

Another example might involve pushing data from a Kafka topic to a remote HTTP endpoint and storing the responses:

import sttp.client4.*
val backend = DefaultSyncBackend()

def sendRequest(data: String): Response[String] =
  quickRequest.get(uri"https://endpoint.com").body(data).send(backend)

supervised {
  KafkaSource
    .subscribe(consumerSettings, sourceTopic)
    .mapPar(10)(in => (sendRequest(in.value), in))
    .mapAsView((response, original) => 
      SendPacket(ProducerRecord(destTopic, response.body), original))
    .applied(KafkaDrain.publishAndCommit(producerSettings))  
}

We specify that, at most, ten concurrent HTTP calls can happen at any time. .mapPar guarantees that the ordering of output data follows the ordering of input data. Also, we use .mapAsView, which doesn't start a fork to process the data as it arrives; instead, it returns a view of the original source, where the mapping is lazily computed on demand.
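
The same combinators also work on in-memory sources. Here's a small sketch (assuming foreach is available as a blocking, terminal operation) showing that mapPar keeps the input order even though up to 3 elements are transformed concurrently:

supervised {
  Source
    .fromIterable(1 to 10)
    .mapPar(3) { i =>
      Thread.sleep(100) // simulate a blocking operation
      i * 10
    }
    .foreach(println)   // prints 10, 20, ..., 100, in order
}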

We're using sttp 4 to perform the HTTP calls and, as before, we publish the data while committing the corresponding source offsets.

That's just the start of a possible Kafka integration, but it would be great to know your thoughts on such an "eager" streaming implementation based on ox's Channels, supervised scopes, and virtual threads. For more details, see the Kafka section in the readme and the scaladocs on the public methods.

Java 21

Last but not least, ox has been updated to use Java 21 EA (a final release will be available towards the end of September). The structured concurrency and scoped values preview APIs changed slightly since Java 20, so the code needed some updates.

The most significant one is that forks are now, by default, not cancellable; instead, there's a forkCancellable method, which returns a CancellableFork. It's a bit more expensive to run than a regular fork, but cancellation shouldn't be needed that often, given the structural properties given by scoped and supervised (which still cancel any running forks when the scope ends).
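
A quick sketch of explicit cancellation (assuming CancellableFork exposes a cancel method that interrupts the fork and awaits its completion):

supervised {
  val f = forkCancellable {
    Thread.sleep(10000)
    println("This won't be printed")
  }

  Thread.sleep(1000)
  f.cancel() // interrupts the fork and waits until it completes
}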

A side-effect of the update is that ox now relies on a nightly Scala build, as Scala 3.3.0 doesn't seem to work with Java 21.


Feedback wanted!

It would be great to know what you think about the API and the direction that we've taken. Does supervised provide a good approach to error handling? How about the Kafka API? Is the code clear enough, and does it provide the necessary resilience? What use cases would you like to see covered by ox?

Let us know, experiment, and leave feedback either on our community forum or as an issue in the GitHub project.
