Reactive Event Sourcing in Java, Part 8: Akka Projection

Reactive Event Sourcing in Java, Part 8: Akka Projection webp image

As I mentioned in the previous post, our read model projection implementation is not done. To deploy this solution in production, we need to add also:

  • projection management (start, stop, scaling),
  • error handling,
  • offset tracking and delivery guarantees.

This code would look the same in every project, regardless of the domain. That's why the Akka team created another abstraction on top of Akka Persistence Query, which is Akka Projection. This is exactly what we need to finalize our read model implementation with all missing features.

Akka Projection

Because of technology nuances, Akka Projection gives us a different set of capabilities. If we decide to keep offsets in a relational database, we can achieve:

In the case of Cassandra, exactly-once is not an option, so we can choose between:

For Kafka, we get a completely different set of features, tailored for this solution.

For the sake of this tutorial, let's focus only on JDBC integration. To run our projection, we need a Handler. This is the essence of the projection - the domain logic:

public class ShowViewEventHandler extends Handler<EventEnvelope<ShowEvent>> {

    private final ShowViewRepository showViewRepository;

    public ShowViewEventHandler(ShowViewRepository showViewRepository) {
        this.showViewRepository = showViewRepository;
    }

    @Override
    public CompletionStage<Done> process(EventEnvelope<ShowEvent> showEventEventEnvelope) {
        return switch (showEventEventEnvelope.event()) {
            case ShowEvent.ShowCreated showCreated ->
                    showViewRepository.save(showCreated.showId(), showCreated.initialShow().seats().size());
            case ShowEvent.SeatReserved seatReserved -> 
                    showViewRepository.decrementAvailability(seatReserved.showId());
            case ShowEvent.SeatReservationCancelled seatReservationCancelled ->
                    showViewRepository.incrementAvailability(seatReservationCancelled.showId());
        };
    }
}

Next, we need to wrap it with a JdbcProjection.atLeastOnceAsync.

public Projection<EventEnvelope<ShowEvent>> create(SourceProvider<Offset, EventEnvelope<ShowEvent>> sourceProvider) {
    return JdbcProjection.atLeastOnceAsync(
            PROJECTION_ID,
            sourceProvider,
            () -> new DataSourceJdbcSession(dataSource),
            () -> showViewEventHandler,
            actorSystem)
        .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration)
        .withRecoveryStrategy(retryAndFail(4, ofSeconds(5))) 
        .withRestartBackoff(ofSeconds(3), ofSeconds(30), 0.1d);
}

Thanks to a very convenient fluent API, we can configure how often we expect to save offsets and what to do in case of an error. JdbcSession created from DataSource is used to open a connection and start a transaction. From the at-least-once delivery perspective, it is not so important, since it is used only for offsets persistence. However, for exactly-once delivery, the same session is passed to the handler to save the read model update together with offset in one transaction.

Now, let's focus on the SourceProvider. Since Akka Projection is a very extensible project, this could be anything that implements this class with supported offset types. In our case, the source will be based on Akka Persistence Query (events by tag).

SourceProvider<Offset, EventEnvelope<ShowEvent>> sourceProvider = 
EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), SHOW_EVENT_TAG);

For test purposes, we could inject a source based on a predefined list of events, but let's stay with a more e2e approach and use an events stream generated by the domain code.

Before we launch the projection, we need to also add journal settings in the application.conf file and set up the database schema for offsets. To run ShowViewProjectionTest, don't forget to remove existing docker container and recreate it with a new init sql.

Projection with local Actor

Launching the projection could be done in 3 different ways. The first one is to run it locally. This is an obvious choice for a single instance deployment, but sometimes it could be a desirable solution for a multi-instance cluster setup. A good example could be a projection based on the Kafka topic (with many partitions) as a source. To spread the load across all application nodes, we will launch a projection (Kafka consumer) on all of them.

To simplify the setup, we will type the ActorSystem with SpawnProtocol.

CompletionStage<ActorRef<ProjectionBehavior.Command>> result = AskPattern.ask(system,
    r -> new SpawnProtocol.Spawn<>(ProjectionBehavior.create(projection), projection.projectionId().id(), Props.empty(), r), timeout, system.scheduler());

Yes, I know, this code is pretty hard to understand at first glance. Don't worry, at the end, I will provide a production-ready projection launcher that will hide all these technical details.

Projection with Cluster Singleton

Most of the projections shouldn't be launched on many instances, because they will duplicate the work and consume the same stream of events. Achieving this in a classic multi-instance setup is quite challenging. Fortunately, Akka Cluster provides a lot of additional features to make our lives less miserable. One of them is Cluster Singleton. If we "wrap" our projection with a Cluster Singleton, we will get a guarantee that only a single instance will actually launch this code. In case of this instance failure, another instance will take over the responsibility and relaunch it.

ClusterSingleton.get(system)
    .init(SingletonActor.of(ProjectionBehavior.create(projection), projection.projectionId().id()))

Projection with Sharded Daemon Process

The last option is very interesting but I suppose it deserves a separate blog post. Long story short, to scale a tagged stream of events, we need to artificially increase the number of tags, e.g. ShowEvent_1, ShowEvent_2ShowEvent_7, etc. If we wrap our projection with Sharded Daemon Process, it will evenly distribute projections per tag across available instances. Please refer to the official documentation for more details.

Projection Launcher

If this is still confusing for you, I've created a small utility for launching and gracefully stopping projections.

projectionLauncher.withLocalProjections(projection);
projectionLauncher.runProjections();
projectionLauncher.shutdownProjections();

You can set it up it with any framework of your choice.

Summary

We only gently scratched the read side of the CQRS pattern. This is a really broad topic with many challenges and possible implementations.

With this post, the series is completed. We started with the write side (with Event Sourcing adaptation) and slowly moved to the read side to take full advantage of Event Sourcing and CQRS patterns. As I mentioned many times in my talks or workshops, implementing Event Sourcing without CQRS is quite pointless, because only the combination of two of them will give you the maximum potential to build a highly scalable, elastic, and distributed system.

Another thing worth observing is the modularity of the Akka stack. Akka Projections is based on Akka Persistence Query, which heavily relies on Akka Streams that use Akka Actor under the hood. I really like this approach, where I can use the level of abstraction which is the best for me and I don't need to hack anything if I want to go lower. Not to mention that the whole implementation is reactive, which even nowadays is rarely the case in the Java ecosystem.

Now, take your time and analyze the last part_8 tag. Run the application (or tests) and play with it. In case of any questions, don't hesitate to ask them here or ping me privately.

SMLAcademy_CTA

Blog Comments powered by Disqus.