Contents

Jox 0.1: virtual-thread friendly channels for Java

Adam Warski

23 Feb 2024.6 minutes read

Jox 0.1: virtual-thread friendly channels for Java webp image

The goal of the jox project is to facilitate communication between virtual threads:

  • competitive in terms of performance to the state-of-the-art (on the JVM, and generally)
  • with a low memory footprint so that the number of channels might scale together with the number of virtual threads used
  • offering a feature set inspired by Go channels

After a couple of development iterations, it's our pleasure to announce the 0.1.0 release of the library. This marks a milestone on the road to the 1.0.0 release, which will guarantee API binary compatibility.

The core functionalities are there—now it's time to get your feedback on what's missing, whether the API is convenient to use, and which areas require polishing! This also includes documentation: the JavaDocs are pretty rich, but some aspects might need better explanations. Please report if you find such a spot.

Jox's implementation is based on Kotlin's channels. If you find the below interesting—please do us a favor and star the project! Jox is designed and benchmarked to be used with virtual threads (available in Java 21+), although it will run on Java 17+.

The library has no transitive dependencies, to use simply add:

<dependency>
    <groupId>com.softwaremill.jox</groupId>
    <artifactId>core</artifactId>
    <version>0.1.1</version>
</dependency>

What does a Channel offer?

Let's do a quick overview of the functionalities of a Channel.

Firstly, channels can be buffered, unbuffered (rendezvous—sender & receiver threads must meet to exchange values), or unlimited. Here, we are creating a channel with a buffer of size 4, sending two values and receiving one:

// creating a buffered channel
var ch = new Channel<Integer>(4);

ch.send(1);
ch.send(2);
System.out.println(ch.receive());

The send() method blocks (of course, in a virtual-thread-aware way) if the buffer is full and the value cannot be sent. Similarly, receive() blocks if there's nothing to receive. Blocking calls can be interrupted using Java's interruption mechanism.

So far, that's precisely what we can get when using Java's BlockingQueue.

Selection

Having an alternate queue implementation could be interesting, but channels offer something more: they can be used in a select operation, which, given several channel clauses, guarantees that exactly one will be completed.

For example, the following selects one element from one of the channels:

// creating rendezvous (unbuffered) channels
var ch1 = new Channel<String>();
var ch2 = new Channel<String>();

// try to send values to both channels
Thread.ofVirtual().start(() -> {
  try {
    ch1.send("v1");
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
});
Thread.ofVirtual().start(() -> {
  try {
    ch2.send("v2");
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
});

// receive a value from one of the channels
System.out.println(
  select(ch1.receiveClause(), ch2.receiveClause())
);

Regardless of how threads are scheduled, exactly one element will always be received and the other left intact. Similarly, we might use send clauses or mix receive & send clauses. Each clause accepts a callback, which is called when the clause is selected:

// creating rendezvous (unbuffered) channels
var ch1 = new Channel<Integer>();
var ch2 = new Channel<Integer>();

// start threads which receive & discard the values
Thread.ofVirtual().start(() -> {
  try {
    ch1.receive();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
});
Thread.ofVirtual().start(() -> {
  try {
    ch2.receive();
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
});

// select on of the send clauses
var sent = select(
  ch1.sendClause(13, () -> "first"), 
  ch2.sendClause(25, () -> "second")
);

// will print out "first" or "second", depending on which thread invokes
// receive() first
System.out.println("Sent: " + sent);

Default clauses are also supported, selected if none of the other clauses can be immediately satisfied.

Completion

Finally, channels can be closed either with an error or as "done" when there are no more values to send.

// creating a buffered channel
var ch = new Channel<String>(4);

ch.send("hello");
ch.done();

System.out.println("Received: " + ch.receiveSafe());
System.out.println("Received: " + ch.receiveSafe());

This will print out Received: hello and Received: ChannelDone[]. The receiveSafe() method does not throw an exception when the channel is closed but returns either a value from the channel or a reason why the channel is closed. On the other hand, receive() will throw an exception when the channel is done, or in an error state.

When a channel is done, any pending sends are allowed to complete, but no new ones are allowed; if there are only pending receives, they end immediately. When there's an error, both pending sends and receives throw an exception:

// creating a rendezvous channel
var ch = new Channel<String>();

var child = Thread.ofVirtual().start(() -> {
  try {
    // send() will block, until there's a matching receive()
    ch.send("Hello!");
  } catch (ChannelClosedException e) {
    System.out.println("Got exception: " + e);
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
});

ch.error(new RuntimeException("Something went wrong!"));
child.join();

// Will print "Got exception: …"

Both done() and error() are useful when signaling channel completion in pipelines that use multiple threads to process the data and multiple channels to communicate between them. By propagating channel closure, it's possible to wind down such a pipeline in case of success and failure.

Performance

EDIT 27/02/2024: After some more testing, it turned out that the thread synchronization optimisations in jox's Channel are beneficial only for rendezvous channels, and impact the performance of buffered channels negatively. Hence, since jox 0.1.1 they are enabled conditionally (depending on the channel's capacity). The tests below only show results for buffered channels (which are the default), and have been updated with the better jox results.

We benchmark jox Channels against Java's built-in ArrayBlockingQueue, Channels in Kotlin and channels in go. The benchmarks are run on an M1 Max (10 cores). We're using a buffer of size 16 for the channels and a queue of size 16.

First, in the parallel benchmark (Kotlin version, Go version), we start 10 000 thread/coroutine/goroutine pairs connected by channels, and send 100 000 values from the first thread/coroutine/goroutine in the pair, to the second:

parallel

Go is the clear winner, closely followed by Kotlin, with jox channels and Java's BlockingQueues lagging a bit behind. An average of 14ns/operation (here, operation is a single send-receive pair) means that we can do 70 million of such operations per second: that's by no means slow. Rather, we are talking about fast and even faster.

Second, in the chained benchmark (Kotlin version), we start 10 001 threads/coroutines, connected by a sequence of 10 000 channels. The first thread sends 100 000 values through the channel, which are received by the second and passed to the third, etc., all the way to the last one, which discards the values:

chained

Here jox lags behind Java, and a bit more behind Kotlin. Note that both jox and Kotlin implement the same algorithm, although they differ in the underlying asynchronous runtime used: virtual threads in the case of jox, coroutines in the case of Kotlin. Plus, of course, it's quite possible that jox might be fine-tuned to improve performance further!

These benchmarks are not ideal, as they include the time needed to start/stop the threads and are prone to outliers taking longer to set up/teardown. Hence, take the results with caution, and if you have ideas on how to improve the benchmarking code—please share.

jox use-cases

The first use-case of jox is the ox Scala structured concurrency & resiliency library. There, jox is used as the basis for the asynchronous & blocking streaming API. But of course, the more the merrier! If you'd have any interesting use cases to share, please do so on our community forum!

Going forward

With the first minor release out of the way, what kind of functionality would you expect from this kind of library? The API is currently quite minimal (send/receive/error/done/isClosed/select), which on the one hand, is a plus, and on the other, some often-needed functionalities might be missing.

Let us know! Open an issue on GitHub, vote on an existing issue, or leave feedback on our community forum.

Read: Designing a (yet another) retry API

Blog Comments powered by Disqus.