Go-like channels in Scala: receive, send, and default clauses

Adam Warski

21 Jul 2023 · 7 minutes read

Ox, an experimental concurrency library using Project Loom and Scala 3, offers features such as developer-friendly structured concurrency, scoped values, and an implementation of Go-like channels.

Channels were introduced some time ago, along with higher-level combinators that let you work with streams in either a functional or an imperative style, depending on the use case.

However, the select operation on channels has been limited to receiving values only. We're now lifting this limitation and extending the feature set of ox's channels to include support for receive, send, and default clauses.

What does this mean exactly? How can you use this feature in practice? Read on to find out!

What's a select?

The select method is what distinguishes channels from ordinary queues. Given a select invocation with a number of clauses, exactly one of them will be satisfied—or in other words, exactly one clause will be selected.

Most often, this is useful for receiving a value from exactly one channel while keeping the others intact. For example:

val tick: Source[Tick] = Source.tick(1.second, Tick)
val strings: Source[String] = ...

select(tick, strings) match
  case Tick =>                   println("Got a tick")
  case s: String =>              println("Got a string")
  case ChannelClosed.Done =>     println("All channels are done")
  case ChannelClosed.Error(_) => println("There's an error")

select will block until a value from one of the channels can be received. If both channels have a value, only one channel will be modified. (That's in contrast to racing two dequeue operations on queues, which might end with a value being removed from each queue, and one of them discarded.)
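To make the contrast with queues concrete, here is a minimal sketch that uses only java.util.concurrent, with no ox involved. The raceTwoDequeues helper is hypothetical and written just for this illustration. It races a take() on each of two queues that both already hold a value, and reports how many elements are left in each queue afterwards:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical helper, not part of ox: "races" a dequeue on each of two
// queues using plain threads, and returns the sizes of both queues afterwards.
def raceTwoDequeues(): (Int, Int) =
  val q1 = new LinkedBlockingQueue[String]()
  val q2 = new LinkedBlockingQueue[String]()
  q1.put("a")
  q2.put("b")

  val results = new LinkedBlockingQueue[String]()
  val t1 = new Thread(() => results.put(q1.take()))
  val t2 = new Thread(() => results.put(q2.take()))
  t1.start()
  t2.start()

  // The first completed dequeue is the "winner" of the race.
  val winner = results.take()

  // Both queues held a value, so neither take() ever blocked. The losing
  // dequeue cannot be undone: its value is removed and then discarded.
  t1.join()
  t2.join()
  (q1.size, q2.size)
```

Both queues end up empty even though only one value is used. A select on two channels, in contrast, leaves the channel that wasn't selected untouched.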

Selecting sends and receives

What's described above has already been available in ox for some time. However, now it's also possible to select among both send and receive operations. This can be done by creating clauses, which are then passed to select. It is guaranteed that exactly one such clause will be satisfied, returning a clause-specific result.

For example, we can block until a value is sent to channel c, or received from channel d:

val c = Channel[Int]()
val d = Channel[Int]()

select(c.sendClause(10), d.receiveClause)

As a side note, here both c and d are unbuffered channels, so for this operation to ever finish, there must be a concurrently running process that will eventually receive from c or send to d.

The above select invocation has the following return type:

c.Sent | d.Received | ChannelClosed

Note that the Sent and Received types are inner types of the c and d values. For different channels, the Sent / Received instances will have distinct path-dependent types, which makes it possible to tell which clause has been satisfied.
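The inner-type mechanism can be sketched without any concurrency at all. MiniChannel below is a hypothetical stand-in that mirrors only the shape of the result types, not ox's actual Channel implementation:

```scala
// Hypothetical miniature of a channel: only the result types, no concurrency.
final class MiniChannel[T]:
  case class Sent()
  case class Received(value: T)

val c = MiniChannel[Int]()
val d = MiniChannel[Int]()

// c.Received and d.Received are different types, so this union only admits
// "sent to c" or "received from d".
def describe(result: c.Sent | d.Received): String = result match
  case c.Sent()      => "sent to c"
  case d.Received(v) => s"received $v from d"
```

With these definitions, describe(d.Received(7)) is accepted, while describe(c.Received(7)) is rejected at compile time, as c.Received is not a member of the union.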

There's also the possibility that all channels are done, or at least one is in an error state—in which case, no clause is satisfied, but the closed reason is returned. If we don't want to handle these cases explicitly, they can be converted to an exception using the .orThrow extension method on the result.

Hence, we have:

select(c.sendClause(10), d.receiveClause).orThrow: c.Sent | d.Received

The results of a select can be inspected using a pattern match:

select(c.sendClause(10), d.receiveClause).orThrow match
  case c.Sent()      => println("Sent to c")
  case d.Received(v) => println(s"Received from d: $v")

If there's a missing case, the compiler will warn you that the match is not exhaustive and give you a hint as to what is missing. Similarly, there will be a warning in case of an unneeded, extra match case. For example, leaving out the d.Received case produces:

[warn] -- [E029] Pattern Match Exhaustivity Warning: /test.scala:68:44
[warn] 68 | select(c.sendClause(10), d.receiveClause).orThrow match
[warn]  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[warn]  | match may not be exhaustive.
[warn]  |
[warn]  | It would fail on pattern case: d.Received(_)
[warn]  |
[warn]  | longer explanation available when compiling with `-explain`
[warn] one warning found

Longer example: accumulating data

As a longer example, let's say we have an upstream producer, which regularly sends data onto a channel c.

We also have a slower consumer, which needs to receive an aggregation of the data produced onto c, when it is ready to receive it. The aggregation will be sent through the d channel.

How can we implement the buffering component, which accumulates data? One strategy might be to use a select as described above.

First of all, we must create a scoped block, providing the syntactic boundary, which delimits the lifetime of any fibers that we start. Inside, we fork a computation, which will continually produce data onto the c channel:

scoped {
  val c = Channel[Int]()

  // the fast producer
  fork {
    forever {
      c.send(1)
      Thread.sleep(100)
    }
  }

  // more to come!
}

We also implement the slow consumer, which expects to receive the accumulated data on the d channel:

  // inside the scoped block:
  val d = Channel[Int]()
  val consumer = fork {
    forever {
      println("Received: " + d.receive().orThrow)
      Thread.sleep(1000)
    }
  }

Finally, we can implement the accumulating component, also inside the scoped block:

  fork {
    @tailrec def loop(acc: Int): Unit =
      select(d.sendClause(acc), c.receiveClause).orThrow match
        case d.Sent()      => loop(0)
        case c.Received(n) => loop(acc + n)

    loop(0)
  }

We're using a tail-recursive function to accumulate the data received so far, but not yet sent to the consumer. Inside the loop, we try to send the accumulated data over d. As the channel is unbuffered, this will only succeed if there's a d.receive invocation waiting on another thread.

If the consumer is not ready, we try to receive a new data element from the producer. We inspect which clause has been satisfied—and either restart the loop with an empty accumulator or add the received value to the current one.
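The accumulator's state transitions can also be checked in isolation. The sketch below is a pure model, not ox code: the hypothetical Picked enum records which clause was satisfied on each iteration, and simulate replays such a sequence, returning the values that would have been delivered to the consumer:

```scala
import scala.annotation.tailrec

// Hypothetical model of which clause was satisfied on each iteration.
enum Picked:
  case SentToConsumer               // plays the role of d.Sent()
  case ReceivedFromProducer(n: Int) // plays the role of c.Received(n)

// Replays a sequence of satisfied clauses, following the same transitions
// as the loop above: reset the accumulator on send, add to it on receive.
@tailrec
def simulate(picks: List[Picked], acc: Int = 0, delivered: List[Int] = Nil): List[Int] =
  picks match
    case Nil                                    => delivered.reverse
    case Picked.SentToConsumer :: rest          => simulate(rest, 0, acc :: delivered)
    case Picked.ReceivedFromProducer(n) :: rest => simulate(rest, acc + n, delivered)
```

For instance, two received elements followed by a send, then one more element and another send, deliver 2 and then 1. The model also shows that if the consumer is ready before any data arrives, it is handed the empty accumulator, 0.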

At the end, we must block the scope on consumer.join(); otherwise, the scoped block would complete immediately, interrupting the forks started within it. Here you can find a scala-cli script with the entire example. Make sure you have scala-cli installed, and run:

 scala-cli run --jvm adoptium:1.20 --java-opt --enable-preview --java-opt --add-modules --java-opt jdk.incubator.concurrent accumulator.sc

Design considerations—comparing to Kotlin

While implementing the above feature, it turned out that Kotlin's experimental select expressions have a similar design: you create clauses and pass them to the select method. What is different is that the clauses include callbacks, which are invoked when the clause is satisfied. E.g.:

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value -> // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value -> // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

This is in contrast to ox's design, where a clause is just a value, and to act depending on which clause has been satisfied, you must pattern-match to inspect which result it is.

The motivation behind this design choice is that programming in Scala is more functional in nature and more often uses recursion to implement long-running, looping processes. This, in turn, requires tail recursion, so that there are no stack overflows. And tail recursion is at odds with callbacks: a recursive call made from within a callback is not in tail position.

Hence, ox uses a tail-recursion-friendly approach, even though a bit more code is required to first create the clauses and then enumerate the possible results in the match. But the compiler has your back and verifies that the match is exhaustive, and you can continue working with immutable data structures and recursive functions while benefiting from the concurrency model offered by Go-like channels.
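The tension between callbacks and tail recursion can be seen in a small, channel-free sketch. All names below (SentResult, ReceivedResult, selectAsValue, selectWithCallbacks) are hypothetical stand-ins and are neither ox's nor Kotlin's API. Both "selects" pretend that every tenth iteration is a send and every other one receives the value 1:

```scala
import scala.annotation.tailrec

// Hypothetical stand-ins for select results; these are not ox's types.
case class SentResult()
case class ReceivedResult(value: Int)

// Value style: the "select" returns which clause was satisfied.
def selectAsValue(i: Int): SentResult | ReceivedResult =
  if i % 10 == 0 then SentResult() else ReceivedResult(1)

// The recursive calls are the last thing done in each match branch,
// so the compiler turns this into a loop.
@tailrec
def loopWithValues(i: Int, limit: Int, acc: Int): Int =
  if i == limit then acc
  else
    selectAsValue(i) match
      case SentResult()      => loopWithValues(i + 1, limit, 0)
      case ReceivedResult(n) => loopWithValues(i + 1, limit, acc + n)

// Callback style: the "select" invokes a handler for the satisfied clause.
def selectWithCallbacks[R](i: Int)(onSent: () => R, onReceived: Int => R): R =
  if i % 10 == 0 then onSent() else onReceived(1)

// Annotating this with @tailrec is rejected by the compiler: the recursive
// calls sit inside lambdas, so every iteration adds stack frames.
def loopWithCallbacks(i: Int, limit: Int, acc: Int): Int =
  if i == limit then acc
  else
    selectWithCallbacks[Int](i)(
      () => loopWithCallbacks(i + 1, limit, 0),
      n => loopWithCallbacks(i + 1, limit, acc + n)
    )
```

The value-based loop runs a million iterations in constant stack space. The callback-based one computes the same results for small limits, but its stack depth grows with the number of iterations.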

Default clauses

Last but not least, it's worth mentioning that there's another type of clause that can be used—defaults. When a Default clause is present (at most, one can be specified), the behavior of select is modified.

If no clause can be immediately satisfied, the result of the select is the value specified in the default clause. That way, select does not block. For example:

val c = Channel[Int]()

select(c.receiveClause, Default(5)).orThrow match
  case c.Received(v)    => println(s"Received from c: $v")
  case DefaultResult(v) => println(s"No value available, using: $v")
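If you know java.util.concurrent, a rough analogy is the difference between the blocking take() and the non-blocking poll() on a BlockingQueue. The sketch below models that analogy with plain queues. GotValue, UsedDefault and receiveOrDefault are hypothetical names, and this is not how ox implements default clauses:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical result types for this model; ox has its own.
case class GotValue[T](value: T)
case class UsedDefault[D](value: D)

// Returns immediately: either with an element that was already waiting in
// the queue, or with the supplied default.
def receiveOrDefault[T, D](queue: LinkedBlockingQueue[T], default: D): GotValue[T] | UsedDefault[D] =
  // poll() returns null instead of blocking when the queue is empty
  Option(queue.poll()) match
    case Some(v) => GotValue(v)
    case None    => UsedDefault(default)
```

Called on an empty queue with a default of 5, this yields UsedDefault(5) straight away. Once an element has been put on the queue, the same call yields GotValue with that element.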

Try it out!

Ox 0.0.7 is available now, offering:

  • light-weight, programmer-friendly threading based on Project Loom's virtual threads
  • structured concurrency, based on JEP 428
  • scoped values based on JEP 429
  • Go-like channels, with receive, send, and default clauses—as described above!

All of that is based on the JVM and Scala, a functional programming language, and uses some of the features that are unique to Scala 3: union types, braceless syntax, extension methods, and context functions.

There's more work ahead in ox, especially around error handling and improving the functional aspect of the streaming API, but if you have the occasion to experiment with ox, please leave us your feedback!

We have a dedicated section on our community forum, but a comment on the blog is just as good.

For example, maybe you have a better naming strategy? I'm not 100% sold on the .receiveClause and .sendClause names, though I didn't come up with anything better that would differentiate them from .receive() and .send(v), which receive/send a single value on the channel. What do you think about the way select and its result are implemented—can this be somehow improved? And finally—which area of the current API feels clunky or missing features?

And now, back to designing the next ox feature!
