
Reactive Event Sourcing in Java, Part 7: Read Model

Andrzej Ludwikowski

13 May 2022 · 6 minutes read


So far, all 6 posts in this series about implementing a Reactive Event Sourcing solution in Java have focused mostly on the write (command service) side. It's time to switch to the read (query service) side of the CQRS pattern.

Let's start with a business need. We would like to find all cinema shows that have some available seats. Currently, there is only a single method to retrieve the Show aggregate by a given id: ShowService#findShowBy(ShowId showId). If you are used to solutions like Spring Data, JPA, or something similar, then implementing such a new feature is a very straightforward task, at least in a CRUD application where the same model is used for writes and reads. In CQRS, these are completely separate and independent models. While it's perfectly fine to implement CQRS without Event Sourcing, there is one major problem with this kind of implementation. Usually, it looks like this:

[diagram: CQRS]

  1. Save the state to the DB.
  2. Publish events with state change information. There are many possible implementations. The most common is the YOLO style: emit the event on a best-effort basis. The expected approach is some variation of the transactional outbox pattern (see the sketch after this list).
  3. Process the events and update read models.
  4. Handle the query and read from read models.
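
To make step 2 concrete, here is a minimal transactional outbox sketch; the showRepository, outboxRepository, and reserveSeat names are hypothetical. The state update and the event insert share one database transaction, and a separate relay process publishes the outbox rows to consumers later.

@Transactional
public void reserveSeat(ShowId showId, SeatNumber seatNumber) {
    Show show = showRepository.findById(showId);                  // load the current state
    Show updated = show.reserveSeat(seatNumber);                  // apply the business logic
    showRepository.save(updated);                                 // 1. save the state
    outboxRepository.save(new SeatReserved(showId, seatNumber));  // 2. enqueue the event in the same transaction
}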

It looks like an Event Sourcing diagram, don't you think? No, this is only a CQRS implementation, and the problem with this approach is that state and events are not connected to each other. We need to be very careful to emit all information about state changes in the form of events. Otherwise, our read models won't be correct. In other words, we need to maintain two sources of truth: the state and the events. From my experience, this creates a very dangerous possibility that someone, sooner or later, will forget to update the second source of truth, the events. That's the problem when events are second-class citizens. Let's change this diagram a bit.

[diagram: Event Sourcing with CQRS]

  1. Save only events to the database.
  2. Read events to recreate the state.
  3. Process the events and update read models.
  4. Handle the query and read from read models.

A small change in the diagram but a fundamental mind shift. In this case, persisted events are the single source of truth. They are first-class citizens. State is only a projection of the events. A very important and very specific one, but still only a projection. That's the essence of Event Sourcing and my personal definition of this pattern.
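
In code, this definition is surprisingly compact. A minimal sketch, assuming the Show aggregate exposes an apply(ShowEvent) method as in the earlier parts of this series:

// state is nothing more than a left fold over the persisted events
Show recreateState(Show initialShow, List<ShowEvent> events) {
    Show state = initialShow;
    for (ShowEvent event : events) {
        state = state.apply(event); // each event produces the next version of the state
    }
    return state;
}

Akka Persistence performs exactly this fold under the hood when it recovers an aggregate from the journal.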

Akka Persistence Query

Implementing Event Sourcing without CQRS is a very rare situation because without CQRS, it's impossible to squeeze the maximum potential out of the Event Sourcing pattern. How do we embrace this in the Akka stack? Thanks to Persistence Query, we get an easy-to-use and powerful abstraction: a stream of events. There are several different kinds of streams.

by persistence id

Methods eventsByPersistenceId() and currentEventsByPersistenceId() provide a stream of all (or current) events for a given persistence id. It's useful when you have a limited number of persistence ids and want fine-grained control over event processing. To start with event stream processing, all we need to do is get a read journal for a given plugin, e.g. JdbcReadJournal, CassandraReadJournal, or, for testing, PersistenceTestKitReadJournal.

private PersistenceTestKitReadJournal readJournal = PersistenceQuery.get(system)
            .getReadJournalFor(PersistenceTestKitReadJournal.class, PersistenceTestKitReadJournal.Identifier());

and run the stream.

readJournal.eventsByPersistenceId(persistenceId(showId1).id(), 0, MAX_VALUE)
        .mapAsync(1, this::processEvent)
        .run(system);

The processEvent method is the essence of the projection logic, in our case:

private CompletionStage<Done> processEvent(EventEnvelope eventEnvelope) {
    if (eventEnvelope.event() instanceof ShowEvent showEvent) {
        return switch (showEvent) {
            case ShowCreated showCreated -> repository.save(showCreated.showId(), showCreated.initialShow().seats().size());
            case SeatReserved seatReserved -> repository.decrementAvailability(seatReserved.showId());
            case SeatReservationCancelled seatReservationCancelled -> repository.incrementAvailability(seatReservationCancelled.showId());
        };
    } else {
        throw new IllegalStateException("Unrecognized event type");
    }
}

and the asynchronous repository contract itself:

public interface ShowViewRepository {
    CompletionStage<List<ShowView>> findAvailable();
    CompletionStage<Done> save(ShowId showId, int availableSeats);
    CompletionStage<Done> decrementAvailability(ShowId showId);
    CompletionStage<Done> incrementAvailability(ShowId showId);
}

public record ShowView(String showId, int availableSeats) {}
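
For tests, a minimal in-memory sketch of this contract is enough; a production implementation would target a real database, and the class name below is hypothetical.

public class InMemoryShowViewRepository implements ShowViewRepository {

    // showId -> number of available seats
    private final ConcurrentHashMap<String, Integer> availability = new ConcurrentHashMap<>();

    @Override
    public CompletionStage<List<ShowView>> findAvailable() {
        List<ShowView> result = availability.entrySet().stream()
                .filter(entry -> entry.getValue() > 0)
                .map(entry -> new ShowView(entry.getKey(), entry.getValue()))
                .toList();
        return CompletableFuture.completedFuture(result);
    }

    @Override
    public CompletionStage<Done> save(ShowId showId, int availableSeats) {
        availability.put(showId.toString(), availableSeats);
        return CompletableFuture.completedFuture(Done.getInstance());
    }

    @Override
    public CompletionStage<Done> decrementAvailability(ShowId showId) {
        availability.computeIfPresent(showId.toString(), (id, seats) -> seats - 1);
        return CompletableFuture.completedFuture(Done.getInstance());
    }

    @Override
    public CompletionStage<Done> incrementAvailability(ShowId showId) {
        availability.computeIfPresent(showId.toString(), (id, seats) -> seats + 1);
        return CompletableFuture.completedFuture(Done.getInstance());
    }
}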

From my experience, "by persistence id" streams are pretty handy for debugging, but not necessarily for business use cases. As we can see in the shouldGetAvailableShowViewsUsingByPersistenceId test, we need to launch two separate streams to consume events from both aggregates, which, in the case of thousands of aggregates, would kill our Event Store's performance. Each stream needs to constantly poll the database for new events.

//when
readJournal.eventsByPersistenceId(persistenceId(showId1).id(), 0, MAX_VALUE)
        .mapAsync(1, this::processEvent)
        .run(system);

readJournal.eventsByPersistenceId(persistenceId(showId2).id(), 0, MAX_VALUE)
        .mapAsync(1, this::processEvent)
        .run(system);

Notice that stream processing is done asynchronously, so we need to use Awaitility.await() in the assertions section (don't confuse it with Blocking.await()).

Awaitility.await().atMost(10, SECONDS).untilAsserted(() -> {
    List<ShowView> showViews = await(showViewRepository.findAvailable());
    assertThat(showViews).contains(new ShowView(showId2.toString(), 19));
});

by tag

Methods eventsByTag() and currentEventsByTag() provide a stream of all (or current) events from all aggregates with a given tag. The most common scenario is to have a stream of all events of the same aggregate type (without additional event filtering). In this case, we tag all show events with the same tag, e.g. ShowEventTag, by overriding the tagsFor method in the ShowEntity class.

@Override
public Set<String> tagsFor(ShowEvent showEvent) {
    return Set.of(SHOW_EVENT_TAG);
}

In some cases, it's convenient to tag only specific events, e.g. SeatReservationCancelled, and consume a prefiltered stream, as in the sketch below. The chosen strategy heavily depends on the projection logic and the event distribution.
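
A sketch of that variant; the CANCELLATION_TAG constant is hypothetical. Only cancellation events get tagged, so the tag stream is prefiltered at the source.

@Override
public Set<String> tagsFor(ShowEvent showEvent) {
    // tag only cancellations; all other events stay untagged
    if (showEvent instanceof SeatReservationCancelled) {
        return Set.of(CANCELLATION_TAG);
    }
    return Set.of();
}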

With all Show events tagged, we need to launch only a single stream to consume events from existing and future Show aggregates.

readJournal.currentEventsByTag(ShowEntity.SHOW_EVENT_TAG, noOffset())
        .mapAsync(1, this::processEvent)
        .run(system);

A full example can be found in the shouldGetAvailableShowViewsUsingByTag test.

by slices and persistence ids

The recently added methods eventsBySlices() and currentEventsBySlices() are a very interesting option for evenly distributing event processing across many concurrent projections. At this point, only Akka Persistence R2DBC implements these methods. This plugin is still in the incubation phase, which is why we won't focus on it here. However, it looks really impressive, and I hope I will have a chance to use it in the near future.

The last methods are persistenceIds() and currentPersistenceIds(), which give us a stream of persistence ids instead of events. They come in handy for specific cases, e.g. to trigger some action when an aggregate is created, or when we want to apply something to all aggregates.
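
A minimal sketch, assuming a plugin that supports this query, e.g. JdbcReadJournal (triggerActionFor is a hypothetical per-aggregate action):

readJournal.currentPersistenceIds()
        .mapAsync(1, persistenceId -> triggerActionFor(persistenceId)) // hypothetical action per aggregate
        .run(system);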

Akka Streams

I didn't cover this topic at all because it is so extensive that it would require a completely new series of articles. Basically, Akka Streams is one of the implementations of the Reactive Streams initiative. Honestly speaking, it's my favorite one because of its large set of existing operators, the very rich ecosystem of sources and sinks from the Alpakka project, the extensibility options (creating custom stages), and the convenient error handling. If you are working with Apache Kafka, then IMHO Alpakka Kafka is the best library to consume and produce messages from/to Kafka, a really nice piece of software. When it comes to CQRS and Event Sourcing, you could stick with the map and mapAsync operators, but I encourage you to at least get familiar with the rest of them.
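
As a small taste, here is a sketch that batches read model updates with the groupedWithin operator; processBatch is a hypothetical batched variant of processEvent:

readJournal.currentEventsByTag(ShowEntity.SHOW_EVENT_TAG, noOffset())
        .groupedWithin(100, Duration.ofMillis(500)) // up to 100 events or 500 ms, whichever comes first
        .mapAsync(1, this::processBatch)
        .run(system);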

Projections

Read models like ShowView are not the only possible outcome of event stream processing. Such streams can be used for various tasks and side effects: sending an email, a WebSocket message, a mobile push message, etc. We can consume streams from different aggregate types and implement a Saga pattern to achieve consistency between aggregates (or services).
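
A projection handler that triggers a side effect instead of updating a table looks almost the same as processEvent. A sketch, with a hypothetical emailSender:

private CompletionStage<Done> processSideEffect(EventEnvelope eventEnvelope) {
    if (eventEnvelope.event() instanceof SeatReserved seatReserved) {
        // hypothetical asynchronous sender; a WebSocket push or a Saga step would fit the same slot
        return emailSender.sendReservationConfirmation(seatReserved.showId());
    }
    return CompletableFuture.completedFuture(Done.getInstance());
}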

Sometimes we want to share our events between many microservices, and for that, our projection can stream them to a message bus:

[diagram: Event Sourcing with a message bus]

This way, our internal Event Store is completely separated from all "readers". Before we share events, we should most likely transform them into a "public schema" to keep the coupling between services very loose.
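
With Alpakka Kafka, such a publishing projection could be sketched as follows; toPublicEvent and serialize are hypothetical helpers that translate internal ShowEvents into the public schema, "show-events" is an assumed topic name, and producerSettings is an Alpakka Kafka ProducerSettings instance configured elsewhere.

readJournal.currentEventsByTag(ShowEntity.SHOW_EVENT_TAG, noOffset())
        .map(envelope -> toPublicEvent((ShowEvent) envelope.event()))  // internal -> public schema
        .map(publicEvent -> new ProducerRecord<String, String>("show-events", publicEvent.showId(), serialize(publicEvent)))
        .runWith(Producer.plainSink(producerSettings), system);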

Summary

A projection based on the events stream is a very powerful technique. If you have never used it before, it will take some time to understand its full potential. After that mind shift, you will start designing your systems in a completely different way.

Our implementation is far from done. The plan for the next part is to cover:

  • projection management (start, stop, scaling),
  • error handling,
  • offset tracking and delivery guarantees.

Check out part_7 and, as usual, if you don't want to miss any lesson, subscribe to SoftwareMill Academy.

Go to part 8 of the series >>

