
Concurrency with Cats Effect

Krzysztof Atłasik

17 Apr 2023. 8 minutes read


Concurrency is hard. Brian Goetz in “Java Concurrency in Practice” writes:

Writing correct programs is hard; writing correct concurrent programs is harder. There are simply more things that can go wrong in a concurrent program than in a sequential one.

Fortunately, Scala has many excellent tools that help tame concurrent programming, such as Akka, ZIO, Finagle, and Cats Effect (CE). In this article, I will focus on the features offered by the last of these.

What is a fiber?

The basic building block of CE concurrency is the fiber. In a nutshell, a fiber is a lightweight logical thread that represents a sequence of actions. In the case of CE, it is a chain of operations suspended in the IO monad and sequenced with flatMap.

Fibers have a low footprint compared to system threads. Most importantly, they are cheap in terms of memory usage, so they are not a scarce resource the way threads are. We don't need to create fiber pools; we can just spawn a new fiber whenever we need one.

Unlike system threads, fibers do not rely on the operating system's scheduler to switch between contexts. Instead, a framework-level scheduler controls when and how they are executed. A fiber is not pinned to a particular thread: the Cats Effect runtime can multiplex tens of thousands of fibers over a few system threads. On the JVM, the runtime usually uses a fixed-size thread pool for CPU-bound work, but in single-threaded environments like Scala.js it can even run on a single thread.
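To get a feeling for how cheap fibers are, here is a minimal sketch that forks ten thousand of them and waits for all the results. The object name ManyFibers and the helper spawnMany are made up for this illustration, and it uses start and joinWithNever, which are introduced in the following sections.

```scala
import cats.effect.*
import cats.syntax.all.*
import scala.concurrent.duration.*

object ManyFibers extends IOApp.Simple {
  // Hypothetical helper: forks `n` fibers; each sleeps briefly and returns its index
  def spawnMany(n: Int): IO[Long] =
    for {
      // every element of the list becomes its own fiber
      fibers  <- (1 to n).toList.traverse(i => IO.sleep(100.millis).as(i.toLong).start)
      // wait for all of them and collect the values they produced
      results <- fibers.traverse(fiber => fiber.joinWithNever)
    } yield results.sum

  def run: IO[Unit] =
    spawnMany(10000).flatMap(sum => IO.println(s"Sum of results: $sum"))
}
```

Spawning the same number of system threads would be far more expensive, while here all the sleeping fibers share the runtime's small thread pool.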

Managing fibers

In CE, we can fork a computation with the start method. It returns a Fiber, which exposes the join method. Calling join makes the calling fiber wait for the joined fiber to complete.

import cats.effect.*
import scala.concurrent.duration.*

val task = for {
  _ <- IO.println("Task started")
  _ <- IO.sleep(1.second)
  _ <- IO.println("Task completed")
} yield ()

for {
  fiber <- task.start
  _ <- IO.println("Hello from main fiber")
  //we wait approx. 1s till forked fiber completes
  _ <- fiber.join
} yield ()

Another method available on Fiber is cancel. It cancels the fiber along with the computations running on it. The cancel method does not return control until all resources allocated in the fiber are properly finalized and all underlying operations are canceled.

With onCancel(fin: IO[Unit]) we can register a finalizer that runs when the task is canceled.

import cats.effect.*
import scala.concurrent.duration.*

val task = for {
  _ <- IO.sleep(5.seconds)
  _ <- IO.println("Computations complete")
} yield ()

for {
  fiber <- task.onCancel(IO.println("Task canceled!")).start
  _ <- IO.sleep(1.second)
  //fiber would complete after 5s, but is canceled after 1s
  _ <- fiber.cancel
} yield ()

The join method returns an Outcome, which has three subtypes: Succeeded, Errored, and Canceled. We can pattern-match on the outcome to inspect how the fiber finished.

The value computed by the fiber is available inside the Succeeded outcome, wrapped in an effect (here IO[String]), so it has to be unwrapped with flatMap or map. This way, we can pass values from the forked task to the call site.

import cats.effect.*
import scala.concurrent.duration.*

val task = IO.sleep(500.millis) *> IO.pure("Hello world!")

for {
 fiber <- task.start
 _ <- fiber.join.flatMap {
   case Outcome.Succeeded(value) => value.flatMap(
      v => IO.println(s"Computed value: $v")
   )
   case Outcome.Errored(e) => IO.println(s"Error: $e")
   case Outcome.Canceled() => IO.println("Task was canceled!")
 }
} yield ()

The join method has a variant, joinWith(onCancel: F[A]), which lets us specify a fallback computation to run if the fiber was canceled. It has two specialized forms: joinWithNever, which is a shortcut for joinWith(IO.never), and joinWithUnit, which is a shortcut for joinWith(IO.unit).
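As a small illustration of joinWith, the sketch below cancels a fiber right after starting it and then joins it with a fallback value. The names JoinWithDemo, slowTask and program are made up for this example.

```scala
import cats.effect.*
import scala.concurrent.duration.*

object JoinWithDemo extends IOApp.Simple {
  // A task that would need 5 seconds to produce its value
  val slowTask: IO[String] = IO.sleep(5.seconds).as("computed")

  val program: IO[String] =
    for {
      fiber  <- slowTask.start
      _      <- fiber.cancel                          // the fiber ends as Canceled
      result <- fiber.joinWith(IO.pure("fallback"))   // so the fallback is used
    } yield result

  def run: IO[Unit] = program.flatMap(result => IO.println(result))
}
```

With joinWithNever in place of joinWith, the same program would wait forever, so that variant is best kept for fibers we know will not be canceled.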

Timeouts

We can use the timeout method to set a time limit for a computation. If the computation does not finish in time, it is canceled and a TimeoutException is raised.

task
  .onCancel(IO.println("Task has timed out!"))
  .timeout(500.millis)
  .start

The timeoutTo method lets us specify a fallback to run when the computation times out.

task
  .onCancel(IO.println("Task has timed out!"))
  .timeoutTo(500.millis, planB)
  .start

Like cancel, the timeout methods always wait until all resources allocated in the fiber are properly released. We can skip this waiting with timeoutAndForget: after the time limit passes, it immediately yields control back to the call site and runs the finalizers asynchronously. Be advised that if the finalizers never finish, the fiber will leak.
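The difference between timeout and timeoutTo can be seen in a short sketch like the one below. The names TimeoutDemo, slow, recovered and withFallback are made up for this illustration; the first value recovers from the TimeoutException by hand, while the second lets timeoutTo switch to the fallback.

```scala
import cats.effect.*
import java.util.concurrent.TimeoutException
import scala.concurrent.duration.*

object TimeoutDemo extends IOApp.Simple {
  // A task that takes much longer than the limit used below
  val slow: IO[String] = IO.sleep(2.seconds).as("done")

  // timeout raises a TimeoutException, which we handle explicitly
  val recovered: IO[String] =
    slow.timeout(200.millis).handleErrorWith {
      case _: TimeoutException => IO.pure("timed out")
      case other               => IO.raiseError(other)
    }

  // timeoutTo runs the fallback instead of raising an error
  val withFallback: IO[String] =
    slow.timeoutTo(200.millis, IO.pure("plan B"))

  def run: IO[Unit] =
    for {
      a <- recovered
      b <- withFallback
      _ <- IO.println(s"$a / $b")
    } yield ()
}
```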

Structured concurrency

If a fiber is neither joined nor canceled, it keeps running concurrently with the fiber that spawned it. It ends only when its computation completes or raises an error. If the computation never ends, the fiber never completes either.

val task = for {
 time <- IO.realTimeInstant
 _ <- IO.println(f"Current date and time is $time")
 _ <- IO.sleep(1.second)
} yield ()

for {
 _ <- task.foreverM.start //task is running as an infinite loop
                          //and fiber is never canceled
 _ <- IO.sleep(5.seconds)
 _ <- IO.println("Bye")
} yield ()

This can lead to resource leaks. Fibers are lightweight, but if we spawn enough of them, they can still exhaust the available memory.

Methods like start and join are CE concurrency primitives. They are handy tools but should be used with caution. If we’re careless, we can easily leak resources or cause deadlocks.

To prevent leaks, we have to either explicitly cancel redundant fibers or use safer methods like background. The background method binds the fiber's lifecycle to a Resource. When the resource is finalized, the underlying fiber is canceled as well.

val backgroundTask: Resource[IO, Unit] = task
  .foreverM //keep the task looping until it is canceled
  .onCancel(IO.println("Closing the background task"))
  .background
  .void

backgroundTask.surround {
  for {
    _ <- IO.sleep(5.seconds)
    _ <- IO.println("Bye") //after foreground task is complete 
                           //the background fiber will be canceled
  } yield ()
}

The lifetime of the fiber follows the syntactic structure of the code. This way of handling fiber lifecycles is called structured concurrency.

Another structured concurrency tool offered by CE is Supervisor. A supervisor is a resource that can spawn fibers whose lifecycle is bound to its own. Whenever the supervisor is finalized, it makes sure that all its child fibers are terminated.

Supervisor comes with two modes, which we choose by passing the await flag when the resource is created. If await is set to false, the supervisor cancels all fibers that are still running when it terminates. This is the default setting. If await is set to true, the termination of the supervisor waits until all fibers have completed.

import cats.effect.std.Supervisor

Supervisor[IO](await = false).use { supervisor =>
  for {
    _ <- supervisor.supervise(
      IO.println("A").andWait(1.second).foreverM
    )
    _ <- supervisor.supervise(
      IO.println("B").andWait(2.seconds).foreverM
    )
    _ <- IO.sleep(5.seconds) //fibers will print A and B 
                             //for 5 seconds and then terminate
  } yield ()
}

By adding import cats.effect.implicits.* we also bring the supervise extension method for IO into scope. It lets us call supervise in the same style as start.

import cats.effect.implicits.*

Supervisor[IO](await = true).use { supervisor =>
   IO.println("Hello")
     .andWait(1.second)
     .foreverM
     .timeout(5.seconds)
     .supervise(supervisor)
}

Concurrency operators

CE provides many convenient higher-level abstractions for common concurrency-related operations.

For instance, IO.race takes two IO values, runs them concurrently, and waits for the faster one. The slower computation is canceled. The result type of race is IO[Either[A, B]].

val raced: IO[Either[String, Int]] = IO.race(
 IO("Faster").delayBy(5.seconds),
 IO(999).delayBy(10.seconds)
)

The IO.racePair method gives us more control over how to handle the losing fiber. It returns both the Outcome of the faster task and a Fiber handle we can use to cancel or join the slower one. The return type might seem a little convoluted at first: IO[Either[(OutcomeIO[A], FiberIO[B]), (FiberIO[A], OutcomeIO[B])]]. If the left task wins, we get the left value of the Either: a tuple containing the outcome of the left fiber and the handle of the right fiber. If the right task wins, we get the right value, this time with the handle of the left fiber and the outcome of the right one.

IO.racePair(IO(10), IO(5).delayBy(2.seconds)).flatMap {
 case Left((resL, fibR))  => 
   fibR.cancel *> IO(s"Left fiber outcome: $resL")
 case Right((fibL, resR)) => 
   fibL.cancel *> IO(s"Right fiber outcome: $resR")
}

If we need to run two computations in parallel and wait for both of them to complete, we have several options. The IO.both method returns a tuple containing both results: IO[(A, B)]. If either operation fails or is canceled, the other one is canceled as well.

IO.both(IO(1).delayBy(1.second), IO("Hello"))

IO.bothOutcome doesn't terminate the other fiber, even if one of them fails or is canceled. It returns a tuple containing the outcomes of both fibers.
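The following sketch shows bothOutcome with one failing and one succeeding effect. The names BothOutcomeDemo, failing, succeeding and describe are made up for this illustration.

```scala
import cats.effect.*
import scala.concurrent.duration.*

object BothOutcomeDemo extends IOApp.Simple {
  // The left effect fails immediately; the right one needs 200 ms to finish
  val failing: IO[Int] = IO.raiseError[Int](new RuntimeException("boom"))
  val succeeding: IO[String] = IO.sleep(200.millis).as("finished anyway")

  // Unlike IO.both, the failure on the left does not cancel the right side
  val program: IO[(OutcomeIO[Int], OutcomeIO[String])] =
    IO.bothOutcome(failing, succeeding)

  // Hypothetical helper that turns an outcome into a readable message
  def describe[A](outcome: OutcomeIO[A]): IO[String] = outcome match {
    case Outcome.Succeeded(fa) => fa.map(a => s"succeeded with $a")
    case Outcome.Errored(e)    => IO.pure(s"failed with ${e.getMessage}")
    case Outcome.Canceled()    => IO.pure("was canceled")
  }

  def run: IO[Unit] =
    for {
      outcomes <- program
      left     <- describe(outcomes._1)
      right    <- describe(outcomes._2)
      _        <- IO.println(s"Left $left, right $right")
    } yield ()
}
```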

The Cats library provides many operators that allow for the parallel execution of effects. They are implemented as extension methods, so to use them, we need to make appropriate imports that will bring proper implicits into the scope.

import cats.syntax.all.*

The parTupled method is quite similar to IO.both. It can be called on a tuple of two IO values and runs both in parallel. The result type is IO[(A, B)]. Its semantics are the same as those of tupled, with the difference that tupled runs the effects sequentially.
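To compare the two, here is a minimal sketch that measures both variants with the timed method. The names ParTupledDemo, left, right, sequential and parallel are made up for this illustration, and the exact timings will vary from run to run.

```scala
import cats.effect.*
import cats.syntax.all.*
import scala.concurrent.duration.*

object ParTupledDemo extends IOApp.Simple {
  val left: IO[Int] = IO.sleep(300.millis).as(1)
  val right: IO[String] = IO.sleep(300.millis).as("Hello")

  // Runs left and then right, so the sleeps add up
  val sequential: IO[(Int, String)] = (left, right).tupled

  // Runs both at the same time, so the sleeps overlap
  val parallel: IO[(Int, String)] = (left, right).parTupled

  def run: IO[Unit] =
    for {
      seqResult <- sequential.timed
      parResult <- parallel.timed
      _ <- IO.println(s"tupled: ${seqResult._2} in ${seqResult._1.toMillis} ms")
      _ <- IO.println(s"parTupled: ${parResult._2} in ${parResult._1.toMillis} ms")
    } yield ()
}
```

Both variants produce the same tuple; only the elapsed time should differ, with the sequential version taking roughly twice as long.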

If we want to execute an arbitrary number of effects concurrently, we can use parallel counterparts of traverse and sequence: parTraverse and parSequence.

val r: IO[List[Int]] = List(
 IO(1).delayBy(1.second),
 IO(2).delayBy(2.seconds),
 IO(3).delayBy(3.seconds)
).parSequence

We can limit the number of tasks being executed at once with the variants parTraverseN and parSequenceN. They accept an additional Int parameter that specifies the maximum parallelism. These two come from Cats Effect itself and are brought into scope with import cats.effect.implicits.*.

Both parTraverse and parSequence preserve the order of the results, which might cause a small performance overhead. Cats also offers unordered counterparts, named parUnorderedTraverse and parUnorderedSequence, which don't keep the order and might therefore execute faster.
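A bounded traversal could look like the sketch below, which processes five inputs with at most two effects running at a time. The names ParTraverseNDemo, double and program are made up for this illustration.

```scala
import cats.effect.*
import cats.effect.implicits.*
import scala.concurrent.duration.*

object ParTraverseNDemo extends IOApp.Simple {
  // Hypothetical task: pretends to do some work, then doubles its input
  def double(i: Int): IO[Int] = IO.sleep(100.millis).as(i * 2)

  // At most 2 of the 5 effects run at the same time;
  // the results keep the order of the input list
  val program: IO[List[Int]] =
    List(1, 2, 3, 4, 5).parTraverseN(2)(double)

  def run: IO[Unit] =
    program.flatMap(results => IO.println(s"Results: $results"))
}
```

Bounding the parallelism this way is useful when each task holds a limited resource, such as a database connection.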

Last but not least, there are two useful symbolic operators: &> and <&. They also run two effects in parallel. A failure in either IO cancels the other one, and if the whole computation is canceled, both actions are canceled. The &> operator returns the value from the right and <& the value from the left.

//we'll get 1 after approx. 2s
IO(1).delayBy(1.second) <& IO(2).delayBy(2.seconds)

We can think of those operators as counterparts of sequential *>, <*, << and >>.

Summary

This article is meant to be an introductory-level tutorial on the concurrency features provided by Cats Effect. CE provides much more than described here. For instance, I didn't mention the powerful data structures designed for managing concurrent access to data, like Ref or Queue. Perhaps that's a good subject for another article. I hope you found this blog post insightful. Take care!
