Critique of JEP 505: Structured Concurrency (Fifth Preview)


With the just-released Java 25, we have the opportunity to test the fifth preview of the structured concurrency API. It turns out that creating such an API is no easy task!

The API allows you to create forks (virtual threads) within scopes, that run your tasks. Certain constraints are imposed, aiming to provide a safe and comprehensible way to work with concurrent, I/O-bound tasks in Java applications.

The right set of constraints is often the root of the most powerful features. However, from my attempts in porting Jox, a virtual-thread-native reactive-streaming-like library, from Java 21 to Java 25, I feel that the constraints currently in place are not always the best ones. That's why I'd like to offer my critique of the current design.

What is Structured Concurrency?

Structured concurrency is an approach where the syntactical structure of the code determines the lifetime of the threads created within that code.

Structured concurrency is well-introduced in the JEP itself, as well as in articles by Martin Sustrik, who coined the term, and Nathaniel J. Smith, who popularized it in Python. Structured concurrency is also widely used in Kotlin. Hopefully, these materials are sufficient to explain the concept; hence, I won't provide a detailed introduction here.

Let me provide a simple example. Say we want to run two methods concurrently: findUser() and fetchOrder(), combining their results when successful. We also want to short-circuit on failure: interrupt the other task if one fails. Finally, we want a guarantee that the scope within which we create the tasks will only finish once all threads have terminated (successfully, with an exception, or due to interruption).

These are precisely the guarantees and features that StructuredTaskScope from the JEP provides:

Response handle() throws InterruptedException {
    try (var scope = StructuredTaskScope.open()) {
        Subtask<String> user = scope.fork(() -> findUser());
        Subtask<Integer> order = scope.fork(() -> fetchOrder());
        scope.join();
        return new Response(user.get(), order.get());
    }
}

The Good

Before we dive into the weaker points of the specification, I'd like to be clear that I don't think that the current JEP is all bad; quite the opposite.

First and foremost, it properly implements the main idea of structured concurrency: using StructuredTaskScope, you have the guarantee that the scope won't complete until all forks (virtual threads) created within have completed. No thread leaks, and no cancellation delays.

Moreover, the current design is consistent with other Java features, as it uses the try-with-resources pattern. We have a clearly delineated region of code, where given resources are being used (virtual threads created to run forks), with a guarantee that the scope is always closed (properly cleaning up if needed).

Finally, using concurrency scopes has minimal overhead (or none at all), compared to unstructured variants of the code (e.g., relying on ExecutorServices and Futures).

But of course, it's always more interesting to discuss the other side: what might be the problems, then?

Non-uniform cancellation

JEP 505 works great when the scope's main body doesn't contain much logic and just follows the required, fixed structure: forking subtasks, joining the scope, and obtaining the results.

However, in reality, your forks/subtasks will often use shared state, communicate with each other (using queues or channels), and require the body of the scope to be the "coordinator" or "driver" of the whole process.

As an example, let's take a website crawler, where we want each page to be processed concurrently (by a separate subtask). Here's a sketch of how one might go about implementing such a crawler:

sealed interface CrawlCommand permits CrawlUrl, CrawlDone {}
record CrawlUrl(String url) implements CrawlCommand {}
record CrawlDone() implements CrawlCommand {}

public class Crawler {
    void doCrawl(String url, Queue<CrawlCommand> commandQueue) {
        // fetch the URL, extract links, for each send CrawlUrl() etc.
        // finally send CrawlDone()
    }

    public void crawl(String rootUrl) throws InterruptedException {
        try (var scope = StructuredTaskScope.open()) {
            var visited = new HashSet<String>();
            var commandQueue = new LinkedBlockingQueue<CrawlCommand>();

            scope.fork(() -> doCrawl(rootUrl, commandQueue));
            var inProgress = 1;

            while (inProgress > 0) {
                switch (commandQueue.take()) {
                    case CrawlUrl c -> {
                        if (visited.add(c.url())) {
                            inProgress++;
                            scope.fork(() -> 
                                doCrawl(c.url(), commandQueue));
                        }
                    }
                    case CrawlDone _ -> inProgress--;
                }
            }

            scope.join();
        }
    }
}

In the body of the scope, we maintain a set of links that have already been visited and a queue of commands. New subtasks can only be forked from the thread that created the scope (that's another constraint imposed by the API), hence this needs to be managed centrally.

We also need to track the number of tasks that are currently running, so that we know when the whole process is done. That's why a CrawlDone command should be sent to the queue whenever a link has been processed.

Here, we are just crawling for the sake of it, but typically, you'd also need to aggregate the subtask results into an overall result. This could be done by implementing a custom Joiner, passed when opening the scope, or by collecting and inspecting the subtasks after scope.join() completes.

Now, let's consider what happens if one of the crawling subtasks throws an (unhandled) exception. Because we use the default Joiner, once any subtask fails, the whole scope is cancelled. This causes all other subtasks to be interrupted—and rightly so.

However, what happens to the "driver" of the process, the scope's main body? Most probably, it will hang indefinitely on commandQueue.take(). Unlike all the forked subtasks, the body of the scope is not interrupted. And under this design, it cannot be; there's no way to do it correctly (as such an interruption could "escape" the scope).

The solution is to properly handle errors in the subtasks and notify the scope body using a dedicated command. However, this approach relies on discipline & testing, which somewhat undermines the error-handling model of scopes. Specifically, that in the event of an exception, the scope is cancelled & resources are cleaned up.
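For illustration, here's a sketch of that pattern. CrawlFailed, doCrawlSafely, and the "bad:" failure trigger are hypothetical additions to the earlier example, not part of the JEP:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SafeCrawl {
    sealed interface CrawlCommand permits CrawlUrl, CrawlDone, CrawlFailed {}
    record CrawlUrl(String url) implements CrawlCommand {}
    record CrawlDone() implements CrawlCommand {}
    // Hypothetical addition: carries a subtask failure back to the scope body
    record CrawlFailed(String url, Exception cause) implements CrawlCommand {}

    // Wraps the crawling logic so that any exception becomes a command,
    // instead of an unhandled subtask failure that cancels the scope and
    // leaves the driver blocked on commandQueue.take().
    static void doCrawlSafely(String url, BlockingQueue<CrawlCommand> queue)
            throws InterruptedException {
        try {
            if (url.startsWith("bad:")) { // stand-in for a real fetch failure
                throw new IllegalStateException("fetch failed: " + url);
            }
            // ... fetch the URL, extract links, enqueue CrawlUrl commands ...
            queue.put(new CrawlDone());
        } catch (InterruptedException e) {
            throw e; // let interruption propagate to the scope
        } catch (Exception e) {
            queue.put(new CrawlFailed(url, e)); // notify the driver explicitly
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var queue = new LinkedBlockingQueue<CrawlCommand>();
        doCrawlSafely("bad://example.com", queue);
        System.out.println(queue.take().getClass().getSimpleName());
    }
}
```

The driver's switch would then gain a case CrawlFailed f -> branch, which can decrement inProgress, record the error, and decide whether to stop crawling.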

Are we done yet?

Another problem arises when you have different types of subtasks, some of which run the main logic of the computation, and some of which play a supportive role.

Continuing the web crawler example, suppose you want to add some logic that periodically sends metrics on the number of links that have been found, in the background.

This can be coded as yet another subtask, started when the scope starts. The subtask would receive data from the crawling subtasks whenever they encounter a new link, using yet another queue.

However, such a monitoring subtask is different from the crawling subtasks: it never completes normally, and in fact, it's not expected to ever complete successfully. You might think of it as a daemon thread vs. a user thread.

But what about scope.join()? It will wait indefinitely until all subtasks are complete. Subtasks aren't cancellable (which, by the way, would be very useful for certain problems), so one way to solve this would be to send a dedicated command to the monitoring subtask's queue, notifying it that the crawling is done.
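Such a "crawling is done" notification can be implemented as a poison-pill event on the monitoring queue. Below is a sketch with hypothetical types (MetricsEvent, LinkFound, and MetricsDone are not part of the JEP or the earlier example):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Metrics {
    // Hypothetical event hierarchy for the monitoring subtask
    sealed interface MetricsEvent permits LinkFound, MetricsDone {}
    record LinkFound(String url) implements MetricsEvent {}
    record MetricsDone() implements MetricsEvent {} // the "poison pill"

    // The monitoring loop: runs until it receives MetricsDone. In the
    // crawler, this would be forked as a subtask, and would periodically
    // report the counter instead of just returning it at the end.
    static int monitor(BlockingQueue<MetricsEvent> events)
            throws InterruptedException {
        int linksFound = 0;
        while (true) {
            switch (events.take()) {
                case LinkFound e -> linksFound++;
                case MetricsDone e -> { return linksFound; }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        var events = new LinkedBlockingQueue<MetricsEvent>();
        events.put(new LinkFound("https://a"));
        events.put(new LinkFound("https://b"));
        events.put(new MetricsDone());
        System.out.println(monitor(events)); // 2
    }
}
```

In the crawler, the scope body would enqueue MetricsDone once the crawling loop exits, so that the monitoring subtask can complete and scope.join() can return.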

However, what we'd really like to do is to tell the scope, "we're done, let's cancel the scope now and compute the result". Currently, the only way to dynamically decide if the scope should be cancelled is by implementing a Joiner; but here, this won't work, as the completion depends on the logic contained in the main body (& in the command queue).

We can resort to a trick, such as maintaining an isDone flag and submitting an empty task to a custom joiner:

void main() throws InterruptedException {
    var isDone = new AtomicBoolean(false);
    try (var scope = StructuredTaskScope.open(
        new CancellableJoiner<>(isDone))) {
        // some logic

        isDone.set(true);
        scope.fork(() -> {});

        scope.join();
    }
}

class CancellableJoiner<T> 
    implements StructuredTaskScope.Joiner<T, Void> {

    private final AtomicBoolean isDone;
    CancellableJoiner(AtomicBoolean isDone) { this.isDone = isDone; }

    @Override
    public boolean onFork(
        StructuredTaskScope.Subtask<? extends T> subtask) {
        return isDone.get();
    }

    @Override
    public Void result() throws Throwable {
        return null;
    }
}

This allows us to decide when the scope is done dynamically. But then, I don't think we should be using "tricks" in a brand-new concurrency API to solve quite regular problems.

The Joiner interface is a step forward compared to the previous incarnations of the JEP (it was introduced in the 5th preview), but I think it still falls short of the flexibility that some problems require. With the current approach, the scope's logic is inherently split between the Joiner implementation and the scope's body, making it challenging to keep those two in sync, especially since Joiner's methods can be run from any thread.

Nitpick: timeout

There are also two minor problems that I believe are relatively easy to solve, but they currently represent "rough edges" in the specification.

First, the timeout is a configuration of the scope:

try (var scope = StructuredTaskScope.open(
        Joiner.<T>allSuccessfulOrThrow(),
        cf -> cf.withTimeout(Duration.ofSeconds(5)))) {
    // scope's body
}

While it could have been a method:

timeout(Duration.ofSeconds(5), () -> { 
    /* same as in the scope's body */ 
});

Implemented (in a library) using the basic machinery that StructuredTaskScope offers:

public class Timeout {
    public static <T> T timeout(Duration d, Callable<T> body) 
        throws InterruptedException {

        try (var scope = StructuredTaskScope.open(
            new FirstComplete<T>())) {
            scope.fork(() -> {
                Thread.sleep(d.toMillis());
                throw new TimeoutException();
            });

            var bodyTask = scope.fork(body);
            scope.join();
            return bodyTask.get();
        }
    }

    static final class FirstComplete<T> 
        implements StructuredTaskScope.Joiner<T, Void> {

        private volatile Throwable t;
        @Override
        public boolean onComplete(
            StructuredTaskScope.Subtask<? extends T> subtask) {
            if (subtask.state() == 
                StructuredTaskScope.Subtask.State.FAILED) {
                t = subtask.exception();
            }
            return true;
        }

        @Override
        public Void result() throws Throwable {
            if (t != null) {
                throw t;
            } else {
                return null;
            }
        }
    }
}

(As a bonus, this shows how you can implement a custom Joiner: here, one implementing race semantics, waiting for any task to complete, successfully or with an exception.)

Why is there a dedicated configuration parameter for the entire scope? I'm not sure. It creates a special case for something that can be expressed using existing abstractions. Moreover, a timeout method is more lightweight (syntactically) than running an entire scope. And it opens up the possibility of implementing retries, repeats, and other resiliency patterns in a consistent way (that is, also as methods).
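To illustrate the consistency argument: a retry helper can be written in the same method-based style, with no scope configuration involved. The retry method below is a hypothetical sketch, not part of any API:

```java
import java.util.concurrent.Callable;

public class Retry {
    // Hypothetical retry combinator, in the same method-based style as
    // timeout(): resiliency as a library method rather than scope config.
    // Assumes attempts >= 1.
    static <T> T retry(int attempts, Callable<T> body) throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return body.call();
            } catch (InterruptedException e) {
                throw e; // never retry after interruption
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        var attempt = new int[] { 0 };
        String result = retry(3, () -> {
            if (++attempt[0] < 3) throw new RuntimeException("flaky");
            return "ok on attempt " + attempt[0];
        });
        System.out.println(result); // ok on attempt 3
    }
}
```

Such methods compose naturally: retry(3, () -> timeout(Duration.ofSeconds(5), body)) reads exactly as it behaves, which is hard to achieve with per-scope configuration parameters.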

Nitpick: subtask.get() is non-blocking and confusing

Once you fork a subtask within a scope, you get a Subtask instance, which represents a running task within the scope. It has a .get() method; however, I find its behavior a bit surprising, especially considering the API of Future that has been in Java for a long time.

Future.get() in Java is blocking: the (virtual) thread calling it is blocked until the future completes, successfully or with an error. Alternatively, the non-blocking Future.resultNow() returns the result if the future has already completed, and throws IllegalStateException otherwise.
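This contrast is easy to demonstrate with the stable executor APIs (no preview features needed):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class GetSemantics {
    public static void main(String[] args) throws Exception {
        try (ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<String> f = pool.submit(() -> {
                Thread.sleep(500); // keep the task running for a moment
                return "done";
            });
            try {
                f.resultNow(); // non-blocking: the task hasn't finished yet
                System.out.println("resultNow: ready");
            } catch (IllegalStateException e) {
                System.out.println("resultNow: not ready");
            }
            System.out.println("get: " + f.get()); // blocks until completion
        }
    }
}
```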

Despite the same name, Subtask.get() works like Future.resultNow(). That is, it's non-blocking, and additionally should not be used before calling scope.join()—which of course is a matter of programming discipline, not something that the compiler can verify.

JEP 505's API is already challenging, as it requires one to call its methods in a specific order (scope.fork(), scope.join(), subtask.get()). This in itself is fragile and an easy way to introduce a bug. However, the naming introduces an additional source of confusion, as the semantics of subtask.get() differ so much from long-existing APIs. Renaming a method is an easy fix, so I keep my fingers crossed that, if anything, at least this source of confusion will be removed.

Alternative designs

You might wonder, are these inherent problems that any structured concurrency implementation must face, or are there alternative designs?

There are, but offering a different set of tradeoffs. One of them is in the Jox project on which I'm working. It offers supervised concurrency scopes, where:

  • The main body of the scope is interrupted when the scope is being shut down due to an exception, just like any other fork
  • There are different types of forks (forkUser and fork), depending on whether the fork must complete for the scope to complete, or whether it's a background "daemon" fork that should be interrupted once all user forks complete
  • Forks can be created as cancellable
  • There's no fixed structure that a scope's body must follow. fork.join() is blocking, and new forks can be started at any time (including after some forks have already been joined)

But there are downsides as well:

  • Each scope starts an additional virtual thread, which runs the scope's body; the calling thread is used as a "supervisor"
  • The scope's body is passed as a lambda, instead of using Java's try-with-resources

Of course, the design space is possibly much wider! And to compare, the "simple example" from the introduction, written using Jox scopes, looks as follows:

Response handle() throws InterruptedException {
    return supervised(scope -> {
        Fork<String> user = scope.fork(() -> findUser());
        Fork<Integer> order = scope.fork(() -> fetchOrder());
        return new Response(user.join(), order.join());
    });
}

Notice that you no longer have to explicitly join the scope using scope.join(), and that there's no way to misuse obtaining the results of the computations using Subtask.get(). However, the scope's body is a lambda, not the body of a try-with-resources.

Why Structured Concurrency APIs are so important

One might say that the APIs offered by JEP 505 are designed for the most common, usually straightforward use-cases, and that they should first and foremost address their needs. At the same time, the more complex problems will still require high discipline when it comes to error or completion handling. Alternatively, solutions like Jox, built on the Java structured concurrency API, can be used in those cases.

However, my feeling is that the ambitions of the Structured Concurrency API are way beyond the simple cases: to offer a new building block for the future of concurrency in Java.

Whenever you need to distribute work across multiple virtual threads, StructuredTaskScope should be your go-to tool, rather than creating them manually. Just as parallel streams are the tool to use for CPU-intensive tasks, here we have a solution for IO-bound ones.

The importance of Structured Concurrency within the Java concurrency ecosystem is also exemplified by Scoped Values, a feature that has been made stable in Java 25. ScopedValue is designed as a successor to ThreadLocal, tailored to work well with virtual threads.

One of the problems with ThreadLocals is that they are not inherited across threads; InheritableThreadLocal is expensive and should not be used with virtual threads. ScopedValues provide a solution: they are inherited, but only within structured concurrency scopes!
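To make this concrete, here's minimal usage of the ScopedValue API (stable as of Java 25); the REQUEST_ID name is just an example:

```java
public class ScopedDemo {
    // ScopedValue lives in java.lang, so no import is needed
    static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();

    public static void main(String[] args) {
        System.out.println("bound before: " + REQUEST_ID.isBound());
        // The binding is visible only within the run(...) scope; subtasks
        // forked from a StructuredTaskScope opened inside would inherit it.
        ScopedValue.where(REQUEST_ID, "req-42").run(() ->
            System.out.println("inside: " + REQUEST_ID.get()));
        System.out.println("bound after: " + REQUEST_ID.isBound());
    }
}
```

A StructuredTaskScope opened inside the run(...) block propagates the binding to every subtask it forks, which is exactly the inheritance that plain ThreadLocal lacks.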

Hence, to use one of the main features of scoped values and enjoy working with inherited local values, you'll have to use StructuredTaskScope! We are now encouraged to create as many threads as needed, at will, without worrying (too much) about overhead—hence inheritance of context is now much more important than with thread pools.

TL;DR

Summing up, while powerful, the current design of the JEP has a couple of smaller and bigger problems:

  • non-uniform cancellation, as the body of the scope does not participate in the error handling mechanisms of the subtasks
  • scope's logic split between the Joiner implementation and the scope's body, making it hard for the scope's body to cancel the scope when it determines the job is done
  • the redundant timeout configuration parameter, as timeout can be implemented using the mechanisms of StructuredTaskScope directly, moreover offering a more lightweight API
  • confusing method naming, as Subtask.get() has different semantics than Future.get().

There are still a couple of Java iterations before Java 29 (the next LTS), so hopefully the JEP will be further refined, building on top of the great work delivered so far, to remedy some of the problems outlined above.

Have you tested JEP 505? What kinds of problems does it fit well, and where did you run into trouble?

Reviewed by Dariusz Broda
