Go-like channels in Scala: receive, send, and default clauses
Ox, an experimental concurrency library using Project Loom and Scala 3, offers features such as developer-friendly structured concurrency, scoped values, and implementation of Go-like channels.
Channels have been introduced some time ago, along with higher-level combinators, allowing working with streams in both functional and imperative styles, depending on the use-case.
However, the select
operation on channels has been limited to receiving values only. We're now lifting this limitation and extending the feature set of ox's channels to include support for receive, send, and default clauses.
What does this mean exactly? How can you use this feature in practice? Read on to find out!
What's a select?
The select
method is what distinguishes channels from ordinary queues. Given a select
invocation with a number of clauses, exactly one of them will be satisfied—or in other words, exactly one clause will be selected.
Most often, this is useful for receiving a value from exactly one channel while keeping the others intact. For example:
val tick: Source[Tick] = Source.tick(1.second, Tick)
val strings: Source[String] = ...
select(tick, strings) match
case Tick => println("Got a tick")
case s: String => println("Got a string")
case ChannelClosed.Done => println("All channels are done")
case ChannelClosed.Error(_) => println("There's an error")
select
will block until a value from one of the channels can be received. If both channels have a value, only one channel will be modified. (That's in contrast to racing two dequeue operations on queues, which might end with a value being removed from each queue, and one of them discarded.)
Selecting sends and receives
What's described above has already been available in ox for some time. However, now it's also possible to select among both send and receive operations. This can be done by creating clauses, which are then passed to select
. It is guaranteed that exactly one such clause will be satisfied, returning a clause-specific result.
For example, we can block until a value is sent to channel c
, or received from channel d
:
val c = Channel[Int]()
val d = Channel[Int]()
select(c.sendClause(10), d.receiveClause)
As a side note, here both
c
andd
are unbuffered channels, so for this operation to ever finish, there must be a concurrently running process that will eventually receive fromc
or send tod
.
The above select
invocation has the following return type:
c.Sent | d.Received | ChannelClosed
Note that the Sent
and Received
types are inner types of the c
and d
values. For different channels, the Sent
/ Received
instances will be of distinct classes, allowing distinguishing which clause has been satisfied.
There's also the possibility that all channels are done, or at least one is in an error state—in which case, no clause is satisfied, but the closed reason is returned. If we don't want to handle these cases explicitly, they can be converted to an exception using the .orThrow
extension method on the result.
Hence, we have:
select(c.sendClause(10), d.receiveClause).orThrow: c.Sent | d.Received
The results of a select
can be inspected using a pattern match:
select(c.sendClause(10), d.receiveClause).orThrow match
case c.Sent() => println("Sent to c")
case d.Received(v) => println(s"Received from d: $v")
If there's a missing case, the compiler will warn you that the match
is not exhaustive and give you a hint as to what is missing. Similarly, there will be a warning in case of an unneeded, extra match case. For example:
[warn] -- [E029] Pattern Match Exhaustivity Warning: /test.scala:68:44
[warn] 68 | select(c.sendClause(10), d.receiveClause).orThrow match
[warn] | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[warn] | match may not be exhaustive.
[warn] |
[warn] | It would fail on pattern case: d.Received(_)
[warn] |
[warn] | longer explanation available when compiling with `-explain`
[warn] one warning found
Longer example: accumulating data
As a longer example, let's say we have an upstream producer, which regularly sends data onto a channel c
.
We also have a slower consumer, which needs to receive an aggregation of the data produced onto c
, when it is ready to receive it. The aggregation will be sent through the d
channel.
How can we implement the buffering component, which accumulates data? One strategy might be to use a select
as described above.
First of all, we must create a scoped
block, providing the syntactic boundary, which delimits the lifetime of any fibers that we start. Inside, we fork a computation, which will continually produce data onto the c
channel:
scoped {
val c = Channel[Int]()
// the fast producer
fork {
forever {
c.send(1)
Thread.sleep(100)
}
}
// more to come!
}
We also implement the slow consumer, which expects to receive the accumulated data on the d
channel:
// inside the scoped block:
val d = Channel[Int]()
val consumer = fork {
forever {
println("Received: " + d.receive().orThrow)
Thread.sleep(1000)
}
}
Finally, we can implement the accumulating component, also inside the scoped block:
fork {
@tailrec def loop(acc: Int): Unit =
select(d.sendClause(acc), c.receiveClause).orThrow match
case d.Sent() => loop(0)
case c.Received(n) => loop(acc + n)
loop(0)
}
We're using a tail-recursive function to accumulate the data received so far, but not yet sent to the consumer. Inside the loop
, we try to send the accumulated data over d
. As the channel is unbuffered, this will only succeed if there's a waiting d.receive
invocation waiting on another thread.
If the consumer is not ready, we try to receive a new data element from the producer. We inspect which clause has been satisfied—and either restart the loop
with an empty accumulator or add the received value to the current one.
At the end, we must block the scope on consumer.join()
, as otherwise, it would complete immediately. Here you can find a scala-cli script with the entire example. Make sure you have scala-cli installed, and run:
scala-cli run --jvm adoptium:1.20 --java-opt --enable-preview --java-opt --add-modules --java-opt jdk.incubator.concurrent accumulator.sc
Design considerations—comparing to Kotlin
While implementing the above feature, it turned out that Kotlin's experimental select
expressions have a similar design: you create clauses and pass them to the select
method. What is different is that the clauses include callbacks, which are invoked when the clause is satisfied. E.g.:
suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
select<Unit> { // <Unit> means that this select expression does not produce any result
fizz.onReceive { value -> // this is the first select clause
println("fizz -> '$value'")
}
buzz.onReceive { value -> // this is the second select clause
println("buzz -> '$value'")
}
}
}
This is in contrast to ox's design, where a clause is just a value, and to act depending on which clause has been satisfied, you must pattern-match to inspect which result it is.
The motivation behind this design choice is that programming in Scala is more functional in its nature and more often uses recursion to implement long-running, looping processes. This, in turn, requires using tail-recursion, so there are no stack overflows. And tail-recursion is at odds with callbacks.
Hence, ox uses a tail-recursion-friendly approach, even though a bit more code is required to first create the clauses and then enumerate the possible results in the match
. But, the compiler has your back and verifies that the match
is exhaustive, plus you can continue working with immutable data structures and recursive functions while benefiting from the concurrency model offered by Go-like channels.
Default clauses
Last but not least, it's worth mentioning that there's another type of clause that can be used—defaults. When a Default
clause is present (at most, one can be specified), the behavior of select
is modified.
If no clause can be immediately satisfied, the result of the select is the value specified in the default clause. That way, select
does not block. For example:
val c = Channel[Int]()
select(c.receiveClause, Default(5)).orThrow match
case c.Received(v) => println(s"Received from d: $v")
case DefaultResult(v) => println(s"No value available, using: $v")
Try it out!
Ox 0.0.7 is available now, offering:
- light-weight, programmer-friendly threading based on Project Loom's virtual threads
- structured concurrency, based on JEP 428
- scoped values based on JEP 429
- Go-like channels, with receive, send, and default clauses—as described above!
All of that is based on JVM and Scala: a functional programming language, using some of the features that are unique to Scala 3: union types, braceless syntax, extension methods, and context functions.
There's more work ahead in ox, especially around error handling and improving the functional aspect of the streaming API, but if you'd have an occasion to experiment with ox, please leave us your feedback!
We have a dedicated section on our community forum, but a comment on the blog is just as good.
For example, maybe you have a better naming strategy? I'm not 100% sold on the .receiveClause
and .sendClause
names, though I didn't come up with anything better that would differentiate them from .receive()
and .send(v)
, which receive/send a single value on the channel. What do you think about the way select
and its result are implemented—can this be somehow improved? And finally—which area of the current API feels clunky or missing features?
And now, back to designing the next ox feature!