Implementing Raft using a functional effect system

One of the areas where functional effect systems are said to excel is concurrent programming. This looks great on paper, but is it true in practice? Let's find out!

What could be a better case study for concurrent programming than implementing the Raft consensus algorithm? Hopefully, you didn't come up with too many answers to that question as that's what we are going to explore below!

We'll use the Scala programming language, and the ZIO library, which provides an implementation of a functional effect system. However, when discussing the traits of the implementation, we'll try to separate them so that it's clear if a given characteristic stems from using FP, effect systems, Scala, or ZIO.

TL;DR

The article consists of three parts: an introduction to Raft itself, a tour of the implementation of Raft using Scala and ZIO, and finally, a discussion on how the programming model influences the design, both in positive and negative ways. If you're familiar with some of these topics, or are interested in only one of the parts, simply jump to the relevant section.

For the really impatient:

  • take a look at the code
  • at the very end, we've got a pros & cons table; but for the justifications, you'll have to jump back a bit!

What is a functional effect system?

A functional effect system combines functional programming and its main building blocks of first-class functions and immutable data with a way to capture and run side-effecting computations in a controlled way.

In more concrete terms, such a system is usually centred around a data structure, along with an API to manipulate it. This data structure, typically called IO, Task, or some close variant, is a lazily-evaluated description of how to compute its result. It might involve running multiple computations concurrently, with forks, joins, and arbitrary side effects, such as network communication.

The laziness and immutability allow this approach to be composable, that is, larger computation descriptions can be created from smaller ones. Composability, along with resource safety and the decoupling of computation ordering from definition ordering, are the main traits that should make a functional effect system the perfect choice for concurrent programming.

Such a description is then passed to an interpreter, which executes the operations as specified. Hence, it's the interpreter that ultimately has control as to when and how to run the side-effects, how to manage resources, interruptions, threading, concurrency, etc.
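
To make this more tangible, here's a minimal, standalone sketch (not part of Saft, assuming ZIO 2) of the separation between describing a computation and running it: the description value is inert data until the runtime interprets it.

import zio.*

// A standalone sketch: nothing below runs when `description` is defined.
object EffectSystemSketch extends ZIOAppDefault:
  val sayHello: UIO[Unit] = Console.printLine("hello").orDie
  val sayWorld: UIO[Unit] = Console.printLine("world").orDie

  // Composing two smaller descriptions into a larger one: run both in
  // parallel, then print a final message. Still just an immutable value.
  val description: UIO[Unit] =
    sayHello.zipPar(sayWorld).flatMap(_ => Console.printLine("done").orDie)

  // Only here is the description handed to the interpreter, which decides
  // how to schedule fibers, handle interruption, manage resources, etc.
  def run = description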

What is Raft?

In the words of its authors:

Raft is a consensus algorithm for managing a replicated log.

It is widely used in the industry through multiple implementations, such as the most popular one: etcd/raft. The goal of Raft was to design an algorithm that is understandable—and that is going to be one of the primary goals in our implementation as well! After all, the challenge in each "industry" programming task is not only to solve it, but also to write code that can be read and comprehended by others.

I'm not going to describe the Raft algorithm in full here as there are many tutorials and blogs out there. The Raft page gives a great introduction, along with a visualisation mode. You can also check out the guided visual tutorial. I'd personally recommend at least skimming the Raft paper as it is written in a very approachable way, providing motivation for the algorithm's design, as well as its details. Finally, the one-page summary from page 4 is indispensable when implementing Raft.

(Figure: the Raft cheat-sheet, the one-page summary from the paper)

In case you need a recap, here's a quick summary of the algorithm. The goal of Raft is to replicate data from incoming requests to all nodes in the system. Each request appends an entry to the log, and each node should eventually hold a log with the same entries in the same order. So it's not only the data that we replicate, it's their ordering as well! Entries that have been replicated to a majority of nodes and committed are then applied to the state machine, which is the ultimate destination for the log entries. What exactly this state machine does is use-case dependent—it can be arbitrary code, implementing whatever functionality is required.

In order to maintain a replicated, sequential log, a leader is elected. It's the leader's responsibility to accept new requests, replicate them, and, finally, mark them as committed. The log index up to which entries are committed hence has to be replicated as well. But which node becomes the leader?

At the beginning, all nodes start as followers. If they don't hear from a leader, after a randomised timeout, they become candidates and ask other nodes to vote for them. When a node gathers a majority of votes, it becomes a leader.

Of course, nodes or communication links might fail, and after some time, a new leader might have to be elected. To let other nodes know that one leader is more recent than another, Raft maintains a logical clock in the form of terms. Once a node sees a request with a higher term, it knows that a new election has started, and that a new leader might emerge.

There are many fine details and conditions that have to be met in order to avoid electing two leaders, ensuring that a leader is always eventually elected, and that even partially replicated, but committed entries do eventually end up on all nodes—but for that, you'll have to read the full paper!

Saft: the implementation

Let's get to the juicy details! The implementation that we'll be discussing is called Saft, meaning "Scala Raft". You might go and take a look at the code now, run the Raft simulation that's included with it, or just read on.

The state

Saft tries to remain as close to the vocabulary and data structures used in the Raft paper as possible. Let's start our exploration of the codebase with classes representing the state that is stored on each server. We'll model them as immutable data structures. For example, here's an excerpt from how the state that's stored on all servers (regardless of their role, be it follower, candidate or leader) is represented:

case class NodeId(number: Int)

opaque type Term <: Int = Int

opaque type LogIndex <: Int = Int

case class ServerState(
    log: Vector[LogEntry],
    votedFor: Option[NodeId],
    currentTerm: Term,
    commitIndex: Option[LogIndex],
    lastApplied: Option[LogIndex]
)

You can view the full source in the domain.scala and state.scala files. First, we've got some supporting data structures, such as NodeId, which for our purposes simply wraps a number. Each node (there's usually an odd number of them, such as 3 or 5) needs to have a unique identifier. These numbers might then be mapped to network addresses, for example.

The Term and LogIndex are opaque types, a construct that introduces new types that here are subtypes of Int at compile time, but at run-time are erased and become normal Ints.
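
For illustration, here's a sketch of how such opaque types might be declared; the companion apply methods are an assumption of this sketch (see domain.scala for Saft's actual definitions).

// The `<: Int` bound lets a Term or LogIndex be used wherever an Int is
// expected (e.g. in comparisons), while outside the defining scope a plain
// Int cannot be used where a Term or LogIndex is required.
opaque type Term <: Int = Int
object Term:
  def apply(t: Int): Term = t

opaque type LogIndex <: Int = Int
object LogIndex:
  def apply(i: Int): LogIndex = i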

These are used to create the immutable data structure, ServerState, which holds data such as the log, the current term number, or the index up to which entries in the log are committed. If you take a look at the Raft cheat-sheet, you'll see the exact same data structure with the same names.

The ServerState (and other classes holding state, such as LeaderState) also contains a number of helper methods, which either update the state with new data or compute some projection of the state, for example:

case class ServerState(
    log: Vector[LogEntry],
    votedFor: Option[NodeId],
    currentTerm: Term,
    commitIndex: Option[LogIndex],
    lastApplied: Option[LogIndex]
):
  def updateTerm(observedTerm: Term): ServerState =
    if observedTerm > currentTerm
    then copy(currentTerm = observedTerm, votedFor = None)
    else this

  def hasEntryAtTerm(t: Option[LogIndexTerm]): Boolean = t match
    case None               => true
    case Some(logEntryTerm) =>
      log.length > logEntryTerm.index &&
        log(logEntryTerm.index).term == logEntryTerm.term

This clears up the implementation of the algorithm itself as it reads much more naturally: we can manipulate the state on a higher level of abstraction.

The messages

The RPC messages that are defined by Raft are also implemented as immutable data structures. As before, they follow the naming conventions from the paper. Hence, we have RequestVote and AppendEntries classes, but also some other ones that are not explicitly mentioned but are needed as well: classes representing replies to messages (e.g. RequestVoteResponse), as well as a message that clients send to servers, NewEntry. The code is quite unsurprising, for example:

case class RequestVote(term: Term, candidateId: NodeId,
    lastLog: Option[LogIndexTerm])
  extends RequestMessage with FromServerMessage

The full source is available in messages.scala.

There's one important detail: the messages are classified based on whether a client or a server is the sender/recipient, and if the message is a request or response. These classification traits (ToServerMessage, ResponseMessage etc.) are then used to ensure that a message can only be used in the appropriate context.
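
To give an idea of what this looks like, here's a sketch of the classification traits; the actual hierarchy lives in messages.scala and may differ in detail.

// Marker traits describing who sends/receives a message and whether it is a
// request or a response; concrete messages mix in the appropriate ones.
sealed trait Message
sealed trait RequestMessage extends Message
sealed trait ResponseMessage extends Message
sealed trait FromServerMessage extends Message:
  def term: Term // every server-originated message carries the sender's term
sealed trait ToServerMessage extends Message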

The event queue

Now that we have the basic data structures defined, comes an important design decision: how will the nodes communicate with each other?

Saft doesn't mandate a specific approach and multiple communication channels can be plugged in. However, the whole implementation is event-driven. All incoming requests are put onto an event queue. The events are then processed one-by-one, in a sequence. This provides a way to protect the state from being updated by two requests concurrently.

There are three types of events: requests, responses, and timeouts. Timeouts might happen when no communication from a leader has been received and a new election has to be started, or when the leader needs to send heartbeats to let its followers know that it's still alive.

Events are once again modelled as immutable data structures, here using an enum:

enum ServerEvent:
  case Timeout extends ServerEvent
  case RequestReceived(message: RequestMessage,
    respond: ResponseMessage => UIO[Unit]) extends ServerEvent
  case ResponseReceived(
    message: ResponseMessage with ToServerMessage) extends ServerEvent

Take a look at ServerEvent.scala for the full definition. You can see here how the message classification traits are used to narrow down the messages that might be used as parameters in each case.

You might wonder what the UIO type that we see here is. RequestReceived.respond is a function returning a lazily evaluated description of the effect that responds to the request with the given response message. The UIO differs from IO, which we've mentioned in the introduction, in that all errors should be handled; that's a feature of ZIO, which we'll discuss later.

In the implementation, access to the event queue is abstracted using the Comms interface. It allows fetching the next event to process, adding an event to the node's own queue (which is used for timeouts that are created locally and asynchronously), and sending a message to any other node:

trait Comms:
  def next: UIO[ServerEvent]
  def send(toNodeId: NodeId,
    msg: RequestMessage with FromServerMessage): UIO[Unit]
  def add(event: ServerEvent): UIO[Unit]

As you can see, all return types here are again descriptions of effects, where all errors should be handled. For example, the effect returned by next should eventually compute a ServerEvent instance.

Disambiguating the state

It so happens that the word "state" is used in three different contexts in the Raft paper, so I think it's useful to disambiguate its usages. To make things clearer in code, we'll use different names for these concepts:

  • "state" as in a node's role: either a follower, candidate, or leader. We'll use the term role reified as a NodeRole enum instead;
  • "state" as in the data that's stored either persistently or in-memory on a server. We'll use ServerState, LeaderState, etc. for these data structures, as we have seen in the earlier section;
  • "state" as in the state machine, which is where committed log entries are applied on each server. We'll keep using StateMachine.

I think the first and last points are especially important. We've got a StateMachine.scala file in the repository, which is used to apply committed log entries using user-provided logic (specific to the purpose for which we use Raft in the first place). As a side note, this is best done in the background (while still preserving sequentiality!) so that the rest of the algorithm is not impacted by user code.
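
As an illustration of that side note, here's a minimal sketch (assumed shape, not the actual StateMachine.scala) of applying entries sequentially on a background fiber via a queue, assuming zio.* is imported and LogEntry comes from domain.scala.

trait StateMachine:
  def apply(entry: LogEntry): UIO[Unit]

object StateMachine:
  // User-provided `applyLogic` runs on a single background fiber, so entries
  // are applied one at a time, but without blocking the node's event loop.
  def background(applyLogic: LogEntry => UIO[Unit]): UIO[StateMachine] =
    for
      queue <- Queue.unbounded[LogEntry]
      _ <- queue.take.flatMap(applyLogic).forever.fork
    yield new StateMachine {
      def apply(entry: LogEntry): UIO[Unit] = queue.offer(entry).unit
    }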

There is, however, a different state machine here as well: the Raft algorithm itself defines one, with the follower, candidate, and leader states. The transition between these states depends on timeouts and received messages. However, we'll try to avoid using the state machine terminology for this purpose, instead talking about node roles.

The Node code

Finally, we've gathered enough structure to start implementing the code of the node itself. That's where the Raft algorithm is implemented, and it's the heart of Saft.

The general idea is to process events in a loop, keeping the current server state:

  • get the next event using Comms,
  • depending on the type of the event, invoke an appropriate handler method, passing in the current node role & state,
  • inside the handler method, depending on the node's current role and state, run the logic as defined by the Raft algorithm, returning new role & state.

This could be implemented as a recursive function, parameterized by the current (immutable) state instance, which calls itself with the new state, modified in response to the received event. However, there are also some side effects that might happen, such as sending messages to other nodes, or restarting the timer. Finally, the state, if modified, needs to be persisted, and a reply has to be sent in case the event was a request. As side effects in our programming model are run in a controlled fashion, we have to account for all of the above.

Before implementing the main loop, we'll define one more data structure that will capture the whole state associated with the current role of a node. This can be found at the very beginning of Node.scala:

private enum NodeRole:
  def state: ServerState
  def timer: Timer

  case Follower(state: ServerState, followerState: FollowerState,
    timer: Timer) extends NodeRole
  case Candidate(state: ServerState, candidateState: CandidateState,
    timer: Timer) extends NodeRole
  case Leader(state: ServerState, leaderState: LeaderState,
    timer: Timer) extends NodeRole

Apart from the ServerState that's always tracked regardless of a node's role, we also have role-specific state (such as the number of votes received when the node is a candidate), and the currently running Timer.

Keeping track of timeouts

The task of the Timer class (defined at the bottom of Node.scala) is to put a Timeout event on the event queue, using Comms.add, after the election or heartbeat timeout passes. However, it can also be restarted—e.g. when a message from the leader is received. Again, as we are using a functional effect system, all side effects, including timeouts, are controlled. Hence, the method used to restart the timer should return a description of an effect, which:

  • stops the previous timer, if it hasn't yet completed,
  • puts the Timeout event on the queue, after the specified timeout. This should happen in the background, independent of what the node is currently doing.

Translated to code, the description of the above two effects is:

private class Timer(conf: Conf, comms: Comms,
    currentTimer: Fiber.Runtime[Nothing, Unit]):
  def restartElection: UIO[Timer] =
    for {
      _ <- currentTimer.interrupt
      newFiber <- ZIO.sleep(conf.electionTimeoutDuration)
          .flatMap(_ => comms.add(ServerEvent.Timeout)).fork
    } yield new Timer(conf, comms, newFiber)

The Timer.currentTimer is a reference to a fiber—a light-weight thread—that represents the currently set timeout, running in the background. When restarting a timer, we first need to interrupt that fiber so it no longer schedules its timeout, and start a new one. This is done by sequencing the ZIO.sleep effect with comms.add(Timeout). When interpreted, this description is executed in the background because of the .fork operation, which causes the effect on which it is called to be executed on a newly created fiber, asynchronously.

Main loop

As mentioned earlier, in the main loop, we first receive the next event to handle using Comms.next, and then execute the appropriate logic. However, there's one requirement of the algorithm that we can fulfil before inspecting what the message is. If, at any point, we receive a message from another node with a higher term than ours, we need to immediately convert to a follower (regardless of the current role), and start waiting for messages from the new leader. Hence the main loop is as follows:

def run(role: NodeRole): UIO[Nothing] =
  comms.next
    .flatMap {
      // If RPC request or response contains term T > currentTerm:
      // set currentTerm = T, convert to follower (§5.1)
      case e @ ServerEvent.RequestReceived(msg: FromServerMessage, _)
        if msg.term > role.state.currentTerm =>

        val state = role.state
        val newState = state.updateTerm(msg.term)
        for {
          newTimer <- role.timer.restartElection
          _ <- persistence(state, newState)
          newRole <- handleEvent(e,
            NodeRole.Follower(newState, FollowerState(None), newTimer))
        } yield newRole
      case e => handleEvent(e, role)
    }
    .flatMap(run)

def handleEvent(event: ServerEvent, role: NodeRole): UIO[NodeRole] =
  ???

In the first branch, if a message with a higher term is received, the effect description that is being returned to the interpreter has three steps: restarting the timer, persisting the modified state (with the new term), and, finally, handling the event using the follower role. Otherwise, the event is handled using handleEvent and the current role, which we'll inspect next.

That's a single step—what about handling subsequent events? This is handled by the recursive call to run at the very end of the method, using .flatMap(run). The next invocation of run will receive a modified NodeRole instance (adjusted in response to the incoming event), and so on, indefinitely. Won't this blow up the stack? No, as we aren't really doing infinite recursion. Instead, run returns a description of a computation to the interpreter, where the last step is again calling run. As the interpreter runs the described computation step by step, it can unwind and manage the stack; this technique is called trampolining. Hence infinite non-tail recursion is safe, which is quite an important tool in functional programming.
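
Here's a tiny standalone illustration (not from Saft) of why this is safe: the recursive flatMap only builds further descriptions, which the interpreter evaluates iteratively.

// Runs "forever" without a StackOverflowError: each step returns a new
// description to the interpreter instead of growing the call stack.
def loop(n: Long): UIO[Nothing] =
  ZIO.succeed(n + 1).flatMap(loop)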

Handling events

We can now proceed to handling individual events, taking in the current NodeRole and producing a possibly side-effecting computation, which produces a new NodeRole:

def handleEvent(event: ServerEvent, role: NodeRole): UIO[NodeRole] =
  (event match
    case ServerEvent.Timeout => timeout(role).map((ZIO.unit, _))
    case ServerEvent.RequestReceived(rv: RequestVote, respond) =>
      requestVote(rv, role)
        .map((resp, newRole) => (doRespond(resp, respond), newRole))
    case ServerEvent.ResponseReceived(aer: AppendEntriesResponse) =>
      appendEntriesResponse(aer, role).map((ZIO.unit, _))
    case … => …
  ).flatMap((response, newRole) =>
    persistAndRespond(response, role, newRole))

def persistAndRespond(response: UIO[Unit],
  oldRole: NodeRole, newRole: NodeRole): UIO[NodeRole] =
    val persist = if needsPersistence(oldRole, newRole)
      then persistence(oldRole.state, newRole.state)
      else ZIO.unit
    // (Updated on stable storage before responding to RPCs)
    for {
      _ <- persist
      _ <- response
    } yield newRole

For each event, depending on the exact type of request or response received, we call the appropriate method. These methods are fully typed, for example the requestVote one needs to return the response of the appropriate type, along with the updated role:

def requestVote(rv: RequestVote,
    role: NodeRole): UIO[(RequestVoteResponse, NodeRole)]

The signature tells it all! After calling the appropriate method, we sequence the effect it returns (which might include sending messages to other nodes, or restarting the timer), with two other effects using persistAndRespond: first persisting the state, if it has been modified (ZIO.unit is an effect that does nothing), and then sending the response. Hence, we fulfil the requirement of the protocol to update the stable storage before responding to RPCs in a universal way, decreasing the chance of a bug.

Handling a vote request

Finally, let's take a look at how a vote request is handled. Other handlers are similar in their mechanics, however, of course, distinct in the logic they implement. When handling a vote request, we match on the specific node role that the node currently has:

def requestVote(rv: RequestVote,
    role: NodeRole): UIO[(RequestVoteResponse, NodeRole)] =
  role match
    case follower @ NodeRole.Follower(state, followerState, timer) =>
      // Reply false if term < currentTerm
      // If votedFor is null or candidateId, and candidate's log is at
      // least as up-to-date as receiver's log, grant vote (§5.2, §5.4)
      if ((rv.term < state.currentTerm) ||
          state.votedForOtherThan(rv.candidateId) ||
          state.hasEntriesAfter(rv.lastLog)) {
        ZIO.succeed(
          (RequestVoteResponse(state.currentTerm, voteGranted = false),
           follower))
      } else {
        val newState = state.voteFor(rv.candidateId)
        timer.restartElection.map(newTimer =>
          (RequestVoteResponse(newState.currentTerm, voteGranted = true),
            NodeRole.Follower(newState, followerState, newTimer))
        )
      }
    case candidate: NodeRole.Candidate => ZIO.succeed(
      (RequestVoteResponse(candidate.state.currentTerm,
        voteGranted = false), candidate))
    case leader: NodeRole.Leader       => ZIO.succeed(
      (RequestVoteResponse(leader.state.currentTerm,
        voteGranted = false), leader))

If the node is a follower, there are some conditions specified in the Raft paper that must be met in order to grant a vote to a prospective new leader. These are expressed using the helper methods on the state classes. If a vote is granted, the state is modified appropriately, the timer is restarted (as we've just received communication from a potential leader), and the response is returned along with the role holding the modified state; the node remains a follower. If the conditions are not met, or if the node is not a follower, the vote is not granted, and the timer remains intact.

Becoming a candidate

From an implementation point of view, it's also interesting to see how messages are broadcast to all other nodes—this happens when an election is started, as well as when the leader sends out a heartbeat or new entries. Let's take a look at the startCandidate method, which is called from timeout when an election timeout occurs (that is when a Timeout event is received by nodes in the follower or candidate roles):

def startCandidate(state: ServerState, timer: Timer): UIO[NodeRole] =
  // On conversion to candidate, start election: Increment currentTerm,
  // vote for self
  val newState = state.incrementTerm(nodeId)
  // Reset election timer
  for {
    newTimer <- timer.restartElection
    // Send RequestVote RPCs to all other servers
    _ <- ZIO.foreachDiscard(otherNodes)(otherNodeId =>
      comms
        .send(otherNodeId, RequestVote(newState.currentTerm, nodeId,
          newState.lastIndexTerm))
        .fork
    )
  } yield NodeRole.Candidate(newState, CandidateState(1), newTimer)

Changing the state to increment the term, restarting the timer, and returning the new candidate role is a similar pattern to what we've seen before. What's interesting is using ZIO.foreachDiscard (a variant of ZIO.foreach that discards the results) to create an effect which sequences a collection of other effects: here, we take the collection of all other node ids and, for each such id, create an effect that sends a RequestVote to that node. Sending is done in the background (notice the .fork at the end of each send effect), as sending might block until a response is received. If this also blocked the node from processing further events, we might end up in a deadlock!

Yes, that's exactly what happened when I first ran that code when using HTTP communication (where each request waits for a response). The deadlock possibility is one example of a property that wasn't detected by the compiler or prevented by the construction of the code!

Running a simulation

We've covered most of the components, so it's about time to run Saft and see it in action! There are two options. The first, SaftSim, is an entirely in-memory simulation: it runs 5 nodes with in-memory communication and in-memory "persistence".

The InMemoryComms implementation creates 5 event queues. Sending an event to another node amounts to getting the event queue for that node and placing the event on it; the same is true for the response effect that is part of receiving a request:

class InMemoryComms(nodeId: NodeId,
    eventQueues: Map[NodeId, Queue[ServerEvent]]) extends Comms:
  private val eventQueue = eventQueues(nodeId)

  override def next: UIO[ServerEvent] = eventQueue.take

  override def send(toNodeId: NodeId,
      msg: RequestMessage with FromServerMessage): UIO[Unit] =
    eventQueues(toNodeId)
      .offer(
        ServerEvent.RequestReceived(
          msg,
          responseMsg => eventQueues(nodeId)
            .offer(ServerEvent.ResponseReceived(responseMsg)).unit
        )
      )
      .unit

  override def add(event: ServerEvent): UIO[Unit] =
    eventQueue.offer(event).unit

During startup, we create the Node instances and run, in the background (again using .fork), the effect described by the run method that we've inspected; then we read the next command from the console and execute the appropriate actions:

ZIO.foreach(nodes)((nodeId, node) => node.start.fork.map(nodeId -> _))
   .flatMap(handleNextCommand)

The simulation allows stopping a running node by interrupting the fiber that corresponds to that node's node.start effect, and restarting it by running the start effect again.
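
In essence, stopping and restarting a node boils down to something like the following sketch (hypothetical helpers, assuming the Node and fiber types from the article):

// Interrupting the fiber running `node.start` simulates a crash; forking the
// start effect again brings the node back.
def stop(fiber: Fiber.Runtime[Nothing, Nothing]): UIO[Unit] =
  fiber.interrupt.unit

def restart(node: Node): UIO[Fiber.Runtime[Nothing, Nothing]] =
  node.start.fork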

The second runnable Saft app uses HTTP + JSON for communication and file-based JSON persistence. Take a look at SaftHttp. Here, you'll need to start 3 (or more) separate instances to get a fully working Raft implementation. The HttpComms implementation still manages a node's in-memory event queue, but to send a request to another node, it performs an HTTP request and waits for the response (with a timeout). State is saved to a file in JSON format. This isn't a production-ready solution, as writing a file doesn't atomically replace its contents, but it's good enough to test the implementation.
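
The persistence part of that setup can be as simple as the following sketch (not the actual SaftHttp code): the serialised JSON is written to a file, without atomic replacement.

// Writing the state as JSON; errors are turned into defects (.orDie), which
// seems acceptable here, as a persistence failure should stop the node.
def persistJson(path: java.nio.file.Path, json: String): UIO[Unit] =
  ZIO.attemptBlocking(java.nio.file.Files.writeString(path, json)).orDie.unit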

Differences from the Raft paper

While the Saft implementation tries to remain as close as possible to what's described in the Raft paper, in two instances, technicalities of the implementation force a divergence.

One such case is the AppendEntriesResponse message, which, in addition to a simple success flag as specified in the paper, also includes the node that sends the response and the coordinates of the entries that were included in the request. This is needed because, unlike what the Raft paper assumes, in our implementation requests and responses aren't correlated—that is, we don't know for which request we are receiving a response. Hence, we need to add that additional information to the response itself.

The second case is some additional state that we need to track for the leader. The Raft protocol requires that clients receive a response to their NewEntry request only once that entry has been applied to the state machine. As communication happens asynchronously, we need to remember "on the side" which requests are awaiting a response—hence the additional awaitingResponses collection of response effects to be sent.

What's not in the implementation

Saft only implements basic Raft functionality, omitting three features that are mandatory for a production-grade system:

  • deduplication of incoming client NewEntry requests (needed in case a request fails from the client's perspective and is repeated),
  • log compaction (needed when the log becomes large),
  • cluster membership changes (needed when adding or removing servers).

These could be implemented using the same functional approach, however, as the goal of Saft is currently educational, we've left these out for simplicity.

However, adding the above features might be a great exercise for the reader! :)

Evaluating Saft

Now that we know how Saft is implemented, let's look at how functional programming and effect systems in general, and Scala and ZIO in particular, influence the design and readability of the code base.

How much boilerplate is there? Does the implementation capture the "essence" of the protocol, or is the logic drowned in technicalities? Is it possible to comprehend the algorithm by reading the code? What kind of properties are checked by the compiler, and what needs to be tested? To what degree is the code "correct by construction", and what needs careful coding?

Of course, having written the code and being a Scala developer for the past several years definitely biases my view. As with all opinion pieces, what follows is fully subjective; however, as you have access to the source code, please verify the claims below for yourself!

Functional programming

Let's start with FP. Which traits of FP have we been using extensively? First of all, we've got the immutable data structures, representing server state, messages and node roles. While immutable data is especially useful when the data is shared among concurrently running threads, here we don't really have any sharing—the server state & node roles data is used only within a single fiber.

Still, immutability plays an important role. You don't have to worry that updating some piece of state will impact the algorithm in other places—you gain a certain peace of mind when implementing the protocol. It's an example of local reasoning: you only have to keep in mind the immediate surroundings of the code you write, without holding a mental model of the entire codebase, to make sure that modifying the state won't affect another place.

The fact that after handling an event, we have access both to the old & new state is also used for persistence. In the implementation, the primary source of truth is the in-memory state representation, while persistence is only a "backup": that is, modifications first occur in-memory and are then propagated to be persisted.

This is a deliberate design choice, and if we were to implement persistence using e.g. a relational database, we would have to compare the old & new states to see which parts of the state changed. This might not be the most efficient solution, but it's efficient enough—and it allows persistence, which the protocol requires to happen before sending a reply, to be implemented in a general way, outside of the message-specific handlers. This reduces the chance of introducing a bug by skipping persistence in one of the code branches.
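
As a sketch of the old-vs-new comparison idea (Saft's actual needsPersistence check takes node roles and may differ), having both states at hand lets us compare just the fields that the Raft paper requires to be persistent:

def needsPersistence(oldState: ServerState, newState: ServerState): Boolean =
  // currentTerm, votedFor and the log are the persistent fields from the paper
  oldState.currentTerm != newState.currentTerm ||
    oldState.votedFor != newState.votedFor ||
    oldState.log != newState.log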

But there are downsides as well. When calling a method that updates the state and returns a new one, we have to use the new instance from that point instead of the old one! Take a look at this fragment of the AppendEntries handler for a follower:

// If an existing entry conflicts with a new one (same index but
// different terms), delete the existing entry and all that
// follow it (§5.3). Append any new entries not already in the log
val newState = state.appendEntries(ae.entries,
  ae.prevLog.map(_.index)).updateCommitIndex(ae.leaderCommit)
val response = AppendEntriesResponse.to(nodeId, ae)(
  newState.currentTerm, success = true)

// If commitIndex > lastApplied: increment lastApplied,
// apply log[lastApplied] to state machine (§5.3)
for {
  newTimer <- timer.restartElection
  newState2 <- applyEntries(newState)
} yield (response,
  NodeRole.Follower(newState2, newFollowerState, newTimer))

Here we update the state twice, obtaining the newState and newState2 values. Accidentally calling applyEntries(state) instead of applyEntries(newState), or returning a stale state when creating the updated follower role (e.g. NodeRole.Follower(newState, …) instead of using newState2), might lead to subtle and hard-to-spot bugs.

The compiler does offer some help, as we'll get a warning when a value is left unused, or when the result of a function is discarded. Still, bugs such as the above are possible. To prevent them, I think the compiler would need to support some form of linear types, so that after a "modification" operation such as .appendEntries, the old object could no longer be used. Unless we wanted to calculate a delta for persistence … which would make this compiler feature even more tricky, or outright impossible, to implement.

Type safety

We're not only using a functional programming language, we are using a statically-typed functional programming language. How does this impact the implementation of Raft?

One of the biggest gains comes from exhaustivity checking in pattern matching. The ServerEvent, Message, and NodeRole data types are sealed, meaning that all possible implementations are enumerated in their source files. This, in turn, means that the compiler can verify that during pattern matching, we've covered all the cases, and report a warning (or an error—that's configurable) if some case is missing. That way, we can be certain that in our implementation, we've covered all possible incoming events, messages, or node roles.

That's also where the classification traits for messages play a role: by specifying that e.g. a ResponseReceived event can only contain ToServerMessage messages, we don't have to come up with artificial handlers, e.g. for new entry responses (which a server will never receive as they are sent to clients), and still maintain the benefits of exhaustivity checking.

Another minor benefit comes from the usage of opaque types such as Term and LogIndex. Under the hood, they are Ints, but there's no way to mix them up—to the compiler, they are distinct types, unassignable from each other.

Finally, in a slight divergence from what's described in the Raft paper, we're using Option values instead of 0s. In the paper, log indexing starts from 1, and 0 is a special value meaning that no log entries have yet been added. Magic values are a common source of bugs, so in Saft, we're using Option[LogIndex] instead. Sometimes this means a bit more code to handle the defined and undefined cases, instead of some clever index arithmetic, but I think it makes the code clearer in the end.
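
A small, hypothetical helper shows the pattern; it assumes a LogIndex constructor like the one sketched earlier:

// The "no entries yet" case is an explicit, compiler-checked branch instead of
// special arithmetic around a magic 0.
def nextIndex(lastIndex: Option[LogIndex]): LogIndex =
  lastIndex match
    case None        => LogIndex(0)
    case Some(index) => LogIndex(index + 1)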

Of course, we get all the usual benefits of type safety, such as code navigation, completion, or properly typed method calls—which do play an important role as well. If the RequestVote message handler is a method that specifies in its signature that a RequestVoteResponse has to be returned, we have no other choice but to return a matching response in every branch of the implementing code.

To end the type safety section, a small but interesting implementation detail is how the compiler verifies that the run method is indeed calling itself recursively indefinitely. Here's its signature:

def run(role: NodeRole): UIO[Nothing]

Note that we return an effect, which upon evaluation should yield a value of type Nothing. But, by definition, there are no values of that type! The only way to create a result of that type is to either call run recursively or throw an exception. If, in one of the branches, we forget the recursive invocation, the compiler will let us know.

Effect systems

Concurrency

When considering what kind of benefits or disadvantages using an effect system brings, one thing that stands out is effortless concurrency. We've seen that when implementing Timer or when broadcasting RequestVote and AppendEntries messages. Creating a description of a computation, which should run in the background, is a simple modification of the original computation description: just add a .fork!

We don't have to worry about threads, executors, scheduling: the runtime interpreter will take care of this aspect. As they are cheap (memory and CPU-wise), we're free to create as many light-weight threads—fibers—as we want. Having such a liberty cleans up the code as the technical aspects of managing concurrency and threads are simply not there.

Ordering

A more subtle property of effect systems, which, however, influences how you write code in the long run, is the separation of definition ordering from computation ordering. It doesn't matter when you define a given computation—as we are only defining a description. What matters is when that description is used. It's one less thing that you have to worry about: "is it safe to define this computation now"?

Instead, you can create values that represent the computations and sequence them explicitly later. This helps to cleanly express in code some of Raft's requirements. For example: always persist state before responding to clients in Node.persistAndRespond; we create the description of the "send response" effect before persistence is even considered. The value representing the response effect is passed as a parameter from handleEvent to persistAndRespond.

Similarly, it is quite trivial to suspend replying to the client when a new entry is added. Raft mandates that a response is sent only when that entry has been successfully applied to the node's state machine—and for that to happen, the entry must be replicated and committed. This might take some time—and definitely some inter-node communication. So when receiving a NewEntry request, we have to put the response aside until we get enough responses from other nodes.

With an effect system, this amounts to accumulating pairs of (LogIndex, UIO[Unit]), consisting of the index which we await being committed, and the response effect to be sent. The effect can be safely created as a value—what's important is when it will be composed with the description returned to the interpreter. And this happens in Node.appendEntriesResponse—the awaiting responses are composed to be sent in parallel (again, we're touching on effortless concurrency here) using foreachPar(responses), which is sequenced, as Raft requires, after the entry is applied to the state machine.
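
A sketch of that last step (the shape of the awaiting-responses collection is assumed here) could look as follows: once entries up to the committed index are applied, the stored response effects are composed into the returned description and sent in parallel.

def sendAwaiting(
    awaiting: Vector[(LogIndex, UIO[Unit])],
    committedIndex: LogIndex
): UIO[Vector[(LogIndex, UIO[Unit])]] =
  // Split into responses that can now be sent and those still waiting.
  val (ready, stillWaiting) = awaiting.partition((index, _) => index <= committedIndex)
  // Send the ready responses in parallel; keep the rest for later.
  ZIO.foreachParDiscard(ready)((_, respond) => respond).as(stillWaiting)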

Testing

We haven't mentioned testing so far, but it's always an important aspect. Raft uses timeouts extensively, so it might be tricky to test—time-based tests are usually slow and prone to random flakiness. However, as we have a controlled environment in which we run effects, we can do better. Note that all timeouts are implemented using the effect system, here via ZIO.sleep. This creates a description of an effect that instructs the interpreter to resume the current fiber after a given time. That's fundamentally different from ZIO.attempt(Thread.sleep(...)), where the particular reason for blocking is opaque to the interpreter.

Since the interpreter is aware of when we put our fibers to sleep and for how long, in tests we can use a test clock, where we control the flow of time. When we run an effect in a test, it is evaluated until every fiber it created is suspended on some condition (e.g. waiting on a queue or sleeping). Then we get back control: we can push the time forward by an arbitrary amount, running the effects that would normally happen during that time.

This way, a ZIO.sleep(Duration.fromSeconds(10)) can take milliseconds in tests and still verify that the effect does what it's supposed to do. The same happens when testing our Node implementation; take a look at NodeTest.scala:

val forward5seconds = TestClock.adjust(Duration.fromSeconds(5))
test("should replicate a single entry") {
  for {
    // given
    fixture <- startNodes(Conf.default(5))
    // when
    _ <- forward5seconds // elect leader
    _ <- newEntry(LogData("entry1"), fixture.comms)
    _ <- forward5seconds // replicate
    // then
    results <- ZIO.foreach(fixture.conf.nodeIds)(nodeId =>
      fixture.applied(nodeId).get
        .map(a => assertTrue(a == Vector("entry1"))))
    // finally
    _ <- fixture.interrupt
  } yield results.reduce(_ && _)
}

Currently, there are only two very basic tests; for a production system, we would need many more of those. Because Saft allows providing alternate communication implementations, we could also simulate faulty communication channels by dropping some messages intermittently, and verify whether Raft is able to operate properly and recover, e.g. in split-brain scenarios. We could also imagine writing some property-based tests to simulate a random ordering of failures.

Syntax

As a final note in this section, it's necessary to consider the syntactic implications of using an effect system in Saft's code. A common complaint about code written using a value-based, reified representation of effects is that we can no longer use the familiar procedural syntax, where sequencing two effects means writing two side-effecting statements, separated by an explicit or implicit ;, e.g.:

persist()
respond()

Instead, we have to use functions such as .flatMap or .map, either directly or using syntactic sugar (for). The above then becomes:

for {
  _ <- persist
  _ <- respond
} yield ()

This is a very simplistic example, and when reading Saft's code, you'll find more complex ones. The syntactic overhead is especially visible when there are conditionals or loops; with effect systems, we can't simply use the normal control flow constructs such as ifs and for loops: they have to be either wrapped using a multi-level flatMap or replaced with functions such as ZIO.foreach.
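
For instance, a conditional side effect becomes a choice between two effect values (a hypothetical helper, just to show the shape of the pattern):

// Instead of an imperative `if` statement with a side effect, we select which
// description to return; ZIO.unit is the "do nothing" effect.
def maybeWarn(tooManyRetries: Boolean): UIO[Unit] =
  if tooManyRetries then ZIO.logWarning("giving up") else ZIO.unit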

And that's a valid point to make when considering programming using effect systems. You have to get used to reading such code—if you are coming from C or Java, it won't be immediately familiar. It's a tradeoff—there are some benefits, as we've seen above, but they don't come for free.

That said, Saft's code doesn't suffer much from the inability to use procedural syntax, or language-provided control flow constructs. The structure of each message handler is quite similar, that is first updating the state, and then creating some effects, with branching happening at the initial stage.

ZIO

Error handling

ZIO has some unique features not found in other effect systems, which also impact the way Saft code is written. One of these features is error handling—ZIO has different types for effects that can throw arbitrary Throwable errors (Task), for effects where all errors are handled (UIO), and for anything in between (IO[E, A] where E is the error type and A the result type). Of course, even when we declare that errors are handled, exceptions might occur—but they are then considered as unrecoverable implementation defects and treated differently from "normal" errors.

In Saft, you've probably noticed that we use almost exclusively the UIO type, that is, the code declares that all errors should be handled. This isn't a problem in most of the codebase, as it doesn't perform I/O or throw exceptions (except for defects, which, by design, should terminate the node's fiber); however, it becomes important when implementing Comms.send:

trait Comms:
  def send(toNodeId: NodeId,
    msg: RequestMessage with FromServerMessage): UIO[Unit]

Typically, sending a message to another node will involve networking, so I/O errors might occur. What should happen then? What should a node do when there's a communication error with another node? It can still operate despite these errors, receiving messages from other nodes; so terminating the node is not the right choice. Instead, the send implementation should handle all errors, and if a message cannot be sent, it should be dropped. Raft is designed to withstand communication failures.

Hence the signature communicates the intent correctly: any errors that occur during send should be handled. For implementers of Comms, this constitutes important information that guides the implementation. Of course, we could go with another design and accept arbitrary errors to be returned by Comms.send by changing the return type to Task[Unit], and handling all errors inside the Node implementation instead.
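
A sketch of an HTTP-based send that satisfies the UIO signature might look as follows; httpSend is a hypothetical helper standing in for the actual HTTP call, not Saft's code.

// The underlying call can fail, hence Task[Unit].
def httpSend(toNodeId: NodeId, msg: RequestMessage with FromServerMessage): Task[Unit] =
  ??? // hypothetical: perform the HTTP request here

// All errors are handled: a failed send is logged and the message is dropped,
// which Raft is designed to tolerate.
def send(toNodeId: NodeId, msg: RequestMessage with FromServerMessage): UIO[Unit] =
  httpSend(toNodeId, msg)
    .catchAll(e => ZIO.logWarning(s"Dropping message to $toNodeId: ${e.getMessage}"))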

Automatic supervision

A property of fibers that is not used extensively in Saft, but still worth mentioning, is automatic supervision. Whenever we fork a new fiber, it is forked in the parent fiber's scope. This means that when the parent fiber terminates, all child fibers are terminated as well. We use this property in SaftSim, the in-memory Raft simulation. To simulate the failure of a node, its fiber is interrupted. Thanks to automatic supervision, we don't have to worry about stray child fibers still waiting for a timeout or sending messages—they get terminated as well, so cleanup is automatic. A good way to avoid fiber leaks!

High-level concurrency

Also on the topic of fibers, it is worth mentioning that ZIO discourages direct usage of .fork:

Using fibers can be tricky, so instead of using this method directly, consider other higher-level methods, such as raceWith, zipPar, and so forth.

However, that's what we end up with in Saft's implementation. While I agree that it would be best to use higher-level operations such as race e.g. for timeouts, the protocol requires more sophisticated behaviour (e.g. restarting the timer is conditional), which rules out using that method. Except for one foreachPar usage for sending replies, all other concurrency is managed directly using fibers. That might be tricky on the one hand, but on the other, it shows that fibers are a powerful and flexible construct.

Naming

ZIO sparked many hot discussions, and one of them was around naming. In some cases, ZIO departs from the terminology used e.g. in Haskell or cats-effect, justifying the move as being more "programmer friendly". One such example is foreach, which "traditionally" has been called sequence or traverse. While I'm generally in favour of using widely accepted terminology instead of inventing a new one, here I must say that using ZIO.foreach has been more intuitive than its cats-effect counterpart.

It always took me some mental effort to construct the correct traverse invocation, while using foreach seemed natural. And it reads well:

ZIO.foreach(otherNodes)(otherNodeId =>
  sendAppendEntry(otherNodeId, state, leaderState))

Other designs

The above is, of course, only one of many possible designs for implementing Raft. I deliberately didn't study other implementations (which are listed on Raft's page) so as not to be influenced too much by them.

One pattern that might stand out is that Saft uses queues extensively. To communicate with a node, in the end, an event is put on a queue; the events are then consumed in order by a single fiber, thus protecting the node's state from concurrent access. Yes—it's an actor, with the actor's behaviour changing in response to the received events.

It might be worth considering whether using an actor framework, such as Akka, would make sense. It might; however, the amount of code in Saft needed to create an actor is quite minimal—I doubt it can be reduced much further.

The current incarnation of Akka, Akka Typed, takes a similar approach of passing the current actor's state as a method parameter, where the method defines the actor's behaviour. It also provides an API to describe the actor's behaviour as a data structure—something similar to an effect system. However, it doesn't go as far as capturing every effect in a lazy manner, as it uses eagerly evaluated Futures. If you're interested in a more comprehensive comparison of Akka and a functional effect system, take a look at this article.

An interesting design note is that Akka Typed encourages a method-per-actor-state design. In Saft's case, this would translate to defining a method for each node role. So we would have mutually recursive follower, candidate, and leader methods, parameterized with the current server state (and timer), each containing the logic describing how to handle events in that role.

That was also my initial design. However, it had two major flaws. First, persisting the server state had to be done in multiple places, in each of the node-role methods, instead of being centralised. Similarly, there were no methods responsible for handling RequestVote, AppendEntries requests, etc., as these were handled within each node-role method separately. Hence, it was impossible to ensure, in a single place, that a reply of the correct type was sent for each request.

Reifying node roles as a data structure, and matching first on the received event instead of on the node's role, allowed us to implement the above two requirements in a general fashion.

Once again, creating a data structure describing some concept (here: the node role) provides an additional degree of freedom and gives us more flexibility in the implementation.

Pros & cons

Let's summarise how the approach that we've chosen (using a type-safe functional programming language and an effect system) affects the process of writing a Raft implementation:

Functional programming
  • Pros: immutability; local reasoning; access to both the old & new state
  • Cons: old, stale values might be used

Type safety
  • Pros: code navigation & completion; exhaustivity checking; opaque types; Option values

Effect systems
  • Pros: effortless concurrency; separation of definition and computation ordering; testing
  • Cons: syntax overhead (monads, for-s, flatMaps)

ZIO
  • Pros: error handling; automatic supervision; naming
  • Cons: not using the high-level concurrency operations

Other
  • Pros: representing the node role as a data structure

Correctness

All of the above is a means to an end: to help us write an implementation that is correct by construction, and whose properties are verified at compile-time. Scala's type system, while powerful, still lacks a lot of expressive power to prove that our implementation is correct. However, we do verify that some properties are met, such as:

  • each request & response is handled (due to exhaustivity of pattern matching)
  • each request receives a response of the appropriate type (due to typing of handler methods)
  • edge conditions (empty log) are properly handled (due to Option usage)
  • state is safe from a concurrency point of view (due to immutability)

You could say that all of these are the "easy" parts; the "hard" properties, such as liveness, state machine safety (as defined in the paper), etc., need to be carefully coded as described in the paper, and proved to hold externally. That's true; however, the help we do get from the compiler and from the code's construction is still very valuable, as it lets us focus on the hard parts. The other traits of the implementation give us hints during development which aid in writing a correct implementation, such as:

  • side-effects are properly sequenced (due to explicit ordering)
  • broadcasts happen in parallel (due to forking)
  • persistence always happens, and always before replies (due to construction of run method)
  • election / replication conditions are met (due to helper methods in State)

The last point especially is really a readability argument: it's good if the implementation follows the paper closely, as it's then easier for a human to verify that it does what's specified in the paper, which is proven to be correct.

Wrapping up

This completes our rather lengthy tour of Raft, effect systems, the Saft implementation, and its evaluation. Regardless of whether you are familiar with Raft or only getting to know it, take a look at the code. Run the simulation, try to break it. How well does the code convey the algorithm? How comprehensible is it when reading?

It will be most interesting to see what kind of bugs the implementation still has—what kind of properties aren't detected by the compiler or made impossible by construction?

I set out to write this implementation as a benchmark of the currently available tools for concurrent programming in Scala—and I'm pretty happy with the results of the experiment. But it will be most interesting to learn about the results of your review of the code!
