Contents

Programmer-friendly structured concurrency for Java

Adam Warski

19 Jul 2024.14 minutes read

Programmer-friendly structured concurrency for Java webp image

Structured concurrency has proven to be a safe, expressive, and understandable approach to concurrency. Python libraries are a pioneer in the field, followed by others, including languages such as Kotlin.

Since Java 21, we have a powerful new concurrency primitive at our disposal: virtual threads. This feature consists of three parts: a user-level API, which builds on the abstractions known so far (the Thread class). Secondly, a runtime that manages the scheduling of virtual threads onto a small pool of platform threads. Finally, all blocking APIs in Java are retrofitted to be virtual thread aware.

Equipped with a new tool that allows us to return to direct-style, synchronous programming in Java, can we also innovate how we structure concurrent code? That is, bring the benefits of structured concurrency to the Java platform? Definitely! There's a "Structured Concurrency" JEP in progress (available as a preview feature in Java 21 & 22), which tackles exactly that.

With the JEP in place, aren't we done? Isn't structured concurrency simply there for Java, and can we, as application programmers, only reap the benefits? Yes and no. Yes, the APIs are or will probably be there as a stable feature in future Java releases. And no, as I'd consider the APIs defined in the JEP relatively low-level. They provide a good foundation, on top of which a safe, programmer-friendly API might be built. And that's exactly what we'll try to do.

Jox so far

The Jox library has been focused on providing a fast, scalable channel implementation for Java. This is based on an algorithm (see the paper) by the authors of channels in Kotlin and the Kotlin implementation itself. This functionality remains in Jox and is part of the channels module.

What's described below is a new module of Jox, structured, providing structured concurrency scopes and higher-level combinators.

Basics of a structured concurrency approach

The basic building block of any API that provides structured concurrency is a way to define scopes. Such a scope defines a syntactical boundary within which concurrently running computations (forks/threads) might be started. Most importantly, once a scope is completed, all such forks/threads are guaranteed to have finished as well. That is a scope never leaks any threads: once control steps over the scope, no "dangling" threads are left.

This allows us to reason about concurrent code in a structured way: the lifetime of any fork that we create is clearly visible from the program's structure.

The method that creates a new structured concurrency scope in Jox is called supervised (why this particular name will become apparent when we discuss error handling). Forks might be started within the scope's body. The result of the scope's body becomes the result of the supervised method. Here's a basic use-case of how you run two computations in parallel:

supervised(scope -> {
   var f1 = scope.fork(() -> {
       Thread.sleep(500);
       return 5;
   });
   var f2 = scope.fork(() -> {
       Thread.sleep(1000);
       return 6;
   });
   return f1.join() + f2.join();
});

After the scope is started, we create two forks, which run concurrently with the scope's body. A virtual thread backs each such fork. In the forks, we simulate complex computations (Thread.sleep is virtual-thread-aware, hence perfectly fine to use on Java 21+!), and return a numeric result. In the scope body, we block waiting for the first fork to complete using f1.join() (once again, this blocking is virtual-thread-aware, so it only deschedules the virtual thread; it doesn't block the underlying platform thread). We do the same for the second, obtaining the result.

When considering the "happy path", this example isn't particularly interesting, and we could easily achieve that without structured concurrency. However, one of the main difficulties in writing robust concurrent code is handling the cases when things go wrong.

Error handling in scopes

What happens if one of the forks throws an exception? That's where supervised scopes come in. In case any of the forks fail, that is—throw an exception—the entire scope will end. However, we can't simply end the scope and continue the execution of the code that comes after it. We need to make sure that we leave no dangling forks.

That's why any forks still running (or, more precisely, their backing virtual threads) are interrupted and we wait for them to complete. This is done using the usual Java interruption mechanism: injecting an InterruptedException on the first blocking operation. How this exception is handled is up to each individual fork. However, the scope won't complete until all forks have finished, including any exception and interruption handling.

Once all forks are done, the exception is rethrown, wrapped in an ExecutionException:

supervised(scope -> {
   var f1 = scope.fork(() -> {
       Thread.sleep(1000);
       return 6;
   });
   var f2 = scope.<Integer>fork(() -> {
       Thread.sleep(500);
       throw new RuntimeException("I can't count to 5!");
   });
   return f1.join() + f2.join();
});

Here, after 500ms, the second fork throws an exception. The supervisor is notified of the fact and ends the scope. Any forks still running are interrupted, including the scope's body. The Thread.sleep(1000) throws an InterruptedException, as does f1.join(). We don't have any resource cleanup (using finally) or any other exception handling, so both f1 and the scope's body end, and the scope completes—by throwing an exception, where the RuntimeException("I can't count to 5!") is set as a cause.

This might remind you of the "let it crash" strategy from actor systems. When there's an error, we fold everything (entire actor hierarchies, or here, entire scopes) to the point where we know how to handle the error. Same here: the code running the supervised scope can catch the exceptions, restart the scope, or run any other appropriate logic.

And that's it! We've got a simple set of rules that govern the execution of concurrency scopes, making them safe and easy to understand:

  • scopes only complete once all forks started within have completely finished
  • any exception thrown in any fork (or the scope's body), will cause the scope to end
  • when the scope ends, any forks that are still running are interrupted

Other types of forks & scopes

While supervised + fork are the most useful and "recommended" constructs, a couple of other variants might be useful in specific situations.

First of all, fork creates a daemon fork. If the body of the scope completes, the scope will end, interrupting any forks that are still running. That's because we consider the body of the scope as the "orchestrator" or "main driver" of the logic, which produces the scope's result. And if the result is known, there's probably no point in further running any forks.

Hence, once the scope's body reaches the end, any forks running in the background will probably not be needed anymore. That's why they are interrupted.

If that wouldn't be the case, and we'd want the scope to end only once a particular fork has completed, we can start it using the forkUser method—the user is in contrast to daemon threads. In fact, the body of the scope is run as a user fork!

We can also start unsupervised forks using forkUnsupervised. This is less safe, as an exception thrown in the fork might go unnoticed. To "discover" the exception, you have to call the blocking .join() method, which will either return the result of the fork or throw the exception, with which it completed. There's also a forkCancellable, which additionally exposes a way to cancel a fork on-demand.

Finally, you can start entirely unsupervised scopes (using unsupervised), within which you can only start unsupervised forks. This is slightly more efficient—as there's no additional fork created to run the body of the scope under supervision—however, it is much less safe.

Comparing with JEP's 453 StructuredTaskScope

You might think that all of this sounds nice, but what about the StructuredTaskScope that is defined as part of JEP 453, which aims to implement structured concurrency in Java?

The StructuredTaskScope is more flexible but also harder to use. That's why I consider it a lower-level abstraction, on top of which libraries such as Jox might be built. And indeed, Jox's scopes use the JEP's scopes under the hood.

Just to be clear, there's nothing wrong with providing low-level constructs. That's often the task of the JDK: providing solid building blocks for library authors. As another example, consider the reactive streams APIs with Publisher and Subscriber, which are not meant to be ever implemented directly by application developers.

The question then is: why is StructuredTaskScope lower-level? Let's take a look at a basic example where we run two computations in parallel (from the JEP's description):

Response handle() throws ExecutionException, InterruptedException {
   try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
       Supplier<String>  user  = scope.fork(() -> findUser());
       Supplier<Integer> order = scope.fork(() -> fetchOrder());

       scope.join()            // Join both subtasks
            .throwIfFailed();  // ... and propagate errors

       // Here, both subtasks have succeeded, so compose their results
       return new Response(user.get(), order.get());
   }
}

First, in the model proposed by the JEP, computations are driven by a specific scope implementation (e.g., ShutdownOnSuccess or ShutdownOnFailure, but custom ones can be provided as well). These implementations gather the results & exceptions and decide if the scope should finish or not. On the other hand, the scope's body should "match" the scope implementation and properly use the method it exposes. Hence, the logic behind orchestrating the scope is divided between the scope's implementation and the scope's body.

In the scope body itself, we can fork subtasks the same as in Jox (using scope.fork). While Jox has a couple of types of forks, there's only one in JEP.

As mentioned, the scope's body has to call the appropriate methods of the scope in the proper order. In case of ShutdownOnFailure, it's .join and .throwIfFailed. Failure to do so, in that order, will result in an exception being thrown. And only once the scope is joined, the results of individual forks might be inspected.

The solution proposed by the JEP is very flexible—for sure, there are scenarios that can be elegantly implemented using the approach above, but which would be more cumbersome or slightly less efficient in Jox. However, in 99% of common use cases, I think the APIs provided by Jox are safer and more programmer-friendly.

It's just much harder to use the Jox's APIs incorrectly. At the same time, it's quite easy to misuse the API provided by the JEP scopes. For example:

  • we might call the scope methods in the wrong order
  • we might forget to call one of the scope's methods
  • we might try to obtain the results of a fork before calling .join

Jox's end-scope-on-exception is a good default, minimizing the chances of an exception going unnoticed. In the JEP approach, since the logic is dispersed across the scope and scope body implementation, it's less clear how errors are handled. The concurrency is still structured (that is, no threads are left dangling after the scope completes), but it's not as immediately clear as how exceptions are handled.

Finally, the JEP scopes seem to be designed to follow a fixed blueprint: fork computations—join—process results. With Jox forks, there's more flexibility. Forks might started, joined, then more forks might be started, etc. The entire logic driving the computation is contained in the scope's body. This is also crucial for implementing streaming operations, which we'll discuss below.

High-level concurrency combinators

As safe, straightforward, and programmer-friendly as supervised scopes might be, we might still climb the ladder of abstraction and provide higher-level combinators, implementing commonly used functionality.

First of all, we've got par, which allows running computations in parallel—something that we've seen as an example of the supervised scope. This might be more concisely written as:

var result = par(List.of(() -> {
   Thread.sleep(500);
   return 5;
}, () -> {
   Thread.sleep(1000);
   return 6;
}));

Under the hood, a supervised scope is started hence the exception-handling semantics are the same.

Similarly, we've got the race method, which allows racing two (or more) computations. The first one to provide a non-exceptional result will become the result of the race; any others will be interrupted:

var result = race(() -> {
   Thread.sleep(1000);
   return 10;
}, () -> {
   Thread.sleep(500);
   return 5;
});
// result will be 5, the other computation will be interrupted
// on Thread.sleep

There's also a timeout method, which is a specialized version of race that provides a time limit on a computation.

Use-case: functional, blocking streaming

The Channel data structure, which has been available in Jox for some time, provides an imperative interface: the .send() and .receive() methods. This provides the necessary flexibility (especially when combined with select); however, many use cases can be expressed much more concisely, elegantly, and thus in a more readable and maintainable way using a "functional" interface.

Such a "functional" interface is what we already know from reactive streaming libraries, such as Akka Streams. Can we implement one on top of channels to get a virtual-thread-backed, direct-style, synchronous version of stream processing?

It turns out that yes, we can—with two caveats. First, we get "hot streaming" operators, which immediately start after being invoked. That is in contrast to "cold streaming", as known from the mentioned reactive implementations, which first create a description of the streaming pipeline, and processing is started only later, on-demand.

Secondly, we need a way to start concurrent operations and manage any errors that arise as part of them. That's where scopes and supervision come in.

Consider a mapping function. If we have a Source<Integer> (an interface implemented by Channel, which allows receiving elements), we can implement a .map operation in the following way:

  1. create a new channel onto which results will be produced
  2. start a thread which will do the following in a loop:
    • receive an element
    • apply the mapping function
    • send the result to the new channel
    • in case of an error, propagate it to the channel
  3. return the new channel as the result of .map

Scopes quite obviously give us a way of forking new threads (what is required in step 2). Channels already contain a way to propagate errors. Additionally, scopes provide safety: if any of our streaming combinators has a bug, this won't go unnoticed and will cause the entire scope to end.

That's what is implemented in Jox's channel-ops module. Currently it's very basic, but it proves the approach is viable. Channel operations are run given a scope. Here's the mapping example (in Jox, the method is called .collect, as it combines mapping & filtering when returning null map results):

var result = supervised(scope -> new SourceOps(scope)
   .fromIterable(List.of(1, 2, 3))
   .collect(n -> n * 10)
   .toSource().toList());

System.out.println("result = " + result);

In the above, two virtual threads are created. The first one is to create the channel from an iterable. The successive elements are sent to the channel once it has enough space (channels are buffered, by default, with a buffer of size 16). The second is for performing the mapping operation. Here, it's pretty simple, and creating a virtual thread to multiply each channel's element by 10 is overkill (in fact, for such situations, we provide .collectAsView, which is a lazily-evaluated, no-overhead version of .collect).

But in general, these mapping functions might contain blocking code (remember, blocking is cheap when used with virtual threads!). For example:

supervised(scope -> {
   new SourceOps(scope)
       .tick(1000, "tick")
       .collect(tick -> 
                httpClient.get("https://example.org/stock_ticker"))
       .collect(dbClient::persist)
       // blocking the scope's body by ignoring the result of 
       // persisting responses
       .toSource().forEach(persistResult -> {});
       // will never happen, as ticks are infinite, hence the scope
       // only ends if there's any exception in the pipeline
    return null;
});

We can still use the imperative Channel interface when it's convenient, as we can obtain the underlying Source using .toSource() at any point. On the other hand, we can use the high-level "functional" interface by wrapping an existing source with SourceOps.forSource(scope, source), as long as we are within a concurrency scope.

Ox, Jox's older brother

Originally, we started work on structured concurrency in a Scala library called Ox. There, we researched a couple of variants of how scopes and forks might be created, arriving at the current design. As we're only starting to port functionality from the Scala library to Java, Ox currently has a much richer feature set.

The basic building blocks are similar: supervised scopes, forks, par, and race methods. However, Ox also contains an extended approach to error handling, where exceptions are recommended for "panics", and for "expected" error handling, we've got the Either data type. Thanks to the boundary-break mechanism, this provides type safety and concise syntax.

Moreover, Ox contains features such as resource management, retries, repeats, local actors, parallel collection operations, direct-style utilities, and more. There's a rich set of streaming operations, which you might know from other streaming libraries (providing grouping of data, sliding windows, I/O operations, etc.). Hence, if you'd like to see what kind of features are possible using the approach described above, take a look at what Ox has to offer!

Try it out!

Jox 0.3 is available on GitHub and Maven Central to try it out! The structured module contains the implementation of scopes, channels—the Kotlin-inspired selectable channels and channel-ops operations on channels, which leverage scopes. The project is Apache2 licensed.

Jox integrates nicely with all the other Java features, starting with virtual threads, through try-with-resources and exceptions, and blocking I/O operations.

We would appreciate some feedback—either a comment here or an issue on GitHub. Enjoy!

Blog Comments powered by Disqus.