Integrating callbacks with structured concurrency in Scala

Structured concurrency allows asynchronous tasks to run within clearly defined supervision boundaries. Introduced as an incubating API in Java 19, it helps manage the lifecycle of running forks by requiring the developer to define a scope and enforcing the rule that tasks can only be executed within such a scope:

Invoice generateInvoice() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { 
        Supplier<Issuer> issuer = scope.fork(this::findIssuer);
        Supplier<Customer> customer = scope.fork(this::findCustomer);
        Supplier<List<Item>> items = scope.fork(this::findItems);

        scope.join().throwIfFailed(); 

        return new Invoice(issuer.get(), customer.get(), items.get());
    }
}

Scopes can be nested, so that each module in the call stack can guarantee that, in case of interruption, only the forks within its own boundary are canceled. See Robert Pudlik’s blog for more details about working with scopes in Java.
In Scala, the Ox library is a toolkit for simple, safe, and powerful concurrency operations. Its core features leverage structured concurrency based on scopes, just like plain Java, but with the power of Scala’s syntax and type system.

import ox.*
import scala.concurrent.duration.*

supervised {
  val f1 = fork {
    sleep(2.seconds)
    1
  }

  val f2 = fork {
    sleep(1.second)
    2
  }

  (f1.join(), f2.join())
}

One of the rules in structured concurrency is:

Once the code block passed to the scope completes, any daemon forks that are still running are interrupted.

This means that the lifecycle of forks has predictable boundaries, which should be easy to follow when reading or maintaining the codebase. Scopes can then be nested, preserving modularity and local reasoning. In Scala, the Ox library enables exactly that — safe, robust concurrent programming with familiar abstractions, leveraging structured concurrency and a direct style.
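To illustrate, here is a minimal sketch (assuming the imports from the earlier snippet) of how nesting behaves: when the inner block completes, only the daemon forks started inside it are interrupted, while the outer scope's forks keep running.

supervised {                     // outer scope
  val outer = fork {
    sleep(2.seconds)
    1
  }

  supervised {                   // inner, nested scope
    fork { sleep(10.seconds) }   // daemon fork: interrupted once the inner block ends
    sleep(100.millis)
  }                              // only the inner scope's forks are interrupted here

  outer.join()                   // the outer fork is unaffected by the inner scope ending
}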
You can start forks:

def forkComputation(p: Int)(using Ox): Fork[Int] = fork {
  sleep(p.seconds)
  p + 1
}

supervised {
  val f1 = forkComputation(2)
  val f2 = forkComputation(4)
  (f1.join(), f2.join())
}

You can define sophisticated retry policies:

def eitherOperation: Either[String, Int] = ???
retryEither(
  RetryPolicy.backoff(3, 100.millis, 5.minutes, Jitter.Equal),
  ResultPolicy.retryWhen(_ != "fatal error")
)(eitherOperation)

Work with timeouts:

def computation: Int =
  sleep(2.seconds)
  1

val result1: Try[Int] = Try(timeout(1.second)(computation)) // failure: TimeoutException

You can process streaming data:

supervised {
  Source
    .fromValues(1, 2, 3)
    .map(_ + 1) // runs in a fork
    .foreach { … }
}

and perform many other convenient async operations, all running on virtual threads, which are impressively fast and cheap. Underneath Ox’s Source there’s a Channel[A] - a construct similar to a queue, but with additional features like completion support, downstream error propagation, and advanced element selection semantics:

val channel = Channel.bufferedDefault[String]
channel.send("msg")
channel.receiveOrClosed() // String | ChannelClosed

Operations like send or receive happen directly on the call site, so they don’t need any forks, hence no supervision context. However, constructing a Source with transformations adds async stages that run in a fork:

supervised {
  (channel: Source[Int])
    .map(_ + 1) // runs in a fork, needs a scope
    .foreach { … }
} // after this block, all forks get interrupted

Any operation that has to leverage forks needs a clear boundary, which is typically a supervised {} block. Similarly to what we saw in Java, this defines a scope: when the block ends, all forks started within it are interrupted, and the scope itself does not complete until those forks have finished.

Problem definition

Such a way of structuring scopes fits well with standard logic flows, where there is a well-defined, continuous “parent flow” controlled by the user - for example, an HTTP request handling process that produces a response and ends. Within such a flow, there can be many branches, but it all boils down to a well-defined, finite sequence of actions, some of which may be concurrent, ultimately joining and ending with a single result. Many typical request-handling use cases keep such a structure, and the same goes for actions triggered by events. Comparing with effect systems like Cats Effect, one can say that such a flow represents a fiber of a parent effect, built of sub-effects composed into a monadic flow, with occasional forks and joins where needed.
However, there are cases where interfaces produce multiple values and are structured with handler methods (callbacks), which are detached from any controllable main flow.
Let’s say we’d like to leverage channels for background processing of messages using an interface like:

class MessageHandler(transformationPipeline: Source[Message] => Source[Message]):
  val channel: Channel[Message] = Channel.bufferedDefault[Message]

  def onMessage(msg: Message): Unit = // called from the outside when a message is available
    channel.send(msg) // happens directly here, no supervision context is needed

  // start background processing
  def startProcessing(): Unit =
    fork { // won’t compile, there’s no scope!
      transformationPipeline(channel: Source[Message])
        .foreach(msg => handleMessage(msg))
    }

  def handleMessage(msg: Message): Unit =
    ??? // actual processing of a single message

// somewhere else
// won’t compile either, .map and .filter need a scope as well!
new MessageHandler(src => src.map(fun).filter(filterFun)).startProcessing()

Creating such a detached fork is impossible because it requires an outer scope. How can we deal with this problem? Our background task (startProcessing()) may be started from an arbitrary thread, like Netty’s internal handler triggered by one of its callbacks. Structured concurrency requires us to know exactly when a scope starts and ends, and in this case, the end is unknown: messages may keep coming until the channel is explicitly closed by another callback. Additionally, transformations passed by the user may also need a scope.

The Solution

To address this callback-structure mismatch, we need to define two special scopes:

  • An outer, long-living scope. It is responsible for starting ad-hoc forks, and we can be sure these forks are not killed, because the scope lives as long as MessageHandlers are being created and used.
  • A nested scope around each single background processing pipeline, guaranteeing that in case of interruption, only this particular pipeline and all its child forks (.map, .filter, etc.) are released, while the outer scope keeps running forks for other MessageHandlers.

To implement this, we can leverage Ox’s Actor, which works similarly to the well-known actor model from Akka. An actor lives within a certain scope and receives messages, which are put into its inbox by a tell method that returns immediately; a background fork running underneath then reads each message and processes it. In our case, such a message can be a function that starts the processing pipeline. We’ll add another, nested scope inside this fork to make sure all the operations are properly isolated. This way, the actor becomes a dispatcher, scheduling asynchronous tasks onto the “outer scope”.
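As a quick refresher, here is a minimal sketch of how Ox actors are used (the Counter class is just an illustrative stateful object, not part of Ox):

class Counter:
  private var value: Int = 0
  def increment(delta: Int): Unit = value += delta
  def current: Int = value

supervised {
  val ref = Actor.create(Counter())
  ref.tell(_.increment(5))        // fire-and-forget: enqueues the invocation and returns immediately
  val result = ref.ask(_.current) // enqueues the invocation and waits for its result: 5, since the tell above is processed first (FIFO inbox)
}

With these mechanics in mind, the dispatcher looks as follows: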

class OxDispatcher()(using ox: Ox):
  private class Runner:
    def runAsync(thunk: Ox ?=> Unit, onError: Throwable => Unit): Unit =
      fork {
        try supervised(thunk)
        catch case e => onError(e)
      }.discard

  private val actor = Actor.create(new Runner)

  def runAsync(thunk: Ox ?=> Unit)(onError: Throwable => Unit): Unit = actor.tell(_.runAsync(thunk, onError))

Such a dispatcher hides the complexity of fork and scope management from the user; it can be conveniently used to start safe, arbitrary background processes:

dispatcher.runAsync(
  source.map(...).filter(...).foreach(...)
)(onError = …)

The thunk passed to runAsync executes asynchronously on virtual threads, away from the call site, and so does the error handling. Note that its signature is a context function, Ox ?=> Unit. Context functions are a Scala 3 feature that lets you define functions taking implicit (context) parameters. We can use one to pass a Source transformation that will leverage an implicit scope (an Ox instance) managed within the actor.
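For readers new to context functions, here is a tiny, Ox-independent sketch (User, WithUser and greet are made-up names for illustration):

case class User(name: String)

// a context function type: the User parameter is supplied from the implicit (given) scope
type WithUser[T] = User ?=> T

def greet: WithUser[String] =
  s"Hello, ${summon[User].name}" // the compiler passes the given User implicitly

given User = User("alice")
val hello: String = greet // no explicit argument needed, the given User is used

With context functions in hand, our example becomes: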

class MessageHandler(transformationPipeline: Ox ?=> Source[Message] => Source[Message], dispatcher: OxDispatcher):
  val channel: Channel[Message] = Channel.bufferedDefault[Message]

  def onMessage(msg: Message): Unit = // called from the outside when a message is available
    channel.send(msg) // happens directly here, no supervision context is needed

  // start background processing
  def startProcessing(): Unit =
    dispatcher.runAsync(
      transformationPipeline(channel: Source[Message])
        .foreach(msg => handleMessage(msg))
    )(onError = e => logger.error(e))

  def handleMessage(msg: Message): Unit =
    ??? // actual processing of a single message

// somewhere else
val dispatcher = new OxDispatcher()

// somewhere else
new MessageHandler(src => src.map(fun).filter(filterFun), dispatcher).startProcessing() 

We have delegated scope management outside of the MessageHandler - it now only needs an instance of the dispatcher. Let’s picture the entire dynamics between a callback-based entity and a dispatcher actor:

(Diagram: a callback-based entity interacting with the dispatcher actor, with the nested supervision zone in green and the global concurrency scope in blue)
The green boundary is our safety supervision zone. Of course, the question now becomes: “in what scope should I create the dispatcher?”. This is depicted as the “global concurrency scope”, the blue zone. Since message handlers can come and go during the entire server lifecycle, you will often end up with a global supervised {} wrapper around the whole server lifecycle, with a single dispatcher created when the server starts.
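In pseudocode, such wiring could look like the sketch below, where startServerAndWait is a made-up placeholder for whatever blocks until the server shuts down:

supervised {
  // the single, long-living dispatcher picks up this enclosing Ox scope
  val dispatcher = new OxDispatcher()

  // hypothetical: handlers created during the server's lifetime all share this dispatcher
  startServerAndWait(dispatcher)
} // leaving the scope interrupts any forks still scheduled through the dispatcher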

Scopes in Tapir

The tapir-netty-sync server hides all these details from the user, so they can just run the server using:

NettySyncServer()
  .host("0.0.0.0")
  .port(8080)
  .startAndWait()

This will create a managed long-living scope internally, and this scope will be used to create an OxDispatcher, which is also an internal concern isolated from the user code.
Right now Tapir’s NettySyncServer supports Ox for its WebSocket endpoints, where all you provide is an Ox ?=> Source[Req] => Source[Resp] processing pipeline:

object WebSocketNettySyncServer:
  val wsEndpoint =
    endpoint.get
      .in("ws")
      .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](OxStreams))

  // Your processor transforming a stream of requests into a stream of responses
  val wsPipe: Pipe[String, String] =
    requestStream => requestStream.map(_.toUpperCase)

  // The WebSocket endpoint, builds the pipeline in serverLogicSuccess
  val wsServerEndpoint = wsEndpoint.serverLogicSuccess[Id](_ => wsPipe)

  def main(args: Array[String]): Unit =
    NettySyncServer()
      .host("0.0.0.0")
      .port(8080)
      .addEndpoint(wsServerEndpoint)
      .startAndWait()

See the full example on GitHub.
Underneath, reactive Publisher/Subscriber machinery integrates with Netty in a callback-based fashion using an OxDispatcher, while you can still work with Tapir in a declarative, direct style.
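To give a flavour of that integration, here is a simplified sketch (not Tapir’s actual internals; ChannelSubscriber and pipeline are illustrative names) of how a callback-based Subscriber can feed an Ox channel, with the processing pipeline started through an OxDispatcher:

import java.util.concurrent.Flow.{Subscriber, Subscription}

class ChannelSubscriber[A](dispatcher: OxDispatcher, pipeline: Ox ?=> Source[A] => Unit) extends Subscriber[A]:
  private val channel = Channel.bufferedDefault[A]

  override def onSubscribe(s: Subscription): Unit =
    // start consuming the channel in the background, in its own nested scope
    dispatcher.runAsync(pipeline(channel))(e => channel.error(e))
    s.request(Long.MaxValue) // simplified: no back-pressure handling in this sketch

  override def onNext(a: A): Unit = channel.send(a) // direct call, no scope needed
  override def onError(t: Throwable): Unit = channel.error(t)
  override def onComplete(): Unit = channel.done()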

Limitations

The proposed solution can be reused for other callback-based integrations; if you run into such a situation, you need to understand the limitations of using such a dispatcher:

  • A single actor - while the underlying actor’s responsibility is only to start forks, it is a single point of task scheduling with a FIFO inbox. When multiple entities call runAsync(), fairness is basic: whoever schedules their tasks faster gets them started first. In our performance tests with 2500 concurrent users we didn’t notice any issues, but some cases may need multiple separate actors to make sure they don’t interfere.
  • Fire-and-forget - a task started with runAsync() may run forever, or for too long. It’s unsafe territory: you need to carefully code the passed thunk to ensure it has well-defined end conditions. The call site of runAsync() isn’t the same as the actual start of a fork, so it’s not possible to create a runAsyncCancellable() variant that would give us a cancellation switch.
  • Unbounded actor mailbox - there is no control over how many tasks can be started with runAsync(); the dispatcher actor’s inbox is unlimited. If needed, such boundaries have to be managed externally, with higher-level control structures (see the sketch below).

Within Tapir’s NettySyncServer, the server internals take care of all these safety concerns, and users don’t interact directly with dispatchers.
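As an example of such external control, here is a hypothetical wrapper (not part of Ox or Tapir) that bounds the number of in-flight tasks with a semaphore, blocking callers once the limit is reached:

import java.util.concurrent.Semaphore

class BoundedDispatcher(underlying: OxDispatcher, maxInFlight: Int):
  private val permits = new Semaphore(maxInFlight)

  def runAsync(thunk: Ox ?=> Unit)(onError: Throwable => Unit): Unit =
    permits.acquire() // back-pressure at the call site: blocks when too many tasks are in flight
    underlying.runAsync {
      try thunk
      finally permits.release() // released whether the task succeeds or fails
    }(onError)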

Summary

Structured concurrency may feel like a set of hard constraints incompatible with callback-based interfaces, but I hope I have demonstrated that both approaches can be reconciled without sacrificing safety or developer experience. If you’re interested in writing a Tapir server using direct-style Scala with Ox on Java 21 virtual threads, give it a try!
