Contents

Prototype Loom-based concurrency API for Scala

Adam Warski

03 Feb 2023.11 minutes read

Prototype Loom-based concurrency API for Scala webp image

Java 19 brought a preview of virtual threads and an API for structural concurrency. Java 20 extends this with ScopedValues, a much-improved version of ThreadLocals.

When writing concurrent code in Java, the first option is to work directly with threads (virtual or not). However, this is usually not the best idea. While possible quite easily from Java and Scala, it's too easy to run into deadlock, race conditions, or heisenbugs.

Another approach is the new StructuredTaskScope (described in more detail below). Quite obviously, this API is tailored for Java. Moreover, at times it's pretty low-level and easy to misuse: you need to call the proper methods in the correct order, or you'll get a StructureViolationException exception.

Given Scala's more advanced type system and other features, we should be able to provide a better developer experience. Below is a prototype of what a Loom-based concurrency API for Scala might look like.

Maintaining structure

Java's approach to concurrency, introduced in recent releases, is structured. It might be novel to Java, but it's also present (among others) in some Python libraries and the Kotlin programming language.

But what is structured concurrency? The best introduction is probably the "Go considered harmful" article. It's a long but interesting read. If you need a quick refresher: concurrency is structured when the syntactic code structure bounds the lifetime of threads. When new threads are spawned within a syntactic block, we can be sure that these threads are terminated when executing the block ends.

I like to think about structured concurrency as an aspect of purity: a function satisfies the structural concurrency property, iff it is pure with respect to threading side-effects.

As mentioned, the main API for structured concurrency in Java revolves around the StructuredTaskScope class. It is designed to be used within a try-with-resources block, which delimits the lifetime of any threads spawned within the block. So, for example, here's how to run two computations in parallel. If one computation fails, the other is interrupted, and the whole block throws that exception:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  Future<String> user = scope.fork(() -> findUser());
  Future<Integer> order = scope.fork(() -> fetchOrder());

  scope.join();
  scope.throwIfFailed();

  return new Response(user.resultNow(), order.resultNow());
}

The Scala equivalent, the design of which we're exploring here, has the following form:

scoped {
  val user: Fork[String] = fork(findUser())
  val order: Fork[Integer] = fork(fetchOrder())

  Response(user.join(), order.join())
}

Let's examine this in more detail. The scoped and fork functions come from the Ox object (the name comes from cOncurrency eXtensions). As the name suggests, scoped delimits a scope within which new threads can be started. Once the scoped block completes, it is guaranteed that all child threads have finished.

This is the same guarantee that StructuredTaskScope provides; indeed, this class is used under the hood. Hence, the scoped construct forms the basis of implementing the structured concurrency approach in Ox.

The fork method, quite unsurprisingly, starts a new thread running the given code (here, findUser() or fetchOrder()). The result is a Fork instance. It has two methods: a blocking .join() to get the result, and a blocking .cancel(), which interrupts the underlying thread, and waits until the thread terminates.

In essence, Fork is similar to a (virtual) Thread, a Future or a Fiber (known from functional effect systems), as it represents a running computation. However, the name is different to avoid clashes.

Once the code inside the scoped block completes, any threads that are still running (which were not joined or have happened to complete), will be interrupted, and the whole block will only complete once they complete.

Racing and paralleling

In the Java API, various patterns of running code concurrently can be implemented by providing an appropriate implementation of StructuredTaskScope. For example, using StructuredTaskScope.ShutdownOnFailure we can run some computations in parallel, shutting down the scope when a first error occurs. Similarly, StructuredTaskScope.ShutdownOnSuccess can be used to shutdown on first success, allowing to implement a race between two computations.

Ox's API diverges from the Java approach and always uses a fixed StructuredTaskScope implementation (which is neither of the above—one which never shuts down the scope on its own). But it also allows some additional flexibility.

The fork method, by default, propagates any errors to the scope's main thread—hence, in the example above, if any of the forks fail, the whole scope will short-circuit and fail as well. This is similar to StructuredTaskScope.ShutdownOnFailure.

However, there's also another variant: forkHold, which does not propagate any exceptions. Instead, it allows running forks which might fail, and where failures are dealt with within the scope. The failure is thrown as an exception when the fork's join method is called.

We can mix & match both of these types of forks within a single scope. fork works best for background tasks that are not joined, and hence the only way to discover that they failed is by interrupting the main thread. forkHold is designed for situations where we join the computations, inspecting their (successful or failed) results.

What about racing? Ox provides a race method as a primitive. It is implemented using a forkHold + a queue, on which the main thread blocks, awaiting the first forked result to complete. Furthermore, this can be quite trivially used to implement a timeout method, for example:

// In Ox.scala
def timeout[T](duration: FiniteDuration)(t: => T): T =
  raceSuccess(Right(t))({ 
      Thread.sleep(duration.toMillis)
      Left(()) 
  }) match
    case Left(_) => 
      throw new TimeoutException(s"Timed out after $duration")
    case Right(v) => v

// In OxTest.scala
"timeout" should "short-circuit a long computation" in {
  val trail = Trail()
  scoped {
    try
      timeout(1.second) {
        Thread.sleep(2000)
        trail.add("no timeout")
      }
    catch case _: TimeoutException => trail.add("timeout")

    trail.add("done")
    Thread.sleep(2000)
  }

  trail.trail shouldBe Vector("timeout", "done")
}

Partner with Scala Experts to build complex applications efficiently and with improved code accuracy. Working code delivered quickly and confidently. Explore the offer >>

Escaping the structure

Structured concurrency is useful, but it also might prove challenging to work with at times. However, there is an escape hatch that allows us some more flexibility.

First, what about nesting forks?

scoped {
  val f1 = fork {
    val f2 = fork {
      Thread.sleep(1000)
      println("f2 complete")
    }

    Thread.sleep(500)
    println("f1 complete")
    f2
  }

  f1.join().join()
}

Such nesting is, of course, allowed, and the above code (as you might expect or not) prints f1 complete followed by f2 complete. Even though f2 is nested within f1, it won't be interrupted when f1 finishes. Only scoped creates a boundary within which the structured concurrency principle must hold.

Scopes might be nested as well, with the expected semantics.

Secondly, it is also possible to extract code, which includes forks to separate methods or classes. To do that, we need a way to express that we are within scope. This is done using a given parameter of type Ox.

Behind the scenes, the fork and forkHold methods require a given Ox to be in scope. Such an instance is provided by the scoped method, using context functions.

For example:

def run()(using Ox): Int = {
  val f1 = fork { Thread.sleep(1000); 5 }
  val f2 = fork { Thread.sleep(2000); 7 }
  f1.join() + f2.join()
}

scoped {
  println(run())
  println(run())
}

In other words, the presence of a using Ox in a method's type signature represents a requirement for a capability to fork computations. As before, the structural properties are enforced by scoped.

Hence, a "normal" threading model, where threads can be spawned at any time, without lifetime constraints can be implemented by wrapping the application's main method in scoped, passing a given Ox to each method/class, and using forkHold at will.

Scoped values

Java 20 introduces scoped values, which aim to replace ThreadLocals, as in many ways they are broken, especially when used in conjunction with VirtualThreads.

The major improvement is that the value of a ScopedValue is inherited by child threads (forked within a StructuredTaskScope), without sacrificing performance or memory usage.

Moreover, the value of a ScopedValue can only be changed structurally within the code passed to the ScopedValue.where method. However, this Java API also has to be used with care, as we can't change scoped values after starting a scope (otherwise, an exception will be thrown on the next fork).

To improve safety in this area, Ox provides its wrapper, ForkLocal, which differs from ScopedValue in two ways: first, it has a default value (instead of null). Second, setting a new value always starts a new scope:

val trail = Trail()
val v = Ox.ForkLocal("a")
scoped {
  forkHold {
    v.scopedWhere("x") {
      trail.add(s"nested1 = ${v.get()}")

      scoped {
        forkHold {
          trail.add(s"nested2 = ${v.get()}")
        }.join()
      }
    }
  }.join()

  trail.add(s"outer = ${v.get()}")
}

trail.trail shouldBe Vector("nested1 = x", "nested2 = x", "outer = a")

The code

The source code of Ox is available on GitHub. The Ox class and the methods in the Ox companion object implement the thin layer on top of Java's structured concurrency tools to provide the functionality described above.

There's also a small test suite, covering (in a rather non-comprehensive way) the basic use cases.

Feel free to explore and comment!

Interruptions

I think the main problem with the approach described above, which I've encountered so far, are interruptions. These are modeled as exceptions in Java, with all the consequences: they can be caught and ignored.

The deficiencies of Java's interruption model would probably be an article on its own, so we won't explore this subject in more depth here. But as an example: imagine that you have a process running "forever", doing some side-effects, and if any exceptions happen, they are logged, and the processing continues:

scoped {
  fork {
    forever {
      try processSingleItem()
      catch case e: Exception => logger.error("Processing error", e)
    }
  }

  // do something else that keeps the scope busy
}

The fork above will never terminate, even if executing the scope's code ends: the block can only finish its execution when all child threads have terminated, and interrupting the fork will be caught & logged. The way to fix the above is to catch InterruptedException separately and re-throw it or use NonFatal:

scoped {
  fork {
    forever {
      try processSingleItem()
      catch case NonFatal(e) => logger.error("Processing error", e)
    }
  }

  // do something else that keeps the scope busy
}

It's a subtle difference, which might cause hard-to-find bugs. But it seems we are stuck with the current Java interruption model for the foreseeable future.

Comparing with functional effects

I think the question that everybody who has used the more functional flavors of Scala has is how this compares to functional effect systems, such as cats-effect or ZIO?

This, again, could be its own series of articles. I think we're just scratching the surface of how Loom might be leveraged from Scala, so it might be too early to draw conclusions.

However, to get a feel of how programming using Ox might look like, I ported some of the examples from an article comparing Akka, Monix (cats-effect predecessor), and Scalaz IO (ZIO predecessor) a while back. Hence, in the sources, you can find:

Everything is structured

Apart from having to remain vigilant on interruptions, one important conclusion from the above experiments is that structured concurrency strongly influences the code's overall structure.

For example, the RateLimiter starts a background process, which executes the rate-limited actions. However, because all concurrency is structured, this dictates a specific way of using that functionality. Therefore, we have to make sure that the RateLimiter is used structurally so that its lifetime also corresponds to the structure of the source code:

RateLimiter.withRateLimiter(2, 1.second) { rateLimiter =>
  rateLimiter.runLimited {
    println(s"${LocalTime.now()} Running …")
  }

  // other logic
}

In a way, we are back to callbacks: the withRateLimiter method takes as a parameter a function, which should be called back with a running rate limiter. Once this function completes, the scope is closed, closing the rate limiter. Scala provides a nice syntax for this, but we might still end up with a long callback chain when allocating all of the "resources" in our application.

But how do they compare?

Make no mistake: I think functional effect systems, such as ZIO and cats-effect, are currently the best way to develop concurrent business applications. Some of their advantages compared to Ox/Loom:

  • an asynchronous runtime which works on JVM 8+, as well as on JS and Native platforms
  • higher control over thread usage, which impacts fairness
  • safe cancellation model, with out-of-bounds, uncatchable interruptions
  • a rich set of concurrency operations for declarative concurrency
  • a safe, advanced implementation of fiber-locals
  • advanced error handling
  • fearless refactoring through referential transparency
  • possibility to control time in tests, which prevents flakiness and allows precision
  • composable resources

But they have some disadvantages as well, which are not present in Ox:

  • monadic, instead of direct syntax
  • virality of the IO/ZIO data types
  • custom control structures, instead of for loops or try-catch

A major problem in the Loom/Ox approach is that it mostly glosses over error handling. A candidate to implement this might be the experimental CanThrow capabilities—we are using a pre-release JDK already, so why not include an upcoming Scala 3 feature? That's an area requiring more research, though.

Summing up

Remember that Ox is only a prototype, and more of an exploration project, than a complete solution. If you'd have some ideas on how to use Loom + Scala differently or some thoughts on the advantages and disadvantages of this approach, please comment below!

Is structured concurrency the way to go? Should every resource be used structurally? What about the forking model? What cannot be expressed using Ox that is easily done using a functional effect system? That's just a handful of questions which might be worth exploring further.

Scala

Blog Comments powered by Disqus.