Contents

Limits of Loom's performance

Limits of Loom's performance webp image

Some time ago, I stumbled across a gist comparing the performance of various streaming approaches on the JVM. This prompted me to check how a similar test—passing 10 million elements through an async barrier and summing them up—behaves when using Ox's channels.

The results were quite poor—depending on the buffer size—10-20x times slower than the best contender. Is it due to the implementation of Ox (which is at a prototype stage, without any optimization work) or some other factors, such as the usage of Java's virtual threads?

But let's start with a brief overview of Ox's channels. At their core, they allow direct-style, synchronous communication between virtual threads (which were introduced as part of project Loom in Java 21).

An Ox channel is similar to a limited-capacity queue, with the additional possibility of signaling completion, reporting errors, and using Go-like selects: which guarantee that exactly one clause will be evaluated when data or space in the channel is available. This combination of limited space, cancellability, and exactly-one-clauses make it necessary for a channels implementation to include a non-trivial amount of thread synchronization.

Porting Go-like channels to the JVM isn't a new idea: Kotlin's Channels implement a similar feature set, however, their design is based on coroutines and suspendable functions, instead of virtual threads. The Kotlin compiler transforms any usages of suspendable functions into a state machine, and the Kotlin runtime then manages scheduling of such functions, transformed into continuation-passing-style (CPS) onto a limited pool of platform threads.

All three implementations, Go, Ox, and Kotlin, deliver channels in two flavors: buffered (where sends don't block as long as the buffer isn't filled) and rendezvous (where two threads must "meet" to exchange values—as if the buffer had size 0). Needless to say, rendezvous channels are always slower than buffered ones. Let's restrict our perspective to only rendezvous channels and investigate the limits of performance of the various approaches mentioned above.

Check: Go-like channels in Scala: receive, send, and default clauses

Trying to port Kotlin's design to Loom

After learning about the performance differences between Kotlin and Ox channels, I started looking into the implementation of Kotlin's Channels. Their algorithm is well described in the Fast and Scalable Channels in Kotlin Coroutines paper by Koval, Alistarh, and Elizarov. The implementation has some additional corner cases covered. Still, the code is clean and quite well documented, so at least in theory, it should be possible to replicate the design.

There is, however, one crucial difference: the Kotlin implementation heavily relies on accessing and storing continuations of suspendable functions. This is natively available in Kotlin's coroutines. On the other hand, the Ox implementation would be relying on virtual threads. And in Java, we don't have direct access to the continuation (at least yet—that's in scope of Project Loom for the future)—so that part will need to be somehow simulated using existing concurrency primitives.

I started with a simple case: a rendezvous channel supporting only send() and receive(), following the approach outlined in the paper, with an additional simplification that instead of an infinite array (each operation reserves a brand new location), I've used a fixed 20-million-element pre-allocated array (which was enough for the tests). While this is unusable in the real world, it should give us some insights into what the performance of a complete implementation might be.

To my surprise, the performance of such an implementation was still far behind the Kotlin one (about 20x). This prompted a more thorough investigation.

10 millions meets & greets

To simplify the problem even further, I decided to look into the implementation of performing a rendezvous between two threads 10 million times, using Kotlin+coroutines and Java+virtual threads.

The problem statement is then:

  • we've got two threads: a sender and a receiver
  • the sender produces consecutive numbers, from 1 to 10 000 000
  • the receiver sums up the received numbers (the result should be 50 000 005 000 000)
  • the sender & receiver threads must meet to exchange the value: hence, the sender must block if the receiver isn't ready, and vice versa

We'll try to avoid using any of the built-in concurrent data structures, instead relying on primitive operations made available by each platform.

Let's first look at the Kotlin implementation:

import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

fun usingCoroutines() {
  timed("coroutines") {
    runBlocking {
      val max = 10_000_000L
      // storing a Pair<Continuation<Unit>, Long> or Continuation<Long>
      val waiting = AtomicReference<Any>(null)

      launch {
        for (i in 1..max) {
          suspendCoroutine<Unit> { ourContinuation ->
            // if the CAS is successful, our coroutine gets suspended
            if (!waiting.compareAndSet(null, ourContinuation to i)) {
              val theirContinuation = waiting.get() as Continuation<Long>
              waiting.set(null)

              // if the CAS is not successful, we're resuming our
              // continuation, as the other party is already waiting
              theirContinuation.resume(i.toLong())
              ourContinuation.resume(Unit)
            }
          }
        }
      }

      launch {
        var sum = 0L

        for (i in 1..max) {
          val toAdd = suspendCoroutine<Long> { ourContinuation ->
            if (!waiting.compareAndSet(null, ourContinuation)) {
              val theirDataAndContinuation =
                waiting.get() as Pair<Continuation<Unit>, Long>
              waiting.set(null)

              theirDataAndContinuation.first.resume(Unit)
              ourContinuation.resume(theirDataAndContinuation.second)
            }
          }
          sum += toAdd
        }

        assert(sum == sumUpTo(max))
      }
    }
  }
}

inline fun <T> timed(label: String, block: () -> T): T {
  val start = System.currentTimeMillis()
  val result = block()
  val end = System.currentTimeMillis()
  println("$label Took ${end—start}ms")
  return result
}

fun sumUpTo(n: Long): Long = n * (n + 1) / 2

fun main(args: Array<String>) {
  for (i in 1..10) {
    println("Run $i")
    usingCoroutines()
  }
}

We've got two concurrently running coroutines (corresponding to the launch invocations). In every iteration, both coroutines first obtain their continuation object, and then race to set it in an atomic reference. The coroutine that wins this race gets suspended, waiting for the other party. The losing one first nulls out the waiting reference (so that subsequent iteration can perform the race again), and resumes both continuations with the appropriate values.

This code is quite far from the rendezvous channel implementation in Kotlin, but it captures the core idea of storing the continuation of the party that has to wait for a partner to exchange a value.

On my laptop (M1 Max), after warm-up, the average run-time for the above is an impressive 750 ms. Let's now turn our attention to Java.

Rendezvous with virtual threads

The Java implementation will be in some ways similar to the Kotlin one: we'll use an AtomicReference and its CAS to race the two threads and determine in each round which one waits and which one resumes its partner. However, since, as already mentioned, we don't have direct access to the continuation, we'll need another way of blocking the thread.

The lowest-level primitive for thread blocking that I've been able to find is LockSupport. It exposes a way to park the current thread and unpark a given one. However, park might also return without a matching unpark call, so it's necessary to additionally synchronize in some other way to verify that the resume condition is indeed fulfilled.

Here's the code:

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

public class Rendezvous {
  private final AtomicReference<ThreadAndCell> waiting =
     new AtomicReference<>();

  public void test() throws Exception {
    long start = System.currentTimeMillis();

    final int max = 10_000_000;

    Thread t1 = Thread.ofVirtual().start(() -> {
      Thread ourThread = Thread.currentThread();

      for (int i = 0; i <= max; i++) {
        AtomicReference<Integer> ourCell = new AtomicReference<>(i);

        if (waiting.compareAndSet(null,
          new ThreadAndCell(ourThread, ourCell))) {
          // CAS was successful, we are the first thread: parking and
          // waiting for the data to be consumed
          while (ourCell.get() != -1) {
            LockSupport.park();
          }
        } else {
          // CAS was unsuccessful, there is already a thread waiting for
          // us: clearing `waiting` for the next iteration, sending the
          // data using the provided cell and unparking the other thread
          ThreadAndCell other = waiting.get();
          waiting.set(null);

          other.cell.set(i);

          LockSupport.unpark(other.thread);
        }
      }
    });

    Thread t2 = Thread.ofVirtual().start(() -> {
      long acc = 0L;
      Thread ourThread = Thread.currentThread();

      for (int i = 0; i <= max; i++) {
        // -1 -> no data provided yet
        AtomicReference<Integer> ourCell = new AtomicReference<>(-1);
        if (waiting.compareAndSet(null,
          new ThreadAndCell(ourThread, ourCell))) {
          // CAS was successful, we are the first thread: parking and
          // waiting for the data to be provided
          while (ourCell.get() == -1) {
            LockSupport.park();
          }
          acc += ourCell.get();
        } else {
          // CAS was unsuccessful, there is already a thread waiting for
          // us: clearing `waiting` for the next iteration, consuming the
          // data and unparking the other thread
          ThreadAndCell other = waiting.get();
          waiting.set(null);

          acc += other.cell.get();
          other.cell.set(-1);

          LockSupport.unpark(other.thread);
        }
      }

      assert acc == sumUpTo(max);
    });

    t1.join();
    t2.join();

    long end = System.currentTimeMillis();
    System.out.println("Took: " + (end—start) + " ms");
  }

  private long sumUpTo(int max) {
    return ((long) max * (max + 1)) / 2;
  }

  private record ThreadAndCell(Thread thread,
    AtomicReference<Integer> cell) {}

  public static void main(String[] args) throws Exception {
    for (int i=0; i<10; i++) {
      new Rendezvous().test();
    }
  }
}

The code isn't exactly trivial, but it does produce the correct sum, and the two threads perform 10 million rendezvous meetings. What's the performance? Well, not so good: I've got, on average, a result of 17.5 seconds (compared to Kotlin's 750 ms). A significant difference! Can we do any better?

Migrate to Java 21. Partner with Java experts to modernize your codebase, improve developer experience and employee retention. Explore the offer >>

Inspiration from SynchronousQueue

Java has a built-in concurrent data structure implementing the exact above scenario, SynchronousQueue. When using it, our example becomes significantly simpler:

import java.util.concurrent.SynchronousQueue;

public class RendezvousUsingSynchronousQueue {
  public static void test() throws Exception {
    long startTime = System.currentTimeMillis();
    final int max = 10_000_000;
    SynchronousQueue<Integer> data = new SynchronousQueue<>();

    Thread t1 = Thread.ofVirtual().start(() -> {
      int i = 0;
      while (i <= max) {
        try {
          data.put(i);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        i += 1;
      }
    });

    Thread t2 = Thread.ofVirtual().start(() -> {
      long acc = 0L;
      for (int i = 0; i <= max; i++) {
        try {
          acc += data.take();
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
      assert acc == sumUpTo(max);
    });

    t1.join();
    t2.join();

    long endTime = System.currentTimeMillis();
    System.out.println("Took: " + (endTime—startTime) + " ms");
  }

  public static long sumUpTo(int max) {
    return (long) max * (max + 1) / 2;
  }

  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 10; i++) {
      test();
    }
  }
}

Moreover, doing some test runs, it turns out that the performance is much better but still quite far from Kotlin's: between 5.5 and 8.5 seconds. What did we do wrong in the original rendezvous implementation?

Reading the code of SynchronousQueue is far from a trivial task. However, as far as I understand, there's an important optimization in place. You might have noticed that LockSupport.park is called in a loop, checking after each one if the condition allowing the code to progress is fulfilled. SynchronousQueue has a similar loop, but in the first iteration, it performs a Thread.yield, instead of parking the thread.

And indeed, introducing a similar change to our rendezvous implementation yields run times between 5.5 and 7 seconds. Similar to using SynchronousQueue, and with similar high variance in the timings.

Eliminating variance

This high variance is quite suspicious—where might it be coming from? The code has some indeterminism due to the race in the AtomicReference's CAS, but whoever wins runs very similar logic.

My suspicion was on parallelism: that different runs sometimes schedule both virtual threads on the same platform threads and sometimes on different ones. Hence, I limited parallelism by using only a single platform thread so that all virtual threads are scheduled on the same one (improving the usage of CPU-level caching, for example). This amounts to adding the following JVM options:

-Djdk.virtualThreadScheduler.parallelism=1
-Djdk.virtualThreadScheduler.maxPoolSize=1
-Djdk.virtualThreadScheduler.minRunnable=1

As it turns out, with this modification, both rendezvous implementations (using LockSupport.park only and with the initial Thread.yield) were faster, consistently taking about 5.1 seconds to complete—a huge difference, compared to the initial 17.5 s on the unconstrained test. Similarly, the SynchronousQueue code took about 5 seconds.

Being more active

Update 26/10/2023

Prompted by a comment from Alexandru Nedelcu, I decided to try some scenarios that perform active waiting more aggressively. So far, we tested with a single, optional Thread.yield.

Increasing the number of yields gives some promising results! It turns out that when doing Thread.yield up to 4 times (instead of just up to 1 time), we can eliminate the variance and bring down execution times to about 2.3 seconds. Increasing the number of yields doesn't bring any more improvement.

In addition to yielding, I tried out the option of calling Thread.onSpinWait while in a busy-loop—each time checking if the condition to terminate the loop (content of the AtomicReference) became true.

Doing a single, 10, or even 100 such busy-loops brings no improvement. But once we do about 1000 iterations, we can see run times go down significantly, although with a high variance: between 2 and 8.5 seconds.

int doSpin = spinIterations; // best result with 10000
int doYield = yieldIterations; // best result with 4
while (ourCell.get() != -1) {
  if (doSpin > 0) {
    Thread.onSpinWait();
    doSpin -= 1;
  } else if (doYield > 0) {
    Thread.yield();
    doYield -= 1;
  } else {
    LockSupport.park();
  }
}

However, increasing the number of busy-loop iterations to 10000, we get an average run-time of just below 2 seconds and no variance! This is the winning combination.

Out of curiosity, I also checked combinations of yielding + busy-looping, but this didn't bring any improvement. The remaining question is: are 10000 busy-wait iterations a standard, or did we just optimize the code for a single benchmark?

Hidden Java gems

Update 31/10/2023

As Simon Hartley point out in an email conversation, the results get even better when using the java.util.concurrent.Exchanger class. The JavaDoc describe it as a utility to swap elements between two threads, and compare it to a bidirectional form of SynchronousQueue (which we investigated earlier).

This doesn't hint at an increased performance, however if you try to run the same test as before, you'll get much better results, in the range of 1050-1300ms per run. Of course, in our test we have unidirectional data flow, so the consumer sends back a dummy value.

It's quite surprising that a bidirectional variant is so much faster than the uni-directional one (just to remind, tests using SynchronousQueue took about 7000s), but if you start examining the sources, it's obvious that the implementation is completely different, and as it turns out, much more optimized.

At first sight, it seems that the implementation is doing something similar to what we've discovered so far: doing busy-looping for some time, with an occasional Thread.yield, and finally doing LockSupport.park:

private static final int SPINS = 1 << 10; // 1024

int h = 0;
if (spins > 0) {
    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
    if (h == 0)
        h = SPINS | (int)t.threadId();
    else if (h < 0 && (--spins & ((SPINS >>> 1)—1)) == 0)
        Thread.yield();
}
else if (slot != p)
    spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
         (!timed || (ns = end—System.nanoTime()) > 0L)) {
    p.parked = t;
    if (slot == p) {
        if (ns == 0L)
            LockSupport.park(this);
        else
            LockSupport.parkNanos(this, ns);
    }
    p.parked = null;
}

The h variable is used to pseudo-randomly insert Thread.yield calls. If this pseudo-random test doesn't succeed, the whole computation is repeated; there are no explicit Thread.onSpinWait calls, but the spin-waiting is happening.

Update 5/11/2023

However, it seems there is more to the algorithm than the above busy-looping/parking strategy. From my attempts at optimizing the Rendezvous code, it seems that the biggest gains come from eliminating allocations (something that Flavio Brasil also mentions in his analysis of this test). We can replace usages of the top-level AtomicReference with a volatile field, plus some VarHandles to perform the compare-and-set operations. To avoid allocating the second per-iteration AtomicReference, we'll need to throw in another two volatile fields, but that's still a net win (in terms of time). With these optimizations in place, and with a simple 10k-step busy-looping wait (without the "fancy" Exchanger looping logic presented above), we get the best results so far, of about 900ms. Here's the code!

By the way: we can't use Exchanger directly in an implementation of channels, as in case of an interrupt we need to store information that it happened, without further blocking. It's also still to be determined, which of the above optimizations are transferable to a full implementation of channels. It's quite possible that we've simply ultra-optimized the code, just for this single benchmark!

Slowing down Kotlin

Update 2/11/2023

Łukasz Biały investigated the Kotlin implementation in more detail, and he pointed out an important detail. In the test, we are launching two coroutines within a runBlocking scope. From its documentation:

The default CoroutineDispatcher for this builder is an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine.

In other words, any coroutines launched within runBlocking without a dispatcher specified (which is what we're doing), will run on a single-threaded event loop (in the spirit of node.js). This does explain at least some of the performance: there's no parallelism, everything runs on one core, so e.g. the CPU's caches can be efficiently used.

Kotlin recommends explicitly offloading e.g. IO-bound jobs using a different dispatcher (e.g. Dispatchers.IO). There's also a "default" one, which is based on an executor. Modifying the tests to use it:

runBlocking {
    launch(Dispatchers.Default) {
        // ...
    }
    launch(Dispatchers.Default) {
        // ...
    }
}

gives results of about 1350ms: which is, in turn, slightly slower than our fastest Java implementation, using the Exchanger.

Hence, summing up: Kotlin's single-threaded event loop is the fastest, but perhaps more realistically, when we introduce true parallelism, we get results comparable to Java's virtual threads.

As a side note, while I still haven't understood the internals of Java's Exchanger, it behaves in a peculiar way, when we confine it to a single thread. Setting -Djdk.virtualThreadScheduler.parallelism=1 or =2, will cause the Exchanger-based test to dramatically slow down (to about 40-50s). Only with parallelism 3 or higher, we get the fantastic results described above. And as we've seen in previous sections, restricting the parallelism of virtual threads speeds up the other rendezvous Java implementations.

Limits of Loom

performance

Based on the above tests, it seems we've hit the limits of Loom's performance (at least until continuations are exposed to the average library author!). Any implementation of direct-style, synchronous rendezvous channels can be only as fast as our rendezvous test—after all, the threads must meet to exchange values—that's the assumption behind this type of channels.

I've been proved wrong twice, with a second round of improvements now bringing Java's performance on par (parallel version), or 50% worse than Kotlin's (event-loop version), so I won't claim that's the limit—but it certainly seems we're close to reaching it! What remains true, is that whatever the implementation of Channels that we come up with, we'll be limited by the fact that in rendezvous channels threads must meet. So the tests above definitely serve as an upper bound for the performance of any implementation.

Why is Kotlin faster?

Given the improvements to the tests suggested by Łukasz, this is no longer always true. Kotlin remains faster when we run everything on a single thread (which in many scenarios, is very useful in practice!), but when we introduce parallelism, Java's virtual threads turn out to be the winner.

As for the single-threaded, event-loop case: to properly answer "why is Kotlin faster" would require someone with better knowledge of Kotlin's compiler internals. However, my hunch is that delimited continuations compiled to state machines are faster to switch between than virtual threads. The cost of switching between virtual threads has been made very low compared to platform threads by the Loom team; however, Loom still needs to stash away & later restore the call stacks, while in Kotlin, everything is on the heap—the call stack is always very shallow (just the latest coroutine invocation). Plus, the fact that it's a compile-time transformation allows introducing optimizations that wouldn't be possible at a purely library level.

What are the consequences for Ox, then?

We have clearly defined targets: the current prototype implementation can be made up to 15x faster (at least when taking my laptop as a reference; the performance gains might be larger when considering selects, and smaller when we scale this to the full buffered channels algorithm). We also might achieve the same levels of performance, as Kotlin does (in the parallel version).

Still, the performance of Kotlin's single-threaded event-loop is out of reach, and that's a tradeoff we must accept. We aim at implementing direct-style Go-like channels with an intuitive API and flexibility in defining custom processing stages, based on virtual threads. This inherently comes at the expense of some performance, as the JVM might always introduce parallelism. Here, "managed" solutions, such as reactive streams implementations, have the advantage of controlling the whole stream evaluation process using their own runtime instead of relying on Java's.

This also means that channels based on virtual threads are not the right solution when all you need to do is sum up a stream of numbers—or, more generally, perform some "pure" computations. But they might be just the right one when working with a stream of Kafka messages or sending an HTTP request for each message.

Other than improving the raw performance of Ox's channels, another direction to consider is a dedicated API to work on batches of elements. That's an approach that is often taken by functional, effectful stream implementations (such as fs2). Not all operations can be performed on a batch of elements, and the semantics of batch-level operations might be subtly different, but exposing such tools to the user might allow them to gain the performance they need.

Yet another possibility is defaulting to buffered channels. That's the approach taken by, for example, Akka/Pekko Streams. The larger the buffer, the smaller the difference between Kotlin's Channels and two threads communicating through an ArrayBlockingQueue. But an ArrayBlockingQueue doesn't need to be optimal: it will be interesting to see the performance of the Kotlin algorithm adapted to virtual threads, using buffered channels.

If you have any other ideas for what to investigate in Project Ox, please do let us know! We may be missing some promising approaches as to how virtual threads might be used to implement streaming.

Reviewed by Michał Matłoka—thanks!

Java

Blog Comments powered by Disqus.