Structured concurrency and scoped values in Java
JEP 428 introduced structured concurrency to Java 19 as an incubating API. In this article, we will look at the concept of structured concurrency, compare it with the current API, and explore the problems it solves. The code examples follow the shape of the API in the Java 21 preview (JEP 453).
What is structured concurrency
Structured concurrency is a paradigm that simplifies concurrent programming by giving multithreaded code a clear structure: subtasks are started and finished inside a delimited block, so their lifetime is tied to the code that created them. This makes resource management and error handling more predictable and reduces common pitfalls of uncontrolled spawning and termination of threads or tasks, such as thread leaks. The result is more maintainable, reliable, and efficient concurrent code.
In addition, the Java implementation uses virtual threads from the Loom project, making it less costly for us to create new threads.
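To get a feel for how cheap virtual threads are, here is a minimal sketch that is not part of the structured concurrency API itself. It assumes JDK 21 or newer, where virtual threads are a final feature; the class and method names are made up for illustration. Every task gets its own virtual thread and reports whether it really ran on one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not taken from the article.
final class VirtualThreadsSketch {

    /** Runs taskCount short tasks and counts how many ran on a virtual thread. */
    static int countVirtualTasks(int taskCount)
            throws InterruptedException, ExecutionException {
        // This executor starts a new virtual thread for every submitted task.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Boolean>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                futures.add(executor.submit(() -> {
                    Thread.sleep(10); // simulate a short blocking call
                    return Thread.currentThread().isVirtual();
                }));
            }
            int virtualCount = 0;
            for (Future<Boolean> future : futures) {
                if (future.get()) {
                    virtualCount++;
                }
            }
            return virtualCount;
        }
    }
}
```

Calling VirtualThreadsSketch.countVirtualTasks(1_000) starts a thousand threads, one per task. With a small fixed pool of platform threads, the same blocking tasks would typically have to queue behind each other.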
Example
To illustrate the problems we can encounter with the current API, we will use the example of generating an invoice. We could do everything on one thread, fetching the data about the invoice issuer, the customer, and the purchased items one by one. However, we want to fetch this data in parallel to speed up the process. In the first example, we'll use ExecutorService and see what happens.
(Un)structured concurrency
Invoice generateInvoice() throws ExecutionException, InterruptedException {
    // ExecutorService is AutoCloseable since Java 19; close() waits for the submitted tasks
    try (ExecutorService executorService = Executors.newFixedThreadPool(3)) {
        Future<Issuer> issuer = executorService.submit(this::findIssuer);
        Future<Customer> customer = executorService.submit(this::findCustomer);
        Future<List<Item>> items = executorService.submit(this::findItems);
        // Each get() blocks until that particular task has finished
        return new Invoice(issuer.get(), customer.get(), items.get());
    }
}
This approach brings with it a couple of problems:
- When findCustomer takes longer to execute than findItems, and findItems throws an exception, the code stays blocked at customer.get() until the thread executing findCustomer ends. As a result, we waste resources waiting for the subtask to finish.
- If the thread calling generateInvoice() is interrupted, the subtasks in the executor are not interrupted and get leaked. They can even run indefinitely without control.
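The first problem can be reproduced with a small, self-contained sketch. The task names and timings are hypothetical stand-ins for findCustomer and findItems: one task sleeps for half a second, the other fails immediately. The caller still waits for the slow task before it learns about the failure.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not taken from the article.
final class UnstructuredWaitSketch {

    static String slowLookup() throws InterruptedException {
        Thread.sleep(500); // stands in for a slow findCustomer
        return "customer";
    }

    static String failingLookup() {
        throw new IllegalStateException("lookup failed"); // stands in for findItems
    }

    /** Returns how many milliseconds passed before the caller saw the failure. */
    static long millisUntilFailureSeen() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        long start = System.nanoTime();
        try {
            Future<String> slow = executor.submit(UnstructuredWaitSketch::slowLookup);
            Future<String> failing = executor.submit(UnstructuredWaitSketch::failingLookup);
            slow.get();    // blocks for the full sleep although 'failing' has already failed
            failing.get(); // only now does the failure surface as an ExecutionException
            throw new AssertionError("expected failingLookup to fail");
        } catch (ExecutionException e) {
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Under these assumptions the measured time is roughly the full half second of the slow task, even though the outcome was decided almost immediately.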
Structured concurrency
Invoice generateInvoice() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // [1]
        Supplier<Issuer> issuer = scope.fork(this::findIssuer);
        Supplier<Customer> customer = scope.fork(this::findCustomer);
        Supplier<List<Item>> items = scope.fork(this::findItems);
        scope.join().throwIfFailed(); // [2]
        return new Invoice(issuer.get(), customer.get(), items.get());
    }
}
- [1]: We create a new StructuredTaskScope with the ShutdownOnFailure policy. If findIssuer, findCustomer, or findItems throws an exception, the scope shuts down and the subtasks that are still running are cancelled by interrupting their threads.
- [2]: join() waits until all subtasks in the scope have finished or the scope has shut down; throwIfFailed() then throws an ExecutionException if any subtask failed.
The code above is free of the problems mentioned earlier:
- When findCustomer takes longer to execute than findItems, but findItems throws an exception, findCustomer is cancelled instead of running to completion.
- If the thread calling generateInvoice() is interrupted, cancellation is propagated to the subtasks.
- All subtasks are finished before execution leaves the scope.
At first glance, not much has changed: the structure of the code is very similar. Instead of ExecutorService we use StructuredTaskScope, and since both implement AutoCloseable, we can use try-with-resources in both cases. They differ in a couple of ways, though. One of the biggest differences is that most implementations of ExecutorService use regular platform threads, while StructuredTaskScope by default runs each subtask in a new lightweight virtual thread from the Loom project. We can also see in the code that ExecutorService returns results wrapped in Future<T>, whereas with StructuredTaskScope we work with Supplier<T>.
They also have a different purpose and lifecycle. The lifecycle of an ExecutorService is not clearly defined: it can exist throughout the whole application lifecycle and act as a simplified interface to a thread pool. The tasks submitted to an ExecutorService may or may not be related to each other.
StructuredTaskScope, by contrast, is meant to manage subtasks that are related to each other in some way and must complete before the main task can continue. In addition, the structured concurrency API has mechanisms for defining the relationship between these subtasks. They are called policies and are explained in the next section.
Structured concurrency policies
A policy is a set of rules for how a StructuredTaskScope should behave when one or more subtasks succeed or fail. So far in the article we have focused on ShutdownOnFailure and have not mentioned other policies. It is one of the two basic implementations that ship with the standard library.
ShutdownOnFailure
If any subtask fails, the rest of the subtasks are interrupted. Think of the example with invoicing - getting it right depends on having all the necessary information, such as customer details, issuer details, and items. If any of this information is missing, we are unable to generate the invoice correctly.
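A minimal sketch of this policy in action, assuming JDK 21 with preview features enabled (--enable-preview), since StructuredTaskScope is a preview API there and its shape differs in other releases. The task names and timings are hypothetical: one subtask would sleep for five seconds, the other fails at once, and the scope interrupts the sleeper instead of waiting for it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not taken from the article.
final class FailFastSketch {

    // Set when the slow subtask observes the interrupt sent by the scope.
    static final AtomicBoolean slowTaskInterrupted = new AtomicBoolean(false);

    static String slowLookup() throws InterruptedException {
        try {
            Thread.sleep(5_000);
            return "slow result";
        } catch (InterruptedException e) {
            slowTaskInterrupted.set(true);
            throw e;
        }
    }

    static String failingLookup() {
        throw new IllegalStateException("lookup failed");
    }

    /** Returns the milliseconds spent until the scope was closed. */
    static long millisUntilScopeExit() throws InterruptedException {
        long start = System.nanoTime();
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            scope.fork(FailFastSketch::slowLookup);
            scope.fork(FailFastSketch::failingLookup);
            scope.join().throwIfFailed(); // rethrows the failure as ExecutionException
            throw new AssertionError("expected the failing subtask to fail the scope");
        } catch (ExecutionException e) {
            // e.getCause() is the IllegalStateException thrown by failingLookup
        }
        // The scope is closed before the catch block runs, so both subtasks are done here.
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

Under these assumptions the method returns long before the five seconds are up, and slowTaskInterrupted is set, which shows that the remaining subtask was cancelled rather than awaited.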
ShutdownOnSuccess
It cancels the remaining subtasks as soon as any subtask finishes successfully, and throws an exception if all subtasks fail. This is useful when requesting the same information from multiple sources at once, where speed matters more than which source answers.
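As an illustration, the sketch below asks three hypothetical mirrors for the same value and keeps whichever answers first. It assumes the JDK 21 preview API (run with --enable-preview); the mirror methods, their delays, and the returned strings are invented for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

// Hypothetical demo class, not taken from the article.
final class FirstSuccessSketch {

    static String fromSlowMirror() throws InterruptedException {
        Thread.sleep(2_000);
        return "rate from slow mirror";
    }

    static String fromFastMirror() throws InterruptedException {
        Thread.sleep(50);
        return "rate from fast mirror";
    }

    static String fromBrokenMirror() {
        throw new IllegalStateException("mirror unavailable");
    }

    static String fetchFromAnyMirror() throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            scope.fork(FirstSuccessSketch::fromSlowMirror);
            scope.fork(FirstSuccessSketch::fromFastMirror);
            scope.fork(FirstSuccessSketch::fromBrokenMirror);
            // join() returns once one subtask has succeeded or all have finished;
            // result() throws ExecutionException only if no subtask succeeded.
            return scope.join().result();
        }
    }
}
```

Here the broken mirror fails first, but a single failure does not shut the scope down. The fast mirror's answer is returned and the slow mirror is cancelled.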
Other Policies
Although there are only two standard policies, we can create our own policy implementation based on our requirements. To do this, we create our own class that extends StructuredTaskScope<T> and overrides its handleComplete method, which is invoked as each subtask completes.
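As a rough sketch of what such a class might look like, assuming the JDK 21 preview API (run with --enable-preview), the hypothetical CollectingScope below never shuts down early. It overrides handleComplete, the hook the scope invokes for each subtask that completes, to keep every successful result and record every failure. The class, record, and task names are all invented for this example.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.StructuredTaskScope;

// Hypothetical policy: wait for every subtask, keep successes, record failures.
final class CollectingScope<T> extends StructuredTaskScope<T> {

    // handleComplete runs on the subtask threads, so the collections must be thread-safe.
    private final Queue<T> results = new ConcurrentLinkedQueue<>();
    private final Queue<Throwable> failures = new ConcurrentLinkedQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS) {
            results.add(subtask.get()); // assumes subtasks never return null
        } else if (subtask.state() == Subtask.State.FAILED) {
            failures.add(subtask.exception());
        }
    }

    @Override
    public CollectingScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    List<T> results() {
        ensureOwnerAndJoined(); // only the owner may read results, and only after join()
        return List.copyOf(results);
    }

    List<Throwable> failures() {
        ensureOwnerAndJoined();
        return List.copyOf(failures);
    }
}

final class CollectingScopeDemo {

    record Outcome(List<String> prices, int failedSources) { }

    static String priceFromShopA() { return "shop A: 10"; }

    static String priceFromShopB() { throw new IllegalStateException("shop B is down"); }

    static String priceFromShopC() { return "shop C: 12"; }

    static Outcome collectPrices() throws InterruptedException {
        try (var scope = new CollectingScope<String>()) {
            scope.fork(CollectingScopeDemo::priceFromShopA);
            scope.fork(CollectingScopeDemo::priceFromShopB);
            scope.fork(CollectingScopeDemo::priceFromShopC);
            scope.join();
            return new Outcome(scope.results(), scope.failures().size());
        }
    }
}
```

The order of the collected results depends on which subtask finishes first, so callers should not rely on it. A policy like this fits best-effort aggregation, where a partial answer is better than none.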
Summary
Structured concurrency is a very good addition to the standard Java API that will simplify writing concurrent code. This is another new feature that takes advantage of the promising Loom Project, which could change the approach to asynchronicity and multithreading in Java in the near future.
If you would like to know more about the Loom project, read our blog posts.
Reviewed by: Rafał Maciak, Łukasz Rola, Dariusz Broda