Contents

Reactive Event Sourcing in Java, Part 2: Actor Model

Andrzej Ludwikowski

26 Oct 2021.9 minutes read

In this part of the series, we will address the problem of concurrent access. Our domain code is very elegant but even though we are using records and some immutable collection, it is not entirely safe in a multithreaded environment. For example, we want to achieve the guarantee that in case of reserving the same seat concurrently, one request will succeed and one will fail. How to implement this? In most cases, you will introduce some sort of optimistic (or pessimistic) locking at your database level. In this scenario, you are not handling concurrency per se. You rather move this responsibility to a database. This is fine for a standard system with a moderate load, but we are building a reactive solution that should be easy to scale. Also, you are limited to databases that support such locking, which is not always the case. Fortunately, there are other options to handle concurrency. It's time to introduce our main guest which is the Actor and Akka stack that implements the Actor Model.

Actor

An actor is a very simple yet clever abstraction that will help you easily build very complex concurrent systems. You can think of it as a protector of your state. If you want access to your state from many threads, in a safe way, wrap it inside the actor and talk only to the actor. Communication with the actor is possible only by sending messages. Each actor has its own message box and it will consume one message at a time. Hence, with correct implementation, there is no way that 2 or more concurrent threads will do something with your state.

A persistent actor (also known as an event sourced actor) from Akka, additionally, is able to persist its state in a form of events. With the new typed actor API, you might also encounter a notion of an entity. It is used with sharding, when you want to spread your actors across different nodes. This might be confusing, so at this point, whenever you see an Akka Entity, remember that in fact, this is an actor. Of course, I’m skipping a lot of details right now, but for the sake of this part, I think it should be enough to understand what is going on.

Once you add the required dependencies to pom.xml, you can create your first actor. We will focus only on the typed API. It’s much safer to use it and it’s recommended as a default choice. Be aware that you can still use the old, classic API. When it comes to functionalities, it's basically the same, but if this is your first adventure with Akka, stick to the typed API. It will be much harder to do some silly mistakes and the compiler will check a lot of things for you.

To create an actor or entity, we need to define its behavior. Behavior is an abstraction to encapsulate an actor's contract. If you send me this message, I will reply with that, or not reply at all, depending on what you want to achieve. Let’s check it and implement an AdderActor.

record Add(int a, int b, ActorRef<Integer> replyTo) {
}

public class AdderActor extends AbstractBehavior<Add> {

    public AdderActor(ActorContext<Add> context) {
        super(context);
    }

    @Override
    public Receive<Add> createReceive() {
        return newReceiveBuilder().onMessage(Add.class, add -> {
            int result = add.a() + add.b();
            add.replyTo().tell(result);
            return Behaviors.same();
        }).build();
    }
}

We need to extend AbstractBehavior and type it with our input message Add. The input message is a record with ints a and b, and a reference to the asking actor required to reply with the result. Handling such a message is very simple. On message Add, first, reply with the result (tell method) and then return the same behavior. Everything is typed, so there is no way to reply with e.g. String, the compiler will check the contract for you. For more examples, just follow the official documentation.

Event sourced behavior

In most cases, you shouldn’t implement bare actors. You should use higher-level abstraction from the Akka libraries and for Event Sourcing, there is a special behavior, called EventSourcedBehaviour. Because we want to send a reply for each message, we can force the compiler to check this by using EventSourcedBehaviorWithEnforcedReplies. Event sourced behaviors are parameterized with 3 other types: Command, Event, State. Looks pretty familiar, don’t you think? However, there is a small change here. Instead of using our domain command, we will use an envelope ShowCommandEnvelope that contains the domain command and an actor reference required to reply.

public sealed interface ShowEntityCommand extends Serializable {

    record ShowCommandEnvelope(ShowCommand command, ActorRef<ShowEntityResponse> replyTo) implements ShowEntityCommand {
    }
}

Why is it useful? First, our domain is untouched by the Akka library and second, we will later add some commands that have nothing to do with our domain. The response is also typed with ShowEntityResponse.

public sealed interface ShowEntityResponse extends Serializable {

    final class CommandProcessed implements ShowEntityResponse {
    }

    record CommandRejected(ShowCommandError error) implements ShowEntityResponse {
    }
}

We can reply that the command was processed or the command was rejected with some error. This is our first event sourced behavior contract.

Extending the EventSourcedBehavior* class requires implementing 3 methods:

@Override
public Show emptyState() {...}

@Override
public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {...}

@Override
public EventHandler<Show, ShowEvent> eventHandler() {...}

ReplyEffect

Since we have only a single command, the commandHandler implementation is really easy.

public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {
    return newCommandHandlerWithReplyBuilder().forStateType(Show.class)
        .onCommand(ShowEntityCommand.ShowCommandEnvelope.class, this::handleShowCommand)
        .build();
}

The interesting part starts in the handleShowCommand method.

private ReplyEffect<ShowEvent, Show> handleShowCommand(Show show, ShowEntityCommand.ShowCommandEnvelope envelope) {
    ShowCommand command = envelope.command();
    return show.process(command, clock).fold(
        error -> {
            return Effect().reply(envelope.replyTo(), new CommandRejected(error));
        },
        events -> {
            return Effect().persist(events.toJavaList())
                .thenReply(envelope.replyTo(), s -> new CommandProcessed());
        }
    );
}

This is the place where we merge our Event Sourcing library with the Event Sourcing domain. To do so, we need to process the domain command from the envelope. There are two possible outcomes. In case of an error, the effect is only replying to the sender that the command was rejected. With successful command processing, we will get a list of events. As described in the previous post, before we reply with a CommandProcessed message, we need to save these events. What is not explicit in the code (although well documented) is that for all persistent events, the eventHandler method will be called, and then Akka Persistence will execute the thenReply section. Thanks to EventSourcedBehaviorWithEnforcedReplies, the compiler will check if we return the ReplyEffect instead of a simple Effect.

Why is invoking eventHandler done under the hood? Thanks to this, you don't need to remember to use it in all command handlers. Also, this method will be invoked during the recovery of the entity when you need to load all events for a given aggregate and apply them one by one.

public EventHandler<Show, ShowEvent> eventHandler() {
    return newEventHandlerBuilder()
        .forStateType(Show.class)
        .onAnyEvent(Show::apply);
}

In this case, the implementation is even more straightforward. We simply redirect all events to the Show::apply domain method.

Extending EventSourcedBehavior* not only gives us a nice abstraction on handling commands, persisting and applying events. It's also guarding the state against concurrent access in a reactive way. The actor thread is not blocked by the communication with the database. It will resume the work right after the response from the underlying event store. Since then it will buffer all commands using a more sophisticated version of Stash mechanism. This is basically an implementation of a Single Writer Principle, a very powerful technic for building really performant consistent systems (in comparison to locking).

Entity test

Ok, it's time to run it and test it. To create an actor, you need an Actor System. The Actor System is like "home" for actors. Something has to manage all actors, their lifecycle, scheduling strategies, etc. You can look at it as a Spring Context that manages Spring Beans.

For testing, you don’t have to create an Actor System manually. You can use very convenient utilities like ActorTestKit with some default configuration overridden by some unit test configuration. Don’t forget to shut it down after all tests.

private static final ActorTestKit testKit =        ActorTestKit.create(EventSourcedBehaviorTestKit.config().withFallback(UNIT_TEST_AKKA_CONFIGURATION));

@AfterAll
public static void cleanUp() {
    testKit.shutdownTestKit();
}

To test our event sourced behavior, there are at least 2 strategies.

White Box test

A White Box test with an EventSourcedBehaviorTestKit is quite nice because you can assert anything you want. Not only the response, but also the persisted event, and the state inside the actor.

@Test
public void shouldReserveSeat() {
    //given
    var showId = ShowId.of();
    EventSourcedBehaviorTestKit<ShowEntityCommand, ShowEvent, Show> showEntityKit = EventSourcedBehaviorTestKit.create(testKit.system(), ShowEntity.create(showId, clock));
    var reserveSeat = randomReserveSeat(showId);

    //when
    var result = showEntityKit.<ShowEntityResponse>runCommand(replyTo -> toEnvelope(reserveSeat, replyTo));

    //then
    assertThat(result.reply()).isInstanceOf(CommandProcessed.class);
    assertThat(result.event()).isInstanceOf(SeatReserved.class);
    var reservedSeat = result.state().seats().get(reserveSeat.seatNumber()).get();
    assertThat(reservedSeat.isReserved()).isTrue();
}

Black Box test

The second strategy is a Black Box approach, where you can talk to the actor and check only the response. What is going on inside the actor is hidden from you. This strategy is closer to the production usage of actors. To get the response from the actor, you need to use something called a testing probe, which will emulate a sender actor.

//given
var showId = ShowId.of();
var showEntityRef = testKit.spawn(ShowEntity.create(showId, clock));
var commandResponseProbe = testKit.<ShowEntityResponse>createTestProbe();

var reserveSeat = randomReserveSeat(showId);

//when
showEntityRef.tell(toEnvelope(reserveSeat, commandResponseProbe.ref()));

//then
commandResponseProbe.expectMessageClass(CommandProcessed.class);

Behavior setup

One thing I didn't cover so far is the ShowEntity.create method.

public static Behavior<ShowEntityCommand> create(ShowId showId,
                                                 Clock clock) {
    return Behaviors.setup(context -> {
        PersistenceId persistenceId = PersistenceId.of("Show", showId.id().toString());
        return new ShowEntity(persistenceId, showId, clock, context);
    });
}

To create an actor or entity, either via testKit.spawn or the EventSourcedBehaviorTestKit.create method, we need to pass the Behavior[T]. For that, we are using a static factory method. This method is invoking our private constructor and wrapping it with Behaviors.setup and we can return Behavior[ShowEntityCommand]. A PersistenceId (passed to the constructor) is a key under which we will store all events, for a given aggregate, in the database (a topic for one of the next lessons).

If this is confusing for you — don’t worry. Take your time, analyze the code, run the tests, read the logs. This is like a recipe on how to assemble all these pieces. You need to do this once and this code will not change (drastically) in the future. For more details, I would recommend reading the documentation.

Summary

I hope you liked the idea of actors combined together with Event Sourcing. I didn't elaborate on the technical pros of this solution because I don't want to repeat my previous post. The lack of a locking mechanism will reveal its full potential when we start talking about persistence in detail.

In comparison to most of the tutorials, the domain is separate from the technical (actor) stuff. Such separation gives you the ability to switch from Akka Persistence to something else in the future. The domain will remain the same. Most likely this will never happen, but it's good to have a separation of concerns in your codebase. Adding new features and evolving the application will be much more pleasant this way.

If you get the impression that you still don't know how to connect actors to your current ecosystem, stay tuned, we will cover this in the next lesson and hide the ShowEntity behind a nice service layer. Until then, check out the full source code from part_2 tag. If you don't want to miss any new materials, subscribe to SoftwareMill Academy.

Blog Comments powered by Disqus.