
Reactive Event Sourcing in Java, Part 3: Service

The main topic of the third part of this Event Sourcing series is hiding ShowEntity behind a nice service wrapper. Otherwise, the logic required to talk to the actor would be repeated in many places, which is always a bad idea. ShowService will be our port from the Hexagonal Architecture point of view, ready to be used by any adapter in the future.

Querying the entity

Before we get to that, let's start with something very simple. Sometimes we would like to ask the entity for some data for reading purposes. To make this possible, we need to add another command: GetShow. Now, I know what you are thinking. This is not a command, it's more like a query. Yes, that's true. However, everything is typed and we need to follow a generic contract. The contract is that our event-sourced entity will accept any message that extends ShowEntityCommand. Close your eyes and treat GetShow as a command to get some data.

public sealed interface ShowEntityCommand extends Serializable {

    record ShowCommandEnvelope(ShowCommand command, ActorRef<ShowEntityResponse> replyTo) implements ShowEntityCommand {
    }

    record GetShow(ActorRef<Show> replyTo) implements ShowEntityCommand {
    }
}

Handling such a command is very straightforward.

public CommandHandlerWithReply<ShowEntityCommand, ShowEvent, Show> commandHandler() {
    return newCommandHandlerWithReplyBuilder().forStateType(Show.class)
        .onCommand(ShowEntityCommand.GetShow.class, this::returnState)
        .onCommand(ShowEntityCommand.ShowCommandEnvelope.class, this::handleShowCommand)
        .build();
}

private ReplyEffect<ShowEvent, Show> returnState(Show show, ShowEntityCommand.GetShow getShow) {
    return Effect().reply(getShow.replyTo(), show);
}

WARNING: such an implementation is OK only if the state is immutable, like in our case. The state could be mutable, but then, before we respond with it, we would need to create a full copy. Otherwise, we could break the actor/entity encapsulation and we would no longer be thread-safe.
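For a mutable state, the reply handler might look like the sketch below. This is only an illustration; Show.copyOf is a hypothetical copy helper, not something that exists in this codebase.

private ReplyEffect<ShowEvent, Show> returnState(Show show, ShowEntityCommand.GetShow getShow) {
    // reply with a defensive copy so the internal (mutable) state never escapes the entity;
    // Show.copyOf is an assumed helper, used here only for illustration
    return Effect().reply(getShow.replyTo(), Show.copyOf(show));
}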

Thanks to the GetShow command, we can extend our black-box test with more logic that will also check the state after processing the reserve seat command.

@Test
public void shouldReserveSeat_WithProbe() {
    //given
    var showId = ShowId.of();
    var showEntityRef = testKit.spawn(ShowEntity.create(showId, clock));
    var commandResponseProbe = testKit.<ShowEntityResponse>createTestProbe();
    var showResponseProbe = testKit.<Show>createTestProbe();

    var reserveSeat = randomReserveSeat(showId);

    //when
    showEntityRef.tell(toEnvelope(reserveSeat, commandResponseProbe.ref()));

    //then
    commandResponseProbe.expectMessageClass(CommandProcessed.class);

    //when
    showEntityRef.tell(new ShowEntityCommand.GetShow(showResponseProbe.ref()));

    //then
    Show returnedShow = showResponseProbe.receiveMessage();
    assertThat(returnedShow.seats().get(reserveSeat.seatNumber()).get().isReserved()).isTrue();
}

Some purists might say that we should create a read model for that, and that the command handling side shouldn't be responsible for queries. We will definitely talk about CQRS in a separate post, but I prefer a more practical approach. Reading data from the entity is (usually) a very cheap operation, so why not take advantage of it?

Service

That was only a starter, required before we do something more interesting. As you might have already noticed from the test above, talking to actors requires some ceremony. My advice is to encapsulate it in a nice, clean service. We would like to have a ShowService that exposes only 3 methods:

public class ShowService {

    public CompletionStage<Show> findShowBy(ShowId showId) { ... }

    public CompletionStage<ShowEntityResponse> reserveSeat(ShowId showId, SeatNumber seatNumber) { ... }

    public CompletionStage<ShowEntityResponse> cancelReservation(ShowId showId, SeatNumber seatNumber) { ... }
}

Since we are building a reactive solution, the return type is wrapped in CompletionStage, ready to be combined with other stages of processing. Returning ShowEntityResponse, which is a part of the ShowEntity contract, might look like an abstraction leak. That depends on the convention: if this is problematic for you, just return something more specific to the service contract. Here it would be quite artificial to do so, but in some cases it will feel more natural.
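For illustration, such a mapping could be as simple as the sketch below. SeatReservationResult and reserveSeatMapped are assumed names, not part of the series' code.

public enum SeatReservationResult {CONFIRMED, REJECTED}

// hypothetical sketch: translate the entity response into a service-specific type
public CompletionStage<SeatReservationResult> reserveSeatMapped(ShowId showId, SeatNumber seatNumber) {
    return reserveSeat(showId, seatNumber)
        .thenApply(response -> response instanceof ShowEntityResponse.CommandProcessed
            ? SeatReservationResult.CONFIRMED
            : SeatReservationResult.REJECTED);
}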

Sharding

The main responsibility of the service is to hide or simplify all the communication with the ShowEntity. Instead of spawning the actor manually like in our test: testKit.spawn(ShowEntity.create(showId, clock)), we will use Akka Sharding for that. Long story short, sharding gives you the ability to create (and talk to) actors on different nodes of the Akka Cluster. This way, we can very easily spread the load and achieve a Distributed Single Writer Principle.

Wait! Does this mean that I need to run an Akka Cluster now? It's not as hard as you might think, but the answer is no (or not entirely). We can create a cluster with just a single node. It will behave like a standard application. In the future, if we decide that we would like to scale and handle more traffic, we can enable the full potential of the Akka Cluster with many nodes, rolling updates, dynamic scaling, etc. The best thing is that we will need to change only the configuration file. The code responsible for talking to actors will stay exactly the same. Location Transparency is actually one of the main Akka architecture drivers; everything is designed to work in a distributed setup by default.

To enable sharding, we will need the application.conf file with the following settings:

akka {

  actor {
    provider = "cluster"
  }

  cluster {
    seed-nodes = ["akka://es-workshop@127.0.0.1:2551"]
  }

  remote {
    artery {
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }
}

Akka configuration uses the HOCON format; personally, I think it's way better than YAML. In the snippet above, we change akka.actor.provider from local to cluster. The seed node, required to form a cluster, is the same as the application's own address. The remoting protocol is Artery (the default option). We will cover these settings in detail later, when we talk about dockerization and scaling.
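To give you an idea of how little changes when scaling out, a multi-node setup could differ only in the addresses. The snippet below is just a sketch with placeholder host names (each node would also get its own canonical.hostname and port):

akka.cluster.seed-nodes = [
  "akka://es-workshop@node-1:2551",
  "akka://es-workshop@node-2:2551"
]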

To get an EntityRef, we need to initialize sharding with a proper Entity factory method. This can be done in the service constructor:

public ShowService(ClusterSharding sharding, Clock clock) {
    this.sharding = sharding;
    sharding.init(Entity.of(SHOW_ENTITY_TYPE_KEY, entityContext -> {
        ShowId showId = new ShowId(UUID.fromString(entityContext.getEntityId()));
        return ShowEntity.create(showId, clock);
    }));
}
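The SHOW_ENTITY_TYPE_KEY used above is an EntityTypeKey that ties the entity name to its command protocol. A minimal sketch of how it could be declared (the entity name string is an assumption):

// a minimal sketch; the "ShowEntity" name is an assumption, not necessarily the one used in the repository
public static final EntityTypeKey<ShowEntityCommand> SHOW_ENTITY_TYPE_KEY =
    EntityTypeKey.create(ShowEntityCommand.class, "ShowEntity");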

Thanks to the entityRefFor method, we can finally acquire a reference for a given showId:

private EntityRef<ShowEntityCommand> getShowEntityRef(ShowId showId) {
    return sharding.entityRefFor(SHOW_ENTITY_TYPE_KEY, showId.id().toString());
}

The same showId will be available via the entityContext.getEntityId() method. When we send a message via the EntityRef, the sharding logic checks whether such an entity already exists in the ActorSystem. If not, it takes the instructions registered under SHOW_ENTITY_TYPE_KEY (the lambda expression above), builds the entity and routes the message to the new ShowEntity. If the entity already exists in the ActorSystem, no additional steps are required and the message can be routed right away.

You don't have to recreate the state (which might take some time) for each message. That's why Event Sourcing based on actors (with the Akka implementation) is so fast: once the state is loaded, the entity simply waits to handle the next command. That's also why reading from the entity is cheap; in many cases it doesn't require any event store access. With the Single Writer Principle (mentioned in part 2), we get the power of a consistent write-through cache. Together with Akka Sharding and the Akka Cluster, we can embrace the full potential of the ActorSystem and play with a Distributed Single Writer Principle, but that's a topic for a separate post.

From the communication perspective, an EntityRef works exactly the same as an ActorRef, but it is a separate abstraction to emphasise that we are using sharding, with a more complex entity life cycle than in the case of a plain actor.
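For example, plain fire-and-forget still works; the small sketch below (a hypothetical helper, with the replyTo reference passed in from outside, e.g. a test probe) routes the query through sharding just like tell on an ActorRef would:

// hypothetical helper, only to show that EntityRef supports the same tell as ActorRef
void tellGetShow(ShowId showId, ActorRef<Show> replyTo) {
    getShowEntityRef(showId).tell(new ShowEntityCommand.GetShow(replyTo));
}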

There are many options for interacting with actors. In our service layer, we will go with Request-Response and use the ask method.

public CompletionStage<Show> findShowBy(ShowId showId) {
    return getShowEntityRef(showId).ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
}

The ask signature might be confusing because the first parameter is a lambda that creates the message to the actor, with a replyTo reference to the asking actor. Fortunately, we don't have to create such an actor ourselves (like in our test above); everything is encapsulated inside the ask method. It looks strange because, under the hood, the only way to communicate with an actor is via the tell method (fire and forget). If this is still not clear, I would recommend carefully reading the official documentation about interaction patterns.

The second problem with the ask method is that sometimes you must help the Java compiler with the return type, e.g. this code will not compile:

public CompletionStage<Show> findShowBy(ShowId showId) {
    var result = getShowEntityRef(showId).ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    return result;
}

but this one will be ok:

public CompletionStage<Show> findShowBy(ShowId showId) {
    var result = getShowEntityRef(showId).<Show>ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    // this will also work
    // CompletionStage<Show> result = getShowEntityRef(showId).<Show>ask(replyTo -> new ShowEntityCommand.GetShow(replyTo), askTimeout);
    return result;
}
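With the ask pattern in place, the command-side methods can be sketched the same way, just wrapping the ShowCommand in the envelope. The ReserveSeat constructor arguments below follow part 2 of the series and may differ from the actual code:

public CompletionStage<ShowEntityResponse> reserveSeat(ShowId showId, SeatNumber seatNumber) {
    // wrap the domain command in the envelope; ask supplies the replyTo reference
    // (ReserveSeat arguments are assumed based on part 2)
    return getShowEntityRef(showId).<ShowEntityResponse>ask(
        replyTo -> new ShowEntityCommand.ShowCommandEnvelope(
            new ShowCommand.ReserveSeat(showId, seatNumber), replyTo),
        askTimeout);
}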

Service test

Testing the ShowService should be a rather easy task. This time, we're creating the ActorSystem (with the in-memory event store) manually, just to show how this can be done without the test kit.

    private static Config config = PersistenceTestKitPlugin.config().withFallback(ConfigFactory.load());
    private static ActorSystem system = ActorSystem.create("es-workshop", config);
    private ClusterSharding sharding = ClusterSharding.get(Adapter.toTyped(system));
    private Clock clock = new Clock.UtcClock();
    private ShowService showService = new ShowService(sharding, clock);

    @AfterAll
    public static void cleanUp() {
        TestKit.shutdownActorSystem(system);
    }

    @Test
    public void shouldReserveSeat() throws ExecutionException, InterruptedException {
        //given
        var showId = ShowId.of();
        var seatNumber = randomSeatNumber();

        //when
        var result = showService.reserveSeat(showId, seatNumber).toCompletableFuture().get();

        //then
        assertThat(result).isInstanceOf(ShowEntityResponse.CommandProcessed.class);
    }

    @Test
    public void shouldCancelReservation() throws ExecutionException, InterruptedException {
        //given
        var showId = ShowId.of();
        var seatNumber = randomSeatNumber();

        //when
        var reservationResult = showService.reserveSeat(showId, seatNumber).toCompletableFuture().get();

        //then
        assertThat(reservationResult).isInstanceOf(ShowEntityResponse.CommandProcessed.class);

        //when
        var cancellationResult = showService.cancelReservation(showId, seatNumber).toCompletableFuture().get();

        //then
        assertThat(cancellationResult).isInstanceOf(ShowEntityResponse.CommandProcessed.class);
    }

If you analyse the logs, you can see that the single-node cluster was properly formed:

Cluster Node [akka://es-workshop@127.0.0.1:2551] - Starting up, Akka version [2.6.16] ...
Cluster Node [akka://es-workshop@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
Cluster Node [akka://es-workshop@127.0.0.1:2551] - Started up successfully
Cluster Node [akka://es-workshop@127.0.0.1:2551] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#downing
Cluster Node [akka://es-workshop@127.0.0.1:2551] - Node [akka://es-workshop@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
Cluster Node [akka://es-workshop@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
Cluster Node [akka://es-workshop@127.0.0.1:2551] - Leader is moving node [akka://es-workshop@127.0.0.1:2551] to [Up]

Summary

As I mentioned at the beginning, ShowService is our main port to the business functionality. The actual service contract can, of course, be adjusted to your needs and standards. Having a service wrapper around the entity will keep the code more reusable and fault-tolerant. In the next part of the series, we will use this port in one possible adapter (an HTTP endpoint) and invite the Spring ecosystem into our codebase. Until then, check out the full source code from the part_3 tag, run all the tests, and analyse the logs. If you don't want to miss any new materials, subscribe to SoftwareMill Academy.
