
Propagating OpenTelemetry context when using Virtual Threads & Structured Concurrency


The Context plays a central role in OpenTelemetry: it stores the metadata of the current invocation chain, such as the trace currently in progress. This allows correlating multiple spans into a single logical unit.

In the Java OpenTelemetry SDK, the Context is by default passed using ThreadLocals. Does this work as expected with the virtual threads and structured concurrency available in Java 21+? And if not, what are the problems?

Sequential invocations

Let's first examine a simple case of sequential invocations, where each starts a span. No threads are involved yet, but this will be useful as a baseline.

In the example below, a parent method starts a parent span and invokes two nested methods, each of which starts a nested span. Here's the code:

import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;

import java.util.concurrent.ExecutionException;

public class Demo {
  private static Void nested(int id, Tracer tracer) throws InterruptedException {
    var span = tracer.spanBuilder("nested-test-span-" + id).startSpan();
    try (var scope = span.makeCurrent()) {
      // pretend to do some work, e.g. sending an HTTP request
      Thread.sleep(500);
    } finally {
      span.end();
    }

    return null;
  }

  private static void parent(Tracer tracer) throws InterruptedException {
    var span = tracer.spanBuilder("parent-test-span").startSpan();
    try (var scope = span.makeCurrent()) {
      nested(1, tracer);
      nested(2, tracer);
    } finally {
      span.end();
    }
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    var otel = AutoConfiguredOpenTelemetrySdk.initialize();

    // obtain a tracer and run the traced code
    var tracer = otel.getOpenTelemetrySdk().getTracer("otel-demo");
    parent(tracer);

    // make sure telemetry is sent
    Thread.sleep(1000);
  }
}

In the main method, we initialize OpenTelemetry using environment variables. The following ones are useful for testing:

OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
OTEL_METRIC_EXPORT_INTERVAL=500
OTEL_SERVICE_NAME=demo

The exporter endpoint should point to a running instance, e.g., using Grafana's LGTM, a trivial one-container Docker setup. The accompanying docker-compose.yml is as follows:

services:
  # OpenTelemetry Collector, Prometheus, Loki, Tempo, Grafana
  observability:
    image: 'grafana/otel-lgtm'
    ports:
      - '3000:3000' # Grafana's UI
      - '4317:4317' # Exporter

That way, we get both an exporter and a UI, which helps visualize test results.

The parent method starts a span and invokes nested twice, where each starts a child span. After running the example, you should see something similar to the following in the UI:

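[Figure: trace view of the sequential run, showing the parent span with two nested spans executed one after the other]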

So far, so good. Let's now try to add some parallelism.

Parallel invocations

If the nested invocations are independent and take a non-trivial amount of time (they might perform some I/O, for example), we might run them in parallel. There are a couple of ways to do this, but we'll take the route paved by the Java structured concurrency JEP (available in preview from Java 21).

For a more thorough introduction to structured concurrency, see, e.g., this article; here we'll just use the StructuredTaskScope API. The core property of structured concurrency is that the syntactic structure of the code determines the lifetime of the threads running the subtasks.

That is, since the StructuredTaskScope is nested within the try (var scope = span.makeCurrent()) block, it's guaranteed that any threads started in the task scope have finished by the time we close the observability scope and span:

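// Additional imports needed (preview API in Java 21: compile and run with --enable-preview)
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

private static Void nested(int id, Tracer tracer) throws InterruptedException {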
  var span = tracer.spanBuilder("nested-test-span-" + id).startSpan();

  try (var scope = span.makeCurrent()) {
    Thread.sleep(500);
  } finally {
    span.end();
  }

  return null;
}

private static void parent(Tracer tracer) throws InterruptedException, ExecutionException {
  var span = tracer.spanBuilder("parent-test-span").startSpan();
  try (var scope = span.makeCurrent()) {
    try (var taskScope = new StructuredTaskScope.ShutdownOnFailure()) {
      Supplier<Void> n1 = taskScope.fork(() -> nested(1, tracer));
      Supplier<Void> n2 = taskScope.fork(() -> nested(2, tracer));

      taskScope.join().throwIfFailed();

      n1.get();
      n2.get();
    }
  } finally {
    span.end();
  }
}

In the code above, we start two tasks running in the background and then wait until they are completed using .join(). This is similar to using futures, but with the additional guarantee that even if one of the tasks fails, the other won't be left "dangling", but will be interrupted instead.

However, when we run this example, the observability context is no longer propagated. Instead of a single correlated trace, we get three separate ones:

[Figures: three separate traces, one for the parent span and one for each nested span]

The total time of the parent's span went down to about 500 ms (due to parallelization), but we lost observability into the entire process.

Parallel invocations, with observability

What's the problem? Remember that the OpenTelemetry Context is propagated using ThreadLocals. This works well when correlating invocations and spans within a single thread, but it breaks as soon as work moves to another thread, whether a "normal" (platform) thread or a virtual one. That's because the value of a plain ThreadLocal is not inherited by threads started from the current one.
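The missing inheritance can be reproduced with the JDK alone. Below is a minimal sketch in which a plain ThreadLocal (CURRENT_TRACE, a hypothetical stand-in for the slot that holds the OpenTelemetry Context) is set in the calling thread and then read from a freshly started child thread. The class and method names are made up for illustration; a platform thread is used, and the same applies to a virtual one.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadLocalVisibilityDemo {
  // Hypothetical stand-in for the thread-local slot holding the current Context.
  private static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

  /** Sets a value in the calling thread and returns what a newly started child thread sees. */
  static String valueSeenByChild() {
    CURRENT_TRACE.set("trace-123");
    try {
      var seen = new AtomicReference<String>("unset");
      // The child reads the same ThreadLocal, but from its own (empty) slot.
      var child = new Thread(() -> seen.set(String.valueOf(CURRENT_TRACE.get())));
      child.start();
      try {
        child.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
      return seen.get();
    } finally {
      CURRENT_TRACE.remove(); // don't leave the value behind in the calling thread
    }
  }

  public static void main(String[] args) {
    System.out.println("child sees: " + valueSeenByChild()); // prints "child sees: null"
  }
}
```

The child thread sees no value at all, which is what happens to the context in the forked subtasks: each nested span finds no parent and starts a new trace.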

OpenTelemetry addresses this problem in a couple of ways. For example, it's possible to wrap an Executor using Context.taskWrapping(), so that the context is propagated to any tasks started using that executor. However, there are no such utilities for structured concurrency (yet), and for a good reason: the API in question is still in preview.
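To make the executor-wrapping idea concrete, here is a JDK-only sketch of the mechanism: capture the value in the submitting thread, then restore it around the task in the worker thread. This is not OpenTelemetry's implementation. CURRENT is a hypothetical ThreadLocal standing in for the Context, and propagating(...) is an illustrative helper, not a library method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class WrappingExecutorSketch {
  // Hypothetical stand-in for the thread-local holding the current Context.
  static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  /** Returns an Executor that captures CURRENT on submission and restores it around each task. */
  static Executor propagating(Executor delegate) {
    return task -> {
      String captured = CURRENT.get(); // runs in the submitting thread
      delegate.execute(() -> {
        String previous = CURRENT.get(); // runs in the worker thread
        CURRENT.set(captured);
        try {
          task.run();
        } finally {
          // Restore the old value so pooled workers don't leak it into later tasks.
          if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
        }
      });
    };
  }

  /** Returns the value a task observes, with or without the propagating wrapper. */
  static String valueSeenByTask(boolean wrap) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CURRENT.set("trace-123");
    try {
      Executor executor = wrap ? propagating(pool) : pool;
      var seen = new AtomicReference<String>();
      CompletableFuture.runAsync(() -> seen.set(String.valueOf(CURRENT.get())), executor).join();
      return seen.get();
    } finally {
      CURRENT.remove();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("unwrapped: " + valueSeenByTask(false)); // prints "unwrapped: null"
    System.out.println("wrapped: " + valueSeenByTask(true));    // prints "wrapped: trace-123"
  }
}
```

With the real API, the equivalent step would be passing the executor through Context.taskWrapping() instead of the hand-written propagating(...) helper.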

Hence, we have to implement such propagation by hand. One possibility is to provide a ThreadFactory when creating the StructuredTaskScope. Such a custom factory might take care of propagating the context manually. In a way, we implement inheritance of the context-holding thread-local value by hand:

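static class PropagatingVirtualThreadFactory implements ThreadFactory {
  private final ThreadFactory delegate = Thread.ofVirtual().factory();

  @Override
  public Thread newThread(Runnable r) {
    // runs in the thread that creates the new thread: capture its context
    var parentContext = Context.current();
    return delegate.newThread(() -> {
      // runs in the new thread: attach the captured context for the task's duration
      try (var ignored = parentContext.makeCurrent()) {
        r.run();
      }
    });
  }
}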

In the custom factory, we first capture the current context, using Context.current(), at the moment a new thread is created; this happens in the thread that calls fork. Then, we create a new virtual thread (by delegating to the JDK-provided Thread.ofVirtual().factory()), and before running the task, set the captured context as "current", this time in the newly created thread.

The code changes to the parent method are minimal:

private static void parent(Tracer tracer) throws InterruptedException, ExecutionException {
  var span = tracer.spanBuilder("parent-test-span-1").startSpan();
  try (var scope = span.makeCurrent()) {
    try (var taskScope = new StructuredTaskScope.ShutdownOnFailure(null, 
          new PropagatingVirtualThreadFactory())) {
      Supplier<Void> n1 = taskScope.fork(() -> nested(1, tracer));
      Supplier<Void> n2 = taskScope.fork(() -> nested(2, tracer));

      taskScope.join().throwIfFailed();

      n1.get();
      n2.get();
    }
  } finally {
    span.end();
  }
}

When running the example, the spans are correlated once again within a single trace:

[Figure: a single trace in which the parent span contains both nested spans, running in parallel]

What about ScopedValues?

Alongside the structured concurrency JEP, Java also introduced the ScopedValue API (see the JEP), a next-generation ThreadLocal whose bindings are inherited by child threads forked within a StructuredTaskScope.

Scoped values are designed to be used alongside StructuredTaskScopes in a structured way. Citing an example from the JEP:

// assumes: import static java.lang.ScopedValue.where;
private static final ScopedValue<String> X = ScopedValue.newInstance();

void foo() {
  where(X, "hello").run(() -> bar());
}

void bar() {
  System.out.println(X.get()); // prints hello
  where(X, "goodbye").run(() -> baz());
  System.out.println(X.get()); // prints hello
}

void baz() {
  System.out.println(X.get()); // prints goodbye
}

Note that a new value for a ScopedValue can only be set for the duration of the code block passed to where(...).run(...). Once again, the syntactic structure of the code determines a lifetime: here, that of the value binding; earlier, that of the threads.

While this design has some nice properties, it is unsuitable for implementing an alternative OpenTelemetry ContextStorage. Implementations of this interface are responsible for storing and propagating the OpenTelemetry context, and as we mentioned earlier, by default, a ThreadLocalContextStorage is used.

It might be tempting to create a ScopedValueContextStorage implementation and thus have the context automatically propagated when creating child virtual threads. However, the context storage API is unstructured; its two core methods are:

public interface ContextStorage {
  Scope attach(Context toAttach);
  Context current();
}

The lifetime of a context attached to a storage doesn't need to follow the code's structure (even if that is usually the case when using the try-with-resources construct). Instead, the attach method returns a Scope, which can be closed by the code at an arbitrary moment.

This design makes scoped values and context storage fundamentally incompatible.
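The mismatch can be illustrated with a small JDK-only sketch of an attach/close style storage. Everything here is a hypothetical stand-in: the nested Scope interface and the attach and current methods only mimic the shape of the ContextStorage interface shown above, backed by a plain ThreadLocal. The point is that the binding is opened in one method and closed in another, so its lifetime does not correspond to any single code block.

```java
class UnstructuredScopeSketch {
  // Hypothetical stand-in for a thread-local-backed context storage.
  private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

  /** Handle returned by attach(); closing it restores the previously attached value. */
  interface Scope extends AutoCloseable {
    @Override
    void close(); // narrowed: no checked exception
  }

  static Scope attach(String toAttach) {
    String previous = CURRENT.get();
    CURRENT.set(toAttach);
    return () -> {
      if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
    };
  }

  static String current() {
    return CURRENT.get();
  }

  // Unstructured use: the scope escapes the method that opened it...
  static Scope begin(String context) {
    return attach(context);
  }

  // ...and is closed later, by a different method.
  static void end(Scope scope) {
    scope.close();
  }

  static String demo() {
    Scope scope = begin("request-42");
    String during = current();
    end(scope);
    return during + " -> " + current();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "request-42 -> null"
  }
}
```

A ScopedValue binding cannot be split this way: it exists only while the block passed to where(...).run(...) is executing, so there is nothing for begin to return and nothing for end to close.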

In summary

"Out of the box", no integration is available between the OpenTelemetry Java SDK and Virtual Threads, other than propagating the context within a single thread.

However, Virtual Threads encourage, by their design, the free creation of new forks, which parallelize code execution and increase the throughput of the services involved. When that happens, the telemetry context is lost, and uncorrelated spans are reported as individual traces instead of a single combined trace.

This can be avoided by manually propagating the context. The simplest form is to capture the parent context before forking a computation and re-set it in the child thread. Such propagation can be somewhat automated using a custom ThreadFactory, which is also compatible with the StructuredTaskScope API.

Finally, it seems that without design changes to ContextStorage, it won't be possible to create an implementation leveraging the upcoming ScopedValues. These solve the inheritance problem, but their bindings must mirror the syntactic structure of the code, which is at odds with how ContextStorage works.
