Propagating OpenTelemetry context when using Virtual Threads & Structured Concurrency
The Context plays a central role in OpenTelemetry: it stores the metadata of the current invocation chain, such as the trace currently in progress. This allows correlating multiple spans into a single logical unit.
In the Java OpenTelemetry SDK, the Context is by default passed using ThreadLocals. Does this work as expected with Virtual Threads (final since Java 21) and Structured Concurrency (a preview API since Java 21)? And if not, what are the problems?
Sequential invocations
Let's first examine a simple case of sequential invocations, where each starts a span. No threads are involved yet, but this will be useful as a baseline.
In the example below, a parent method starts a parent span and invokes two nested methods, each of which starts a nested span. Here's the code:
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import java.util.concurrent.ExecutionException;
public class Demo {
    private static Void nested(int id, Tracer tracer) throws InterruptedException {
        var span = tracer.spanBuilder("nested-test-span-" + id).startSpan();
        try (var scope = span.makeCurrent()) {
            // pretending to do some work, e.g. sending an HTTP request
            Thread.sleep(500);
        } finally {
            span.end();
        }
        return null;
    }
    private static void parent(Tracer tracer) throws InterruptedException {
        var span = tracer.spanBuilder("parent-test-span").startSpan();
        try (var scope = span.makeCurrent()) {
            nested(1, tracer);
            nested(2, tracer);
        } finally {
            span.end();
        }
    }
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        var otel = AutoConfiguredOpenTelemetrySdk.initialize();
        // the tracer is used to create spans
        var tracer = otel.getOpenTelemetrySdk().getTracer("otel-demo");
        parent(tracer);
        // make sure telemetry is sent
        Thread.sleep(1000);
    }
}
In the main method, we initialize OpenTelemetry via SDK autoconfiguration, which is driven by environment variables (or system properties). The following ones are useful for testing:
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_METRIC_EXPORT_INTERVAL=500
OTEL_SERVICE_NAME=demo
The exporter endpoint should point to a running OTLP receiver, e.g., Grafana's LGTM image, a trivial single-container Docker setup. The accompanying docker-compose.yml is as follows:
services:
  # OpenTelemetry Collector, Prometheus, Loki, Tempo, Grafana
  observability:
    image: 'grafana/otel-lgtm'
    ports:
      - '3000:3000' # Grafana's UI
      - '4317:4317' # OTLP gRPC receiver (target of the exporter)
That way, we get both a backend that receives the exported telemetry and a UI that helps visualize the test results.
The parent method starts a span and invokes nested twice, each invocation starting a child span. After running the example, the UI should show a single trace in which the two nested spans are children of the parent span and run one after another.
So far, so good. Let's now try to add some parallelism.
Parallel invocations
If the nested invocations are independent and take a non-trivial amount of time (they might perform some I/O, for example), we might run them in parallel. There are a couple of ways to do this, but we'll take the route paved by the Java structured concurrency JEP (available in preview from Java 21).
For a more thorough introduction to structured concurrency, see, e.g., this article; here, we'll just use the StructuredTaskScope API. The core property of structured concurrency is that the syntactic structure of the code determines the lifetime of the threads running the subtasks.
That is, since the StructuredTaskScope is nested within the try (var scope = span.makeCurrent()) block, it's guaranteed that any threads started in the task scope have finished by the time we close the observability scope and end the span:
// Java 21: StructuredTaskScope is a preview API (compile and run with --enable-preview);
// Demo additionally needs imports of java.util.concurrent.StructuredTaskScope and java.util.function.Supplier
private static Void nested(int id, Tracer tracer) throws InterruptedException {
    var span = tracer.spanBuilder("nested-test-span-" + id).startSpan();
    try (var scope = span.makeCurrent()) {
        Thread.sleep(500);
    } finally {
        span.end();
    }
    return null;
}
private static void parent(Tracer tracer) throws InterruptedException, ExecutionException {
    var span = tracer.spanBuilder("parent-test-span").startSpan();
    try (var scope = span.makeCurrent()) {
        try (var taskScope = new StructuredTaskScope.ShutdownOnFailure()) {
            // each subtask runs in a new virtual thread
            Supplier<Void> n1 = taskScope.fork(() -> nested(1, tracer));
            Supplier<Void> n2 = taskScope.fork(() -> nested(2, tracer));
            // wait for both subtasks, rethrowing if any of them failed
            taskScope.join().throwIfFailed();
            n1.get();
            n2.get();
        }
    } finally {
        span.end();
    }
}
In the code above, we fork two subtasks, each running in its own virtual thread (the default for StructuredTaskScope), and then wait until both have completed using .join(). This is similar to using futures, but with the additional guarantee that if one of the tasks fails, the other won't be left "dangling" but will be interrupted instead.
However, when we run this example, the observability context is no longer propagated. Instead of a single correlated trace, we get three separate ones: one for the parent span and one for each of the nested spans.
The total time of the parent's span went down to 500ms (due to parallelization), but we lost observability into the entire process.
Parallel invocations, with observability
What's the problem? Remember that the OpenTelemetry Context is propagated using ThreadLocals. This works great when correlating invocations and spans within a single thread; however, it breaks as soon as work moves to other threads, whether "normal" (that is, platform) threads or virtual ones. That's because a value stored in a plain ThreadLocal is not inherited by newly created threads.
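The JDK behavior behind this can be checked without OpenTelemetry. The following is a minimal, stdlib-only sketch (Java 21+; the class and method names are made up for illustration): it sets a plain ThreadLocal, like the one the default context storage relies on, and, for comparison, an InheritableThreadLocal, then reads both from a freshly started virtual thread.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalInheritance {
    // a plain ThreadLocal: values set in one thread are not copied to threads it creates
    static final ThreadLocal<String> PLAIN = new ThreadLocal<>();
    // for comparison: an InheritableThreadLocal is copied when a child thread is created
    static final InheritableThreadLocal<String> INHERITABLE = new InheritableThreadLocal<>();

    /** Sets both values in the calling thread, then reads them from a new virtual thread. */
    static String[] readFromChild() throws InterruptedException {
        PLAIN.set("parent-value");
        INHERITABLE.set("parent-value");
        AtomicReference<String> plainSeen = new AtomicReference<>();
        AtomicReference<String> inheritableSeen = new AtomicReference<>();
        Thread child = Thread.ofVirtual().start(() -> {
            plainSeen.set(PLAIN.get());
            inheritableSeen.set(INHERITABLE.get());
        });
        child.join(); // join() also makes the child's writes visible to this thread
        return new String[] {plainSeen.get(), inheritableSeen.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = readFromChild();
        System.out.println("ThreadLocal:            " + seen[0]); // prints null
        System.out.println("InheritableThreadLocal: " + seen[1]); // prints parent-value
    }
}
```

Inheritable values would not be a general fix either: they are copied once, when a thread is created, so a pooled thread keeps whatever was current at that point rather than the context current when a task is submitted.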
OpenTelemetry tries to mitigate that problem in a couple of ways. For example, it's possible to wrap an Executor using Context.taskWrapping(), so that the context that is current when a task is submitted is propagated to the thread running it. However, there are no such utilities for structured concurrency yet, and for a good reason: the API in question is still in preview.
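As a rough illustration of the executor-wrapping approach, here is a sketch that assumes the opentelemetry-context (or opentelemetry-api) artifact on the classpath and Java 21+. REQUEST_ID is a hypothetical context key used only to make the propagated context observable; the same task is submitted to a plain virtual-thread executor and to one wrapped with Context.taskWrapping.

```java
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TaskWrappingDemo {
    // hypothetical key; it only makes the propagated context observable
    static final ContextKey<String> REQUEST_ID = ContextKey.named("request-id");

    /** Runs the same task on a plain and on a context-wrapping executor; returns what each task saw. */
    static String[] observe() throws Exception {
        try (ExecutorService plain = Executors.newVirtualThreadPerTaskExecutor();
             ExecutorService wrapped = Context.taskWrapping(Executors.newVirtualThreadPerTaskExecutor());
             Scope ignored = Context.current().with(REQUEST_ID, "req-42").makeCurrent()) {
            // each task reads the context that is current in the thread running it
            String seenPlain = plain.submit(() -> Context.current().get(REQUEST_ID)).get();
            // the wrapper captures Context.current() at submit() time and restores it in the task
            String seenWrapped = wrapped.submit(() -> Context.current().get(REQUEST_ID)).get();
            return new String[] {seenPlain, seenWrapped};
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe();
        System.out.println("plain executor:   " + seen[0]); // null
        System.out.println("wrapped executor: " + seen[1]); // req-42
    }
}
```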
Hence, we have to implement such propagation by hand. One possibility is to provide a ThreadFactory when creating the StructuredTaskScope. Such a custom factory can propagate the context manually; in a way, we implement inheritance of the context-holding thread-local value by hand:
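// needs imports of io.opentelemetry.context.Context and java.util.concurrent.ThreadFactory
static class PropagatingVirtualThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate = Thread.ofVirtual().factory();
    @Override
    public Thread newThread(Runnable r) {
        // called in the thread that forks the subtask, so this captures the parent's context
        var parentContext = Context.current();
        return delegate.newThread(() -> {
            // make the captured context current in the new thread; the scope is closed when r completes
            try (var ignored = parentContext.makeCurrent()) {
                r.run();
            }
        });
    }
}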
In the custom factory, we first capture the current context using Context.current(), at the moment a new thread is created. Then, we create a new virtual thread (by delegating to the JDK-provided Thread.ofVirtual().factory()) and, before running the code block, set the captured context as "current", this time in the newly created thread.
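To check that such a factory really carries the trace over, a small sketch can make a span with a known trace id current and read the trace id from a thread created by the factory. This assumes opentelemetry-api on the classpath and Java 21+; the span context is built by hand with Span.wrap (so no SDK is needed), the ids are arbitrary valid hex values, and the class FactoryCheck with its copy of the factory is hypothetical.

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

public class FactoryCheck {
    // a self-contained copy of the propagating factory
    static class PropagatingVirtualThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Thread.ofVirtual().factory();

        @Override
        public Thread newThread(Runnable r) {
            Context parentContext = Context.current(); // captured in the creating thread
            return delegate.newThread(() -> {
                try (Scope ignored = parentContext.makeCurrent()) {
                    r.run();
                }
            });
        }
    }

    /** Makes a span with the given trace id current, then returns the trace id a factory-created thread sees. */
    static String traceIdSeenByChild(String traceId) throws InterruptedException {
        SpanContext parent = SpanContext.create(
                traceId, "00f067aa0ba902b7", TraceFlags.getSampled(), TraceState.getDefault());
        AtomicReference<String> seen = new AtomicReference<>();
        try (Scope ignored = Span.wrap(parent).makeCurrent()) {
            // newThread() returns an unstarted thread, as the ThreadFactory contract requires
            Thread child = new PropagatingVirtualThreadFactory()
                    .newThread(() -> seen.set(Span.current().getSpanContext().getTraceId()));
            child.start();
            child.join();
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(traceIdSeenByChild("4bf92f3577b34da6a3ce929d0e0e4736"));
    }
}
```

Without the factory (e.g. with a plain Thread.ofVirtual().factory()), Span.current() in the child would return the invalid span, whose trace id is all zeros.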
The code changes to the parent method are minimal:
private static void parent(Tracer tracer) throws InterruptedException, ExecutionException {
    var span = tracer.spanBuilder("parent-test-span-1").startSpan();
    try (var scope = span.makeCurrent()) {
        // first argument: the (optional) scope name; second: the factory used to create subtask threads
        try (var taskScope = new StructuredTaskScope.ShutdownOnFailure(null,
                new PropagatingVirtualThreadFactory())) {
            Supplier<Void> n1 = taskScope.fork(() -> nested(1, tracer));
            Supplier<Void> n2 = taskScope.fork(() -> nested(2, tracer));
            taskScope.join().throwIfFailed();
            n1.get();
            n2.get();
        }
    } finally {
        span.end();
    }
}
When running the example, the spans are once again correlated within a single trace, with both nested spans as children of the parent span.
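If a custom factory is not an option, the same capture-and-restore can be done per task with Context.wrap, which makes the context current around the wrapped task and closes the scope afterwards. The sketch below assumes opentelemetry-context on the classpath and Java 21+, uses plain virtual threads to avoid preview APIs, and uses a hypothetical USER key standing in for the current span.

```java
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.concurrent.atomic.AtomicReference;

public class PerTaskWrapping {
    // hypothetical key standing in for the current span
    static final ContextKey<String> USER = ContextKey.named("user");

    /** Starts one unwrapped and one wrapped virtual thread; returns what each task saw. */
    static String[] seenByChildren() throws InterruptedException {
        AtomicReference<String> unwrapped = new AtomicReference<>();
        AtomicReference<String> wrapped = new AtomicReference<>();
        try (Scope ignored = Context.current().with(USER, "alice").makeCurrent()) {
            Thread t1 = Thread.ofVirtual().start(() -> unwrapped.set(Context.current().get(USER)));
            Runnable task = () -> wrapped.set(Context.current().get(USER));
            // Context.current() is evaluated here, in the parent thread;
            // wrap() re-attaches that context around task.run() in the child
            Thread t2 = Thread.ofVirtual().start(Context.current().wrap(task));
            t1.join();
            t2.join();
        }
        return new String[] {unwrapped.get(), wrapped.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        String[] seen = seenByChildren();
        System.out.println("unwrapped: " + seen[0]); // null
        System.out.println("wrapped:   " + seen[1]); // alice
    }
}
```

Since Context also offers a wrap(Callable) overload, the same idea should apply to the lambda passed to taskScope.fork(...); the trade-off compared to the factory is that every fork site has to remember to wrap.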
What about ScopedValues?
Alongside the structured concurrency JEP, Java also introduced the ScopedValue API (see the JEP), a next-generation ThreadLocal whose bindings are inherited by child threads forked in a StructuredTaskScope.
Scoped values are designed to be used alongside StructuredTaskScopes in a structured way. Citing an example from the JEP:
private static final ScopedValue<String> X = ScopedValue.newInstance();
void foo() {
    where(X, "hello").run(() -> bar());
}
void bar() {
    System.out.println(X.get()); // prints hello
    where(X, "goodbye").run(() -> baz());
    System.out.println(X.get()); // prints hello
}
void baz() {
    System.out.println(X.get()); // prints goodbye
}
Note that a new value for a ScopedValue can only be bound for the duration of evaluating the code block passed using where(...).run(...). Once again, the syntactic structure of the code determines the lifetime: here, it's the lifetime of the value binding, while before, it was the threads' lifetime.
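The binding's lifetime can be observed directly. The sketch below is a hypothetical class, assuming Java 21 to 24 with --enable-preview (ScopedValue is final, with the same where(...).run(...) calls, in Java 25). It checks whether X is bound inside run(), in a virtual thread started directly (not forked in a StructuredTaskScope), and after run() returns.

```java
// Java 21-24: ScopedValue is a preview API (compile and run with --enable-preview); final in Java 25
import java.util.concurrent.atomic.AtomicBoolean;

public class ScopedValueLifetime {
    static final ScopedValue<String> X = ScopedValue.newInstance();

    /** Returns {bound inside run(), bound in a plain child thread, bound after run()}. */
    static boolean[] observe() {
        boolean[] result = new boolean[3];
        AtomicBoolean boundInChild = new AtomicBoolean(true);
        ScopedValue.where(X, "hello").run(() -> {
            result[0] = X.isBound();
            // a thread started directly, not forked in a StructuredTaskScope, does not inherit the binding
            Thread child = Thread.ofVirtual().start(() -> boundInChild.set(X.isBound()));
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        result[1] = boundInChild.get();
        result[2] = X.isBound(); // the binding ended when run() returned
        return result;
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints: true false false
    }
}
```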
While this design has some nice properties, it is unsuitable for implementing an alternative OpenTelemetry ContextStorage. Implementations of this interface are responsible for storing and propagating the OpenTelemetry context, and, as mentioned earlier, a ThreadLocalContextStorage is used by default.
It might be tempting to create a ScopedValueContextStorage implementation and thus have the context propagated automatically to subtasks forked in a StructuredTaskScope. However, the context storage API is unstructured:
public interface ContextStorage {
    // abridged: only the two abstract methods are shown
    Scope attach(Context toAttach);
    Context current();
}
The lifetime of a context attached to a storage doesn't need to follow the code's structure (even if that is usually the case when using the try-with-resources construct). Instead, the attach method returns a Scope, which the code can close at an arbitrary moment.
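To make the contrast concrete, here is a sketch of the kind of usage that the Scope-returning API allows and a where(...).run(...) binding cannot express: the context is attached in one callback and detached in another. It assumes opentelemetry-context on the classpath; the onRequestStart/onRequestEnd hooks (think of the "before" and "after" callbacks of an interceptor) and the REQUEST key are hypothetical.

```java
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;

public class UnstructuredScope {
    // hypothetical key, standing in for the span an instrumentation would attach
    static final ContextKey<String> REQUEST = ContextKey.named("request");

    private Scope scope;

    // hypothetical "before" callback: makeCurrent() delegates to ContextStorage.attach(...)
    void onRequestStart(String requestId) {
        scope = Context.current().with(REQUEST, requestId).makeCurrent(); // the Scope outlives this method
    }

    // hypothetical "after" callback: the scope is closed in a different method, chosen by the caller
    void onRequestEnd() {
        scope.close();
    }

    static String[] run() {
        UnstructuredScope hooks = new UnstructuredScope();
        hooks.onRequestStart("req-1");
        String during = Context.current().get(REQUEST);
        hooks.onRequestEnd();
        String after = Context.current().get(REQUEST);
        return new String[] {during, after};
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println(seen[0] + " / " + seen[1]); // prints: req-1 / null
    }
}
```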
This design makes scoped values and context storage fundamentally incompatible.
In summary
"Out of the box", no integration is available between the OpenTelemetry Java SDK and Virtual Threads, other than propagating the context within a single thread.
However, by design, virtual threads encourage freely forking new threads to parallelize code execution, increasing the performance and throughput of the services involved. When that happens, the telemetry context is lost, and the spans are reported as separate, uncorrelated traces instead of a single combined trace.
This can be avoided by manually propagating the context. The simplest form is to capture the parent context before forking a computation and re-set it in the child thread. Such propagation can be somewhat automated using a custom ThreadFactory, which is also compatible with the StructuredTaskScope API.
Finally, it seems that without design changes to the ContextStorage, it won't be possible to create an implementation leveraging the upcoming ScopedValues. These solve the inheritance problem, but they also follow a structured usage approach in which the lifetime of a binding must mirror the syntactic structure of the code, which is at odds with how ContextStorage works.