Go-like channels using project Loom and Scala
Go's design is often seen as controversial because of its choice not to include many programming constructs available in "modern" languages. However, where Go innovates is in its approach to concurrency. The underlying concurrency model (Communicating Sequential Processes) isn't exactly new, but it has only seen widespread industry adoption thanks to Go.
Since goroutines and channels proved successful, let's explore whether we can implement something similar (or better?) in the JVM ecosystem.
Ox refresher
The channels we develop will be an extension of the Ox library, which is a thin wrapper for Java's Virtual Threads and other features introduced in project Loom, using the Scala programming language. The aim of Ox is to prototype a programmer-friendly API for building concurrent applications. There are two basic constructs:
- scoped, which serves as the syntactic structured concurrency delimiter and determines the lifetime of threads created within
- fork, which can only be run within a scoped block and starts a concurrently running thread of execution; in many ways, this is equivalent to Go's go statement
The above is described in more detail in a separate article.
Basics of channels
A channel is a light-weight construct and can be created as follows:
import ox.channels.*
val c = Channel[String]()
As in Go, channels by default provide rendezvous communication; a sender and receiver must meet to exchange a value. However, channels can also be buffered when provided with an additional capacity parameter.
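To make the difference between rendezvous and buffered communication concrete, here is a minimal sketch that uses plain JDK queues as stand-ins. It is not how Ox channels are implemented; the function name rendezvousVsBuffered is made up for illustration. SynchronousQueue has no storage, so a non-blocking offer only succeeds when a receiver is already waiting, while ArrayBlockingQueue accepts elements until its capacity is reached.

```scala
import java.util.concurrent.{ArrayBlockingQueue, SynchronousQueue}

// Illustration only: JDK queues standing in for unbuffered and buffered channels.
// Returns whether each non-blocking offer was accepted.
def rendezvousVsBuffered(): (Boolean, Boolean, Boolean, Boolean) =
  val rendezvous = new SynchronousQueue[String]()  // no storage: needs a waiting receiver
  val buffered = new ArrayBlockingQueue[String](2) // room for two elements

  // offer() never blocks; it reports whether the element could be handed over right away
  val direct = rendezvous.offer("Hello") // no receiver is waiting, so this is rejected
  val first = buffered.offer("Hello")    // fits in the buffer
  val second = buffered.offer("World")   // fits in the buffer
  val third = buffered.offer("!")        // the buffer is full, so this is rejected
  (direct, first, second, third)
```

A blocking put() on the rendezvous queue would instead wait until another thread calls take(), which is the behaviour the text describes for an unbuffered channel.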
We can send data to a channel:
val c = Channel[String]()
scoped {
  fork {
    c.send("Hello")
    c.send("World")
    c.done()
  }
  // TODO: receive
}
Here we're sending data to a channel inside a forked thread, as each .send is blocking. In the main thread, we'll be receive()-ing data:
trait Channel[T]:
  def receive(): ClosedOr[T]
  // ... other methods ...

// where
type ClosedOr[T] = Either[ChannelState.Closed, T]
As hinted above, a channel might be completed using channel.done(), or an error might be signaled using channel.error(Exception). Hence the return type of receive() is an Either[ChannelState.Closed, T], where T is the type of elements in the channel. If we don't want to handle errors explicitly, we can always unwrap the return value and throw an exception in case the channel is closed using c.receive().orThrow.
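The shape of this return type can be modelled in a few lines. The sketch below is self-contained and uses hypothetical stand-ins (Closed, and an orThrow extension written here for illustration); Ox's own definitions may differ in naming and detail.

```scala
// Hypothetical stand-ins for the closed states described above (not Ox's definitions)
enum Closed:
  case Done
  case Error(cause: Exception)

type ClosedOr[T] = Either[Closed, T]

extension [T](result: ClosedOr[T])
  // Unwraps a received value, or throws if the channel was closed
  def orThrow: T = result match
    case Right(value)              => value
    case Left(Closed.Error(cause)) => throw cause
    case Left(Closed.Done)         => throw new IllegalStateException("channel is done")
```

With these definitions, (Right(42): ClosedOr[Int]).orThrow yields the value, while calling orThrow on a Left(Closed.Done) throws. Callers who prefer explicit handling pattern-match on the Either instead.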
Here, we want to print out whatever arrives in the channel until the channel is done. We're using a foreverWhile helper method, which invokes the given code block as long as it returns true:
// implementation for the TODO: receive
val t = fork {
  foreverWhile {
    c.receive() match
      case Left(e: ChannelState.Error) => println(s"Error: $e"); false
      case Left(ChannelState.Done)     => false
      case Right(v)                    => println(s"Got: $v"); true
  }
}
t.join()
Note that this must be run within the same scoped block as before so that the forks run concurrently, and we need to invoke .join() to block until the receiver is done; otherwise, the scope would interrupt any threads that have been left running.
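For readers who want to see the moving parts without Ox, the same send/receive/join interplay can be approximated with raw virtual threads and a JDK SynchronousQueue. This is only a sketch under stated assumptions: it requires JDK 21 or later, the name sendAndReceive is made up, and None is used here as an ad-hoc "done" signal. Unlike scoped, nothing in this version interrupts threads that are left running.

```scala
import java.util.concurrent.SynchronousQueue
import scala.collection.mutable.ListBuffer

// Illustration only: a rendezvous queue plus two virtual threads (JDK 21+).
def sendAndReceive(): List[String] =
  val queue = new SynchronousQueue[Option[String]]() // None plays the role of "done"
  val received = ListBuffer.empty[String]

  val sender = Thread.startVirtualThread { () =>
    queue.put(Some("Hello")) // blocks until the receiver takes the value
    queue.put(Some("World"))
    queue.put(None)
  }
  val receiver = Thread.startVirtualThread { () =>
    var running = true
    while running do
      queue.take() match
        case Some(v) => received += v
        case None    => running = false
  }

  // Without the joins, the method could return before the receiver has finished
  sender.join()
  receiver.join()
  received.toList
```

Calling sendAndReceive() returns List("Hello", "World"); the joins play the role that t.join() plays in the Ox version.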
Beyond queues
What we've seen so far is just a bit more than a queue—one which can be completed with a "done" or an error. However, channels in Go—and channels in Ox as well—offer more than that.
Specifically, we can select from a number of channels. The signature of the two-channel variant is as follows (Source is one of the traits implemented by Channel):
def select[T1, T2](
    ch1: Source[T1], ch2: Source[T2]): ClosedOr[T1 | T2]
Here we're using Scala 3's union types to indicate that we can get a value from the first channel or a value from the second channel. What's important is that we'll receive exactly one value—and exactly one value will be taken off from the given channels.
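The "exactly one value is taken" guarantee and the union-typed result can be illustrated with a deliberately naive sketch over JDK queues. The names selectSketch and Tick are chosen here for illustration, and the polling loop is an assumption made to keep the example short; it says nothing about how Ox implements select, and it always prefers the first queue when both have elements.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import scala.annotation.tailrec

case object Tick

// Illustration only: takes exactly one element from exactly one of the two queues.
@tailrec
def selectSketch[T1, T2](q1: BlockingQueue[T1], q2: BlockingQueue[T2]): T1 | T2 =
  Option(q1.poll()) match // poll() returns null when the queue is empty
    case Some(v) => v
    case None =>
      Option(q2.poll(1, TimeUnit.MILLISECONDS)) match
        case Some(v) => v
        case None    => selectSketch(q1, q2) // nothing available yet: try again

// The caller distinguishes the alternatives by type
def describe(v: String | Tick.type): String = v match
  case s: String => s"string of length ${s.length}"
  case Tick      => "tick"
```

If a queue of strings holds "abc" and a queue of ticks holds Tick, the first call returns "abc" and leaves the tick in place; the second call returns Tick.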
As a short example, given a strings: Channel[String], we'll summarise the number of characters received within each second.
On the producer side, we'll send a string of a random length with random sleep in-between. On the consumer side, we'll create a second source, which emits a value every second:
case object Tick
val tick = Source.tick(1.second, Tick)
Then, we'll do a select(strings, tick) to receive either the next String, or the next Tick. Here's the full code:
// standard-library imports used below; scoped, fork, select and the channel types come from Ox
import scala.annotation.tailrec
import scala.concurrent.duration.*
import scala.util.Random

@main def charactersEachSecond(): Unit =
  @tailrec
  def producer(s: Sink[String]): Nothing =
    s.send(Random.nextString(Random.nextInt(100)))
    Thread.sleep(Random.nextInt(200))
    producer(s)

  case object Tick

  def consumer(strings: Source[String]): Nothing =
    scoped {
      val tick = Source.tick(1.second, Tick)

      @tailrec
      def doConsume(acc: Int): Nothing =
        select(strings, tick).orThrow match
          case Tick =>
            log.info(s"Characters received this second: $acc")
            doConsume(0)
          case s: String => doConsume(acc + s.length)

      doConsume(0)
    }

  val c = Channel[String]()
  scoped {
    fork(producer(c))
    fork(consumer(c))
    log.info("Press any key to exit ...")
    System.in.read()
  }
Beyond queues, part 2
If you've ever worked with streaming systems in Java or Scala, such as Akka Streams, or fs2, you've probably grown used to the idea of defining data processing pipelines using high-level combinators, such as .map, .filter, .mapAsync and the like. That's often much more readable and convenient than manually receiving from a stream, transforming the value, and sending it further.
We can implement something similar with our Channels! For example, there's a .map method on Source:
scoped {
  val c = Channel[String]()
  val c2: Source[Int] = c.map(s => s.length())
}
The .map method needs to be run within a scope (otherwise, you'll get a compilation error), as at the moment it is invoked, it starts a new thread, which receives from the source (here: c), applies the transformation (s => s.length()), and sends the value to a new channel, which is then returned (c2). The new channel is direct by default, but a buffered one can be created by supplying an additional capacity parameter to .map.
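The mechanics described above (start a thread, receive, transform, send to a fresh channel) can be sketched with JDK building blocks. This is an illustration under assumptions, not Ox's implementation: mapEager is a made-up name, queues stand in for channels, None marks the end of the stream, and the thread is a raw virtual thread (JDK 21+) rather than a fork tied to a scope.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, SynchronousQueue}

// Illustration only: an eager map over queues.
// capacity 0 gives a rendezvous (unbuffered) output, mirroring the "direct by default" behaviour.
def mapEager[T, U](in: BlockingQueue[Option[T]], capacity: Int = 0)(
    f: T => U
): BlockingQueue[Option[U]] =
  val out: BlockingQueue[Option[U]] =
    if capacity == 0 then new SynchronousQueue[Option[U]]()
    else new ArrayBlockingQueue[Option[U]](capacity)

  // The transformation starts running as soon as mapEager is called
  Thread.startVirtualThread { () =>
    var running = true
    while running do
      val next = in.take()     // receive from the source
      out.put(next.map(f))     // transform and send; None is forwarded as the end marker
      running = next.isDefined // stop after forwarding the end marker
  }
  out
```

Because the thread here is not bound to any scope, nothing cleans it up if the consumer stops early; tying the thread's lifetime to a scope is exactly what the requirement to call .map within scoped provides.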
Hence each transformation step will create a concurrently running thread. If you'd like to run a couple of transformations at once within a single thread, that's also possible:
trait Source[T]:
  def transform[U](f: Iterator[T] => Iterator[U])(using Ox): Source[U]
This method allows arbitrarily transforming the elements incoming from the source, available as an Iterator[T]. For example, below, we've got a producer of an infinite stream of increasing numbers and a consumer which takes a handful of them:
@main def someNumber(): Unit =
  scoped {
    val c = Channel[Int]()
    fork {
      @tailrec def produce(n: Int): Nothing =
        c.send(n)
        produce(n + 1)
      produce(0)
    }
    c
      .transform(_.filter(_ % 2 == 0).map(_ + 1).take(10))
      .foreach(n => println(n.toString))
  }
The .foreach method on a stream runs the given code block for each received element as long as the channel is not done.
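Since the function passed to .transform operates on an ordinary Iterator, the pipeline from the example can be tried in isolation with the standard library alone. In the sketch below, Iterator.from(0) stands in for the producer and firstTen is a name made up for illustration.

```scala
// The same iterator pipeline as in the example, applied to a lazy, infinite standard-library iterator
def firstTen(): List[Int] =
  Iterator
    .from(0)            // 0, 1, 2, ... (plays the role of the producer)
    .filter(_ % 2 == 0) // keep even numbers
    .map(_ + 1)         // shift each by one
    .take(10)           // stop after ten elements
    .toList

// firstTen() == List(1, 3, 5, 7, 9, 11, 13, 15, 17, 19)
```

The take(10) step is what lets a consumer terminate even though the producer never stops on its own.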
Streaming model
The Ox channels offer a hybrid of procedural-style stream processing (using .receive() and .send() directly), and functional stream transformations, using .map and other higher-level methods.
Keep in mind that this is only a prototype; not much beyond .map and .transform is available, but I think they demonstrate the principles well.
Compared to other streaming solutions, Ox offers a "push" processing model, where elements are eagerly produced and pushed onto the channels. This is in contrast to a "pull" model (such as the one implemented by fs2/zio-streams), where elements are computed lazily on-demand when there's a consumer.
Ox is closest to an instantly-materialized Akka Streams. We don't have the intermediate step of creating a complete description of a processing pipeline, and materialising it later into a running stream. This has downsides: we must pay attention when we transform the stream; each .map instantly creates a new processing thread, so care must be taken when calling it (there's no referential transparency!). On the other hand, it is easier to create custom processing stages (by directly interacting with the Channel), or create complex processing topologies. But again, this might mean getting into a deadlock more easily. Tradeoffs!
Eager or lazy map
As described in the previous sections, .map (and other transformations) is eager, meaning that it instantly starts a new lightweight thread running the transformation when called.
An alternate design would be to return a Source, which runs the mapping operation lazily, when .receive() is called. This wouldn't require starting a separate processing thread.
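A minimal sketch of that lazy alternative, using a hypothetical single-method SimpleSource trait rather than Ox's Source: the mapping function runs on the caller's thread, and only when receive() is invoked.

```scala
// Hypothetical, simplified stand-in for a source (not Ox's Source trait)
trait SimpleSource[T]:
  def receive(): T

// Lazy map: no thread is started; f runs inside receive(), on the caller's thread
def mapLazy[T, U](source: SimpleSource[T])(f: T => U): SimpleSource[U] =
  new SimpleSource[U]:
    def receive(): U = f(source.receive())

// Counts how often the mapping function runs before and after a receive()
def lazyMapDemo(): (Int, Int, Int) =
  var calls = 0
  val ones = new SimpleSource[Int]:
    def receive(): Int = 1
  val mapped = mapLazy(ones) { n => calls += 1; n + 1 }
  val callsBeforeReceive = calls // still 0: nothing has been computed yet
  val value = mapped.receive()   // triggers exactly one call to the mapping function
  (callsBeforeReceive, value, calls)
```

Here lazyMapDemo() returns (0, 2, 1): creating the mapped source does no work, and each receive() pays for one invocation of the mapping function.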
However, I think there are two arguments as to why the eager version is the proper choice here.
First, as channels offer streaming in the push model, elements are produced eagerly. Wouldn't we expect this from every source? When we start a thread that produces to a non-buffered channel, it will compute the first element and get suspended on .send(), waiting for a matching .receive() invocation. There's no laziness here (there is backpressure, though); consequently, a mapped channel should eagerly produce the elements.
Secondly, the mapping function might contain side-effecting, blocking operations. In that case, we do want the mapping to be performed asynchronously. This is especially valid when used with buffered channels.
Maybe we simply need two variants of .map? One for transforming e.g., Stream[Int] into a Stream[Either[String, Int]], which should be cheap; the second for running potentially blocking, longer-running computations. One obstacle here is proper naming, and another is adjusting select to work with such lazily mapped channels. If you have any thoughts on the above, let us know!
What's next?
After scopes and forks, channels open a new frontline for discussions around designing a direct-style concurrency library for Scala and the JVM. Martin Odersky recently unveiled another proposal, async (see the slides from the Scalar conference for some background, video coming soon!). The proposal is more of a draft and focuses on evolving the concept of Futures.
An implementation of Go-like channels is also available for Scala 2, integrating with effect systems; see the scala-gopher project.
As far as Ox is concerned, there are still a couple of areas to research, most notably:
- error propagation (mentioned in the previous blog, fork vs forkHold)
- error handling (eithers, or exceptions? Maybe Scala's CanThrow?)
- performance (unknown at the moment)
And above all: your feedback. We've set up a dedicated community forum to discuss the approach taken by Ox. Do you think channels, as presented above, make sense? Should they be push- or pull-based? Are eager streams fine, or is a staged design, where we first create a description of a stream and run it only when it's complete, fundamentally better? Can you see some design flaws? Let us know!
An early release of Ox is available, so you can try it out or browse its source code.