
Reactive Event Sourcing in Java, Part 5: Event Store

Choosing an event store database requires a lot of research. This could be a topic for a separate blog series (maybe in the future), so my plan is to address it from a different angle. Do not try to find the best solution (because, IMHO, no such thing exists). Instead, try to find an optimal compromise and be ready for changes.

Akka Persistence has one huge advantage: the persistence layer is only a plugin. We can use this library with any event store that is able to fulfil the contract of AsyncWriteJournal. We need to implement only 4 methods to work with our custom event store solution:

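// Persist batches of events; one optional failure per AtomicWrite
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages) {...}

// Delete events of the given persistence id up to and including toSequenceNr
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {...}

// Replay at most max events from the given range into the callback (used during recovery)
@Override
public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) {...}

// Return the highest stored sequence number; fromSequenceNr is only a hint where to start searching
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {...}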

The situation is similar for SnapshotStore, and for ReadJournal, which is used for read model projections.
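To make the journal contract more tangible, here is a minimal in-memory sketch of the four operations. It deliberately does not use the Akka API: the class InMemoryJournalSketch, the StoredEvent record, and the method names are hypothetical stand-ins, and CompletableFuture replaces the Scala Future. Unlike a real plugin, where sequence numbers arrive with the messages, this toy store assigns them itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a journal plugin. This is NOT the Akka API.
class InMemoryJournalSketch {

    // One stored event: its position in the stream plus an opaque payload.
    record StoredEvent(long sequenceNr, String payload) {}

    private final Map<String, List<StoredEvent>> streams = new HashMap<>();
    // Tracked separately so that deleting events never resets the numbering.
    private final Map<String, Long> highestSequenceNr = new HashMap<>();

    // Counterpart of doAsyncWriteMessages: append events to one stream.
    // Simplification (assumption): the store assigns sequence numbers itself.
    synchronized CompletableFuture<Void> write(String persistenceId, List<String> payloads) {
        List<StoredEvent> stream = streams.computeIfAbsent(persistenceId, id -> new ArrayList<>());
        long sequenceNr = highestSequenceNr.getOrDefault(persistenceId, 0L);
        for (String payload : payloads) {
            stream.add(new StoredEvent(++sequenceNr, payload));
        }
        highestSequenceNr.put(persistenceId, sequenceNr);
        return CompletableFuture.completedFuture(null);
    }

    // Counterpart of doAsyncDeleteMessagesTo: drop events up to and including toSequenceNr.
    synchronized CompletableFuture<Void> deleteTo(String persistenceId, long toSequenceNr) {
        List<StoredEvent> stream = streams.get(persistenceId);
        if (stream != null) {
            stream.removeIf(event -> event.sequenceNr() <= toSequenceNr);
        }
        return CompletableFuture.completedFuture(null);
    }

    // Counterpart of doAsyncReplayMessages: pass at most max events from the range to the callback, in order.
    synchronized CompletableFuture<Void> replay(String persistenceId, long fromSequenceNr,
                                                long toSequenceNr, long max,
                                                Consumer<StoredEvent> callback) {
        List<StoredEvent> stream = streams.getOrDefault(persistenceId, List.of());
        stream.stream()
                .filter(event -> event.sequenceNr() >= fromSequenceNr && event.sequenceNr() <= toSequenceNr)
                .limit(max)
                .forEachOrdered(callback);
        return CompletableFuture.completedFuture(null);
    }

    // Counterpart of doAsyncReadHighestSequenceNr: survives deletions.
    synchronized CompletableFuture<Long> readHighestSequenceNr(String persistenceId) {
        return CompletableFuture.completedFuture(highestSequenceNr.getOrDefault(persistenceId, 0L));
    }

    public static void main(String[] args) {
        InMemoryJournalSketch journal = new InMemoryJournalSketch();
        journal.write("aggregate-1", List.of("event-a", "event-b", "event-c")).join();
        journal.replay("aggregate-1", 1, Long.MAX_VALUE, Long.MAX_VALUE, System.out::println).join();
        System.out.println("highest: " + journal.readHighestSequenceNr("aggregate-1").join());
    }
}
```

Whatever database sits behind these four operations, the rest of the application sees only an append-only stream per persistence id, which is why the store can be swapped later.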

I'm not saying that implementing a custom Akka Persistence plugin is a trivial task. It will be quite a challenge, but at least it is possible (here is my colleague’s example). Choices made today, in the current context, will not limit us in the future. Also, we can postpone some difficult decisions, like using a distributed event store (e.g. Cassandra) vs a single-host one (e.g. PostgreSQL, MariaDB).

For starters, there are ready-made plugins available, such as the JDBC and Cassandra plugins described below, and more from the community.

JDBC

A classic relational database will be fine for a lot of projects. I suppose the majority of developers have some experience with this kind of database. To enable the Akka Persistence JDBC plugin for our event sourced behaviors, all you have to do is add the proper dependencies and some configuration:

akka {
  persistence.journal.plugin = "jdbc-journal"
}

jdbc-journal {
  slick = {
    profile = "slick.jdbc.PostgresProfile$"
    db {
      host = "localhost"
      port = "5432"
      dbName = "postgres"
      url = "jdbc:postgresql://"${jdbc-journal.slick.db.host}":"${jdbc-journal.slick.db.port}"/"${jdbc-journal.slick.db.dbName}"?reWriteBatchedInserts=true"
      user = "admin"
      password = "admin"
      driver = "org.postgresql.Driver"
    }
  }
}

After we run docker-compose -f development/docker-compose-jdbc.yml up, we can launch our application, make some requests, and verify that our relational event store contains some events in the event_journal table. Don't be surprised by the event_payload column: our events are persisted as binary blobs. For prototyping, we are using the Java serialization mechanism, which is of course a very bad idea for production deployment and should be changed. In this part, we will not focus on serialization itself (it requires a separate post), but if you want to read more about the possible options, read my post about serialization strategies for Event Sourcing.

What about the reactiveness of such a solution? True, the JDBC driver used to communicate with the event store is a blocking one, and at the time of writing this post, the R2DBC (reactive JDBC) driver is not yet supported by Akka Persistence. I hope this will change in the future. Fortunately, the JDBC driver is wrapped by the Slick library, which gives us a pretty nice and manageable reactive facade backed by a separate thread pool. Don't forget that the actor itself is not blocked by the communication with the underlying event store.
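The idea of hiding a blocking driver behind an asynchronous facade can be sketched in plain Java. This is only an illustration of the pattern, not how Slick is implemented: the class BlockingStoreFacadeSketch, the pool size of 4, and the blockingInsert method (a sleep standing in for a JDBC call) are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: blocking calls are confined to a dedicated, bounded thread pool.
class BlockingStoreFacadeSketch {

    // Separate pool, so blocked threads never starve the threads that run the actors.
    private final ExecutorService blockingPool = Executors.newFixedThreadPool(4, runnable -> {
        Thread thread = new Thread(runnable, "event-store-blocking");
        thread.setDaemon(true); // do not keep the JVM alive just for this pool
        return thread;
    });

    // Stand-in for a blocking JDBC insert; the sleep simulates network and disk I/O.
    private String blockingInsert(String event) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while storing " + event, e);
        }
        return Thread.currentThread().getName();
    }

    // Returns immediately; completes with the name of the thread that did the blocking work.
    CompletableFuture<String> insertAsync(String event) {
        return CompletableFuture.supplyAsync(() -> blockingInsert(event), blockingPool);
    }

    public static void main(String[] args) {
        BlockingStoreFacadeSketch facade = new BlockingStoreFacadeSketch();
        CompletableFuture<String> result = facade.insertAsync("event-a");
        System.out.println("caller thread is free: " + Thread.currentThread().getName());
        System.out.println("insert ran on: " + result.join());
    }
}
```

The caller only ever sees a future, so the size of the blocking pool becomes the single knob that limits how many connections and threads the event store may occupy.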

Cassandra

At some scale, a single-host database will not be enough. Vertical scaling will hit its limits, and to handle more load we will need to switch to something distributed. However, not every distributed database will improve the write throughput. With a leader-follower architecture (e.g. a MongoDB replica set), we will still be limited by a single host (the leader). We should focus on leaderless solutions like Apache Cassandra. From the write perspective, Cassandra is a perfect match for an event store. We will get:

  • partitioning (data is spread across all nodes in the cluster),
  • replication (in case of a node failure, we can use a replica),
  • optimized for writes (the throughput is really great),
  • near-linear horizontal scaling.

If you have ever worked with Cassandra, you know that schema modelling is the key for this database. The Akka Persistence Cassandra plugin is aware of this, and its table schema for storing events works like a charm. Partitions are not too small, but also not too big (actually, you can configure how many events should go to a single partition). With such a schema, reading events for a given aggregate will be very efficient (important for the recovery phase). The reactive driver used for communication will suit our design very well. Any problems? Of course :) Maintaining a distributed event store in production is something completely different from maintaining a single-host database. It will require a lot of DevOps power and knowledge (or money in the case of a hosted solution). Also, be warned that reading all events from all aggregates might introduce some challenges. I suppose that at some scale, we simply don't have a choice. If we want to scale further, we need to face those challenges and figure out how to approach them.
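The configurable partition size mentioned above can be pictured with a bit of arithmetic. The sketch below assumes that sequence numbers start at 1 and that each partition holds a fixed number of consecutive events of one aggregate; the class name, the method name, and the partition size of 500 are illustrative, and the plugin's actual bucketing may differ in details.

```java
// Hypothetical sketch of bucketing one aggregate's events into fixed-size partitions.
class PartitionSketch {

    // Assumption: sequence numbers start at 1, so events 1..partitionSize land in partition 0.
    static long partitionNr(long sequenceNr, long partitionSize) {
        return (sequenceNr - 1) / partitionSize;
    }

    public static void main(String[] args) {
        long partitionSize = 500; // illustrative value only
        for (long sequenceNr : new long[] {1, 500, 501, 1200}) {
            System.out.println("event " + sequenceNr + " -> partition " + partitionNr(sequenceNr, partitionSize));
        }
    }
}
```

Recovery of a single aggregate then reads a handful of partitions sequentially, while no partition grows without bound for a long-lived aggregate.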

The good news is that we don't have to change anything in the implementation to use Cassandra. As before, we need only some dependencies and a configuration:

akka {
  persistence.journal.plugin = "akka.persistence.cassandra.journal"
}
akka.persistence.cassandra.journal {
  keyspace-autocreate = true //not recommended for production
  tables-autocreate = true //not recommended for production
}

To run our application with the Cassandra plugin from the part_5 tag, we need to add the -Dconfig.resource=/application-cassandra.conf VM option in the IntelliJ launch configuration or run it directly from the command line:

./mvnw spring-boot:run -Dspring-boot.run.jvmArguments="--enable-preview -Dconfig.resource=/application-cassandra.conf"

Summary

I hope that you can see the unquestionable advantage of Akka Persistence: we can prototype with an in-memory event store, then switch to a JDBC event store in production, and once we hit some significant load, switch to a distributed event store like Apache Cassandra. For me, this is a game-changer. I can postpone very difficult decisions to the last moment. I should also mention that we can configure the event store per aggregate type. For long-lived aggregates that will produce thousands of events (e.g. exchange market, IoT), I could use Apache Cassandra, but for aggregates with a small number of events, I could stay with the JDBC plugin. It's worth noting that you don't have to implement database access yourself, which in the case of a custom implementation like here requires handling a lot of edge cases.

For sure, there will be some specific traps and gotchas when working with a completely different database. Event migration will require some work, and it will be a process. The schema of the relational event store is completely different from the one in Apache Cassandra (or any other distributed database), but at least the migration is possible. We will not be blocked in the future, and the mythical switch from one database to another can become a reality. It's possible because with Event Sourcing, we don't have to care so much about relations between data. We need to store a stream of events and read a stream of events, which can be emulated on many database engines. With the Actor Model, we can use the (Distributed) Single Writer Principle and work safely with databases that don't give us ACID guarantees (personally, I wouldn't use Apache Cassandra for Event Sourcing without a Single Writer mechanism).
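The Single Writer Principle mentioned above can be illustrated without actors. In the local, non-distributed sketch below, every aggregate gets its own single-threaded lane, and only that lane is allowed to append to the aggregate's stream. The class SingleWriterSketch and its methods are hypothetical and are not part of Akka; in the real application, the persistent actor (one per aggregate in the cluster) plays the role of the lane.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, local-only sketch of the Single Writer Principle.
class SingleWriterSketch {

    // One single-threaded lane per aggregate: the only code path that touches its stream.
    private final Map<String, ExecutorService> writers = new ConcurrentHashMap<>();
    // The lists are plain ArrayLists: safety comes from the single writer, not from locks.
    private final Map<String, List<Long>> streams = new ConcurrentHashMap<>();

    private ExecutorService writerFor(String aggregateId) {
        return writers.computeIfAbsent(aggregateId, id -> Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "writer-" + id);
            thread.setDaemon(true);
            return thread;
        }));
    }

    // Appends the next sequence number; the read-then-write is safe only because it runs on the lane.
    CompletableFuture<Long> append(String aggregateId) {
        return CompletableFuture.supplyAsync(() -> {
            List<Long> stream = streams.computeIfAbsent(aggregateId, id -> new ArrayList<>());
            long next = stream.size() + 1;
            stream.add(next);
            return next;
        }, writerFor(aggregateId));
    }

    // Reads also go through the lane, so they observe a consistent snapshot of the stream.
    CompletableFuture<List<Long>> read(String aggregateId) {
        return CompletableFuture.supplyAsync(() -> {
            List<Long> stream = streams.get(aggregateId);
            return stream == null ? List.<Long>of() : List.copyOf(stream);
        }, writerFor(aggregateId));
    }

    public static void main(String[] args) {
        SingleWriterSketch store = new SingleWriterSketch();
        for (int i = 0; i < 3; i++) {
            store.append("aggregate-1");
        }
        System.out.println(store.read("aggregate-1").join());
    }
}
```

Because appends for one aggregate never run concurrently, sequence numbers stay gapless and unique even though the underlying storage offers no transactions, which is the property we rely on when the database gives no ACID guarantees.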

If you would like to use Akka for CRUD functionality, there is also an option for that: Durable State persistence.

As usual - check out the part_5 tag and play with the application or go directly to the next part. If you don't want to miss any new materials, subscribe to SoftwareMill Academy.
