Implementing event sourcing using a relational database
Event sourcing is a pattern in which a stream of events constitutes the primary source of truth in a system. These events capture facts — state changes that occur to the entities and aggregates in our system — and hence are immutable. On the technical level, event sourcing can be implemented using dedicated storage systems, as well as general-purpose "NoSQL" and SQL databases.
If you're interested in the origins of event sourcing, the articles by Greg Young on event sourcing and CQRS are a great place to start. Event sourcing brings several changes when it comes to designing and modeling the system; there's a number of books and articles on the subject. Here, however, we'll focus on the technical side, looking at how an event stream can be persistently stored.
There's a couple of reasons why you might consider using event sourcing, among others:
- performance
- data modeling
- auditing
If you're after performance, you'll probably want to go beyond what a relational storage model might offer. If so, take a look at the actor model and how it's a natural fit for event sourcing. You might still use a relational database when using actors, as one possible implementation of the event log.
However, if the requirements of your system aren't centered around very high performance, but rather on business process modeling, keeping an audit log, or ensuring transparency and accountability of operations, using event sourcing based on a relational database can make a lot of sense.
This article is an extension of the "transactional event sourcing" mechanism I've written about before and presents, step by step, how to implement event sourcing using a relational database, along with event sourcing best practices. For a specific use case, you might need only a subset (or superset!) of the features described here.
Bare necessities
Implementing an event store using a relational database has of course been described before. A good starting point is this article from 2011 on building an event storage, as well as another one which focuses on PostgreSQL. We'll repeat some of the steps from these articles and expand on them, but you might refer to the originals as well.
As we want to store a stream of events, we'll need a table with exactly that. In the examples, we'll be using PostgreSQL syntax and data types. Each event needs:
- an id, the primary key for the event,
- the id of the stream to which the event belongs. Typically there are multiple independent event streams in a system. Each stream might correspond to an entity/aggregate, such as an individual user, product, or whatever else are top-level concepts in your system. We'll need this to query for events. This column is often called aggregate_id, entity_id, or stream_id, which will be used here. Streams might have different types — there might be user event streams, product event streams, etc.,
- the stream version number, so that we know how to sort events. Version numbers form a gapless sequence. Each new event for a given stream increments the version number, and there can be only one event with a given version in a given stream,
- event data, here stored as JSON. Each event has different data, so we're keeping this unstructured. Other formats are possible as well.
CREATE TABLE events
(
id SERIAL PRIMARY KEY,
stream_id BIGINT NOT NULL,
version BIGINT NOT NULL,
data JSONB NOT NULL,
UNIQUE (stream_id, version)
);
Here’s a couple of example events that use the above structure:
(id=10, stream_id=345, version=1, data={"type": "UserCreated", …}),
(id=11, stream_id=82, version=1, data={"type": "TaskAssigned", …}),
(id=13, stream_id=345, version=2, data={"type": "UserUpdated", …}),
(id=15, stream_id=9120, version=1, data={"type": "TaskAssigned", …})
Using this simple structure, we can already implement the two basic operations on an event store. First, looking up all events for a stream with the given stream_id:
SELECT id, stream_id, version, data FROM events
WHERE stream_id = :stream_id ORDER BY version ASC;
This lets us reconstruct the state of any stream. Given a function f: (State, Event) => State, and an initial "empty" state, the current state can be obtained by repeatedly applying f from the first to the last event, using richer and richer State values; this is also known as a fold. Thanks to the version number, we know how to order events.
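To make this concrete, here's the fold in the article's pseudocode style (f and the empty state are application-specific and only sketched here):
-- read the stream's events, oldest first
events := SELECT id, stream_id, version, data FROM events
          WHERE stream_id = :stream_id ORDER BY version ASC;
-- fold them into the current state
state := empty_state
for event in events:
    state := f(state, event)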
The second basic operation lets us insert a new event. Here, we first need to calculate the current version number, so that we can insert an event with the next version. This can be done by iterating over the events found in the first step and finding the maximum version value. Let's assume this is available as current. Inserting a new event is then as simple as:
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, :data);
Note that we cannot simply obtain the maximum version number from the database before inserting a new event. Between looking up the events for a stream and inserting our event, another event might have been inserted. We'd then insert an event calculated from stale data (usually the current stream state, together with external input, is used to create new events). However, if you are using the serializable transaction isolation level, or if your db implements snapshot isolation, the following will also work (when everything is executed in a single transaction):
SELECT id, stream_id, version, data FROM events
WHERE stream_id = :stream_id ORDER BY version ASC;
-- process incoming command, create events
current := SELECT MAX(version) FROM events
WHERE stream_id = :stream_id;
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, :data);
For example, in PostgreSQL, the repeatable read transaction isolation level is implemented using snapshot isolation. In general, however, this isolation level is too weak, as it permits phantom reads, which are the problem here.
And that's it! We now have a basic event-sourced system. Whenever a new command is received, we can look up the current state of any stream (entity/aggregate) by reading its events and rebuilding its state. Then, we can decide which new events to persist, that is, what modifications should happen to the state in response to the command.
We can already start reaping some benefits from event sourcing: we have a detailed history of changes for each stream. When a stream corresponds to e.g. a user, this translates to an audit trail of operations on that user's entity. Moreover, we are flexible in how we reconstruct the state based on the events; by changing the function f, without any data migrations, we can easily aggregate the events into the State in a different way, accommodating new requirements that our system needs to meet. However, there are downsides as well: we can't query our data as we used to in a typical CRUD setup. We'll try to fix this later.
Single-stream concurrency
What happens if multiple clients try to insert a new event for a given stream? All but one of the inserts will fail, due to the unique constraint on the (stream_id, version) pair. By having this constraint, we ensure that for a given stream, events will be inserted sequentially; even more than that: successfully committed transactions are guaranteed to have observed the effects of the previous transactions for that stream. We need to make sure, however, that when inserting, we always increment the version number by 1 and not more.
If two commands for a stream are processed concurrently, read the same events, hence reconstruct the same state, and based on this state create new events — one of the transactions will fail, as there will be a conflict on the version column. Such a transaction can be retried, this time taking into account the newly inserted event.
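A retry might be implemented as a simple loop around the command-processing transaction — a sketch, assuming a bounded number of attempts; in PostgreSQL, a unique constraint violation is reported with SQLSTATE 23505:
attempt up to N times:
    BEGIN;
    -- read the events, reconstruct the state, run the business logic (as before)
    INSERT INTO events(stream_id, version, data)
    VALUES (:stream_id, :current+1, :data);
    COMMIT;
    -- on success: stop retrying
    -- on a unique violation (SQLSTATE 23505) on (stream_id, version):
    --   ROLLBACK, re-read the events and retry with the fresh state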
A typical flow for processing a request from a user might be:
- receive a command from the user. This should include (or somehow determine) the stream id to which the command corresponds
- start a database transaction
- read all events for the given stream; this can also be done in a separate transaction, before the one that processes the command
- using the f function, reconstruct the current state for the stream
- process the command by running the command-specific business logic, using the current state. This should yield new events to persist, as well as a result to send back to the user
- persist the new events
- commit the transaction
- send the result back to the user
Here’s one of the places where we leverage the fact that we have a transactional database at our disposal, which gives us strong consistency guarantees.
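Putting the steps together, processing a single command might look roughly like this (pseudocode; fold, f and the business logic are application-specific):
BEGIN;
-- read all events for the stream the command refers to
events := SELECT id, stream_id, version, data FROM events
          WHERE stream_id = :stream_id ORDER BY version ASC;
-- reconstruct the current state and remember the current version
state   := fold(f, empty_state, events)
current := maximum version among the events read (0 if there are none)
-- run the business logic: (new_events, result) := logic(state, command)
-- persist each new event, incrementing the version
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, :event_data);
COMMIT;
-- send :result back to the user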
As a side note, we are using the version column to guarantee per-stream sequential processing. In the actor model that we've mentioned in the introduction, the same is guaranteed by the actor model itself, and the fact that an actor processes messages from its mailbox one by one. In that implementation, a single stream is handled by a single actor.
Consistent single-stream-type projections
Not being able to query the data using SQL, as we are used to, might make processing commands trickier. Implementing a simple requirement that a username needs to be unique becomes a challenge! Let’s try to recover some of the querying/constraint functionalities that relational databases offer.
That’s where we can leverage projections, also known as read models. A projection/read model contains data derived from the event stream. As the event stream is the primary source of truth and contains full information, a projection is a denormalized view of that data. Projections are created to make querying, fast lookup, search, etc. possible.
Projections can be created synchronously, in which case they are updated each time a new event for a given stream arrives. Or they can be created asynchronously: in this case, there's a separate background process that keeps them up-to-date.
Implementing such a background update process might get quite complex and brings additional challenges connected to eventual consistency. We'll deal with asynchronous event processing later; for now, however, what might be much simpler is keeping synchronous, or consistent, projections. That is, each time a new event is inserted, we update the projection in the same transaction.
As an example, if one of the stream types in our system corresponds to a user, we might have the following structure in our event sourcing database:
CREATE TABLE events (...) -- as before
CREATE TABLE usernames (
id SERIAL PRIMARY KEY,
username TEXT UNIQUE
);
Then, when processing an incoming command for a user, we should:
- persist new events
- update the usernames table with the new state, if the username changed or if the user is new
Again, this needs to be done in a single transaction for consistency! Otherwise, we can get a partially updated usernames table and our data will quickly become a mess.
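As a sketch, assuming the projection row is keyed by the stream id (a small variation on the usernames table defined above), processing a username change might look like this:
BEGIN;
-- persist the new event
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, :data);
-- update the projection in the same transaction
INSERT INTO usernames(id, username) VALUES (:stream_id, :username)
ON CONFLICT (id) DO UPDATE SET username = EXCLUDED.username;
COMMIT;
If the new username is already taken, the UNIQUE constraint on username makes the whole transaction fail — which is exactly how the uniqueness requirement gets enforced.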
We immediately gain two things: the ability to query for user data included in the projection, just as in a good old CRUD system; and the ability to leverage the projection table to implement uniqueness constraints or other, more complex validation rules that take into account data from multiple streams (e.g. multiple users), not just a single user stream.
It's also possible for the projection state to be persisted in multiple tables if we are dealing with dictionary values, parent-child relations, etc. A projection of a top-level aggregate might translate to multiple rows in multiple tables. However, we have to be cautious if we want to introduce relations to other aggregates, reconstructed from streams of other types. We'll talk more about this later.
For simplicity, in the examples, we're mostly focusing on dealing with events and event streams of a single application-level type. In practice, the events table would typically contain data for many different stream types: users, products, etc.; each type would have dedicated projections.
Consistent snapshots
Snapshots are a special type of a projection, which store the entire reconstructed state of each stream (of a given type). Rebuilding the stream's state each time a command for that stream arrives is flexible and simple conceptually but might eventually become a performance bottleneck. A snapshot can also be used to implement constraints, or to enhance querying possibilities.
Before introducing snapshots for performance reasons, always verify if reconstructing the state from events is indeed the cause of the problems. Factors to take into account include how many events per stream there are, how many streams there are in the system, as well as whether the events occur “close” to one another — which might make querying more efficient — or not.
Instead of a snapshot, it might be easier to maintain a consistent projection, containing partial state data, especially as the events evolve. This is also the topic of the next section on migrations.
As with consistent projections, if we want to maintain a consistent snapshot, we have to update it in the same transaction as inserting the event. One thing that becomes easier is reading the current state: we simply read it from a dedicated table. However, the current version number (that is, the maximum version value of the events for the stream) needs to be stored with the stream's state, so that we can insert new events properly. Here's how this might look in pseudocode:
-- lookup current snapshot:
(user, current_version) := SELECT id, ..., version FROM users
WHERE id = :user_id
-- process incoming command, create events
-- persist events:
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current_version+1, :data);
-- update the snapshot / projection:
UPDATE users SET ..., version = :current_version+1 WHERE id = :user_id
As with projections, snapshots can be created asynchronously. Then, when reconstructing the state of a stream, we only need to read the snapshot (which must include the stream id and the version number, up to which events are applied), and apply the missing events. This removes the problems with eventual consistency (as we can easily patch the snapshot so that it’s fully consistent); still, other difficulties arising from asynchronous event listeners apply (see below).
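Reading the state then amounts to fetching the (possibly stale) snapshot and applying only the events it doesn't yet include — a sketch in pseudocode:
-- read the snapshot together with the version it reflects
(state, snapshot_version) := SELECT ..., version FROM users WHERE id = :stream_id;
-- apply the missing events only
missing := SELECT id, stream_id, version, data FROM events
           WHERE stream_id = :stream_id AND version > :snapshot_version
           ORDER BY version ASC;
for event in missing:
    state := f(state, event)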
As mentioned in the first section, if you use the serializable transaction isolation level, or if your database implements snapshot isolation, you can omit storing the current version number in the snapshot, and instead calculate it before inserting an event using SELECT MAX(version) FROM events WHERE stream_id = :stream_id.
Migrating projections
With the introduction of projections and snapshots, we have lost some flexibility when it comes to reconstructing the state. As the partial or full state is now persisted, migrating to a new state structure is not that easy, as it involves the storage, not only the runtime.
The easy way out is if we can "stop the world", that is, make our system unavailable for some time. When we need to change the structure of the state (for example, a new requirement might be that the history of email addresses associated with a user needs to be quickly available and stored in the snapshot), we can then simply drop the "old" users table and create a new one.
Once this is done, we should run a process that iterates over all events, for all streams (but still maintaining the per-stream order imposed by version!). This process can reuse the same logic that is called after a new event is persisted to update the projection/snapshot state in the database; as an optimization, we can accumulate the complete state in-memory, and persist it once for each stream.
Note that to stream all events so that the per-stream order defined by version is maintained, it's enough to sort by the id column, which is a PostgreSQL sequence (SERIAL column). Even though this sequence can contain gaps (which is discussed in subsequent sections), for a single stream later events will have larger ids, thanks to the serialization provided by the version column.
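A sketch of such a rebuild process (pseudocode; the in-memory accumulation is the optimization mentioned above, not a requirement):
states := empty map  -- stream_id -> (state, version)
-- stream all events; sorting by id preserves the per-stream version order
for event in (SELECT id, stream_id, version, data FROM events ORDER BY id ASC):
    (state, _) := states[event.stream_id]  -- empty state if absent
    states[event.stream_id] := (f(state, event), event.version)
-- persist each stream's final state (and version) once
for (stream_id, (state, version)) in states:
    INSERT INTO users(id, ..., version) VALUES (:stream_id, ..., :version);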
Things become more complicated when we need to perform a rolling upgrade. In essence, we need to create a new projection (or updated snapshot) alongside the old one. Then, this new state is gradually filled with data. Once we have both old and new projections up-to-date, we can remove the old one and start using the new one exclusively.
Here’s how the steps needed to migrate a snapshot projection to a new structure might look:
- create the new users2 table next to the old one; the users table is still used to process incoming commands
- as part of event handling, when a new event is inserted, both the old users table and the new users2 table need to be updated; however, users2 only if an entry for a given stream already exists
- if a new stream is created, we store the initial (empty) state both in users and users2
- finally, for each stream (user) in the system, we calculate the new state from all events of the stream and save the result to users2. In each transaction which updates the state for a single stream, we additionally insert a no-op event for that stream with an incremented version number (see the sketch after this list). This serializes the update process with any commands that might produce events for this stream concurrently. By inserting an event with the next version number, we ensure that we won't miss any events during the snapshot reconstruction.
- once this is done, we can remove any logic that updated or used users, and remove the old snapshot state
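Here's a sketch of the per-stream update from the fourth step (pseudocode; the NoOp event type is made up — any event that doesn't affect the state will do):
BEGIN;
-- read the stream's events and compute the state in the new structure
events  := SELECT id, stream_id, version, data FROM events
           WHERE stream_id = :stream_id ORDER BY version ASC;
state   := fold(f2, empty_state, events)  -- f2 builds the new state structure
current := maximum version among the events read
-- store the result in the new snapshot table
INSERT INTO users2(id, ..., version) VALUES (:stream_id, ..., :current+1)
ON CONFLICT (id) DO UPDATE SET ...;
-- claim the next version with a no-op event; if a concurrent command claimed
-- it first, this transaction fails and can simply be retried
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, '{"type": "NoOp"}');
COMMIT;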
Note that any rolling data migration is tricky and events don't bring much more complexity here. You'd need to run a similar procedure in a CRUD system as well.
Adding a projection
Introducing a new projection is in essence a simplified version of the above process. It can be done using the stop-the-world technique, where we don't have to worry about events that might be added concurrently with the migration process; or we can perform a "rolling upgrade", gradually filling in the data for the new projection as the system works and processes incoming requests.
Migrating events
There's also another type of migration you might want to perform — when the events themselves change. The best option is to use a serialization format that supports backwards compatibility. But this covers only simple structural changes, such as adding a new field, for which a default value can be used.
If the event has changed significantly, it's best to introduce a new event type (e.g. UserCreated2, TaskAssigned3, etc., or even better, using a completely new event name).
This means that the f function which reconstructs state from events will need to handle both the old event type and the new one. Hence, essentially, code to handle the old event types needs to remain forever. You might also attempt updating the events, but as events should be immutable, they are best left untouched. You can find out more about versioning in an event-sourced system from Greg Young's book.
Using triggers
So far, we have silently assumed that the snapshot is being created by application code. However, it is also possible to use triggers to update the read model in the same transaction — whenever a new event is inserted. While this can save us a DB round trip, such an approach has time and again proved to be harder to maintain; separating application code from the database is usually more flexible and easier to evolve. Still, it's not a fixed rule and in some circumstances triggers might work just fine.
Cross-stream-type projections
Previously, we've covered consistent projections, which provide a view of data for a single stream type. However, usually we can come up with a number of read models, which capture interesting aspects of our data — performing aggregations and combining information from multiple streams and stream types. As before, we can choose between consistent read models and ones that are updated asynchronously — then we are dealing with eventual consistency.
Consistent cross-stream-type projections
Alongside introducing event sourcing to your domain, try to shift your design to a more asynchronous model. While single-stream-type consistent projections are fairly easy to implement, when multiple stream types are involved, things get complex. Chances are high that you don't need this complexity, and an eventually consistent, asynchronous model will work better and be more flexible. Hence, if you're in need of a consistent, cross-stream-type projection, maybe your aggregate boundaries are incorrect? Can you model the same process using eventually consistent projections? Consistency is great but expensive.
Why are cross-stream-type consistent projections tricky? Can't we just transactionally update any other table, just as we updated usernames? Yes, we can, and this will work, but problems might surface when we replay the stream of events and reconstruct the projections. It might turn out that the projections contain different data after replay than before!
How come? For individual streams, the version numbers define a strict order of events, without gaps. For all events, viewed as a whole, we don't have this luxury. Using our current model, we don't have a way to tell which event came first. Hence, if a projection combines data from multiple streams or stream types, and if the order in which events are applied to the projection matters (that is, the projection function isn't commutative), we can get different results if the order in which events were originally processed is different from the order in which they are streamed.
For projections/read models which are not snapshots, we are still dealing with functions p: (ProjectionState, Event) => ProjectionState, and it's the ProjectionState that we want to consistently store. However, here the Event might come from multiple event streams and stream types. As noted above, such projections are easy to maintain if p is commutative, that is if for any events p(p(s, e1), e2) = p(p(s, e2), e1).
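To illustrate, here are two tiny projection functions in the same notation (the event and state shapes are made up for the example) — the first is commutative, the second isn't:
-- commutative: counting assigned tasks; the order of events doesn't matter
p(count, TaskAssigned)        = count + 1
-- non-commutative: keeping the latest value; the order of events does matter
p(state, UserEmailChanged(e)) = state with email = e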
As a first pass, we could try implementing a global, gapless sequence which would impose a total ordering on all events, e.g. as described here. But that's equivalent to taking a global table lock on events at the beginning of the transaction and effectively limits concurrency to 1 (i.e., no concurrency). We'll discard this solution then.
Instead, we'll aim at sequencing the events in a "good enough" way to capture the ordering that's important. This can be done by adding a sequence number column to the events table. Luckily, in our original design, we already have the id column, which uses PostgreSQL's built-in sequencing, so we can simply use that value:
CREATE TABLE events
(
id SERIAL PRIMARY KEY,
stream_id BIGINT NOT NULL,
version BIGINT NOT NULL,
data JSONB NOT NULL,
UNIQUE (stream_id, version)
);
Why not use a timestamp to do the ordering? Timestamps and wall-clock time are usually unreliable and might differ from system to system, even if only by a couple of milliseconds. System clocks can also go backward (e.g. when adjusted by NTP), so they are useful only as a rough approximation of order, but not as the primary source of ordering.
There are two things to keep in mind about the id numbering, though:
- there might be gaps due to rolled back transactions; new sequence numbers are acquired as data is inserted within a transaction but aren't put back to the pool if the transaction fails, for performance reasons
- transactions might be committed out-of-order, i.e. a slower transaction might insert an event with a lower sequence number; another transaction might insert another event with the next sequence number and commit; and only later the first transaction might commit
Because of this, reconstructing consistent projections using the order defined by sequence numbers can still yield different results than what you'd see on a "live" system, or even worse, miss some events altogether.
It might seem that using the serializable transaction isolation level would help here. Unfortunately, this only specifies that the transactions are committed according to some serial ordering; not that it maintains the same ordering as the sequence. In other words, the commit order might still be different from the sequence ordering.
Is there any hope left? Unfortunately, we'll have to resort to locking. PostgreSQL includes a feature called advisory locks: application-controlled locks which leverage the fact that the database is a central component of the system and can hence be used for synchronization.
Once we receive a command, but before any events are persisted (and their ids generated), we should obtain advisory locks for any projections that we'll update. The exact scope of the locks depends on the specific use case: we need to ensure that other transactions that will want to update the same region of the projection (same or overlapping rows) will have to acquire the same advisory locks (and in the same order, to avoid deadlocks). We can of course acquire a coarse-grained lock on the whole table, but this will limit concurrency. Maybe there is some stream id, which is the primary identifier for the projection, and we can use that? For example, to lock a projection keyed by user id, we can issue the following:
SELECT pg_advisory_xact_lock(:user_id)
With such locking in place, we can be sure that the order defined by the sequence numbers is the same order in which the events were originally processed. We can then use this order to replay events and reconstruct projections (using similar strategies as discussed with snapshots).
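In the command-processing transaction this might look as follows (a sketch; some_projection is a made-up table, and using the raw user id as the lock key assumes it won't collide with other advisory lock usages in the system):
BEGIN;
-- take the advisory lock first, before any event ids are generated
SELECT pg_advisory_xact_lock(:user_id);
-- read events, reconstruct state, run the business logic
INSERT INTO events(stream_id, version, data)
VALUES (:stream_id, :current+1, :data);
-- update the cross-stream-type projection rows covered by the lock
UPDATE some_projection SET ... WHERE user_id = :user_id;
COMMIT;  -- the transaction-scoped advisory lock is released automatically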
In other words, through the version column for events within an individual stream, and through the id column and advisory locks for related events from multiple streams (where "related" is induced by the fact that events are used in the same consistent projection/read model), we guarantee that:
The single-stream version ordering and the related-stream sequence number (id) ordering correspond to the commit order.
Note that the above construction is needed only when your projection function is non-commutative; if it isn't a problem that reconstructing the projection might yield different results, you can skip it as well.
Eventually consistent projections
You might have your hopes up that after all these problems with consistent projections, eventually consistent ones will be easier to create and maintain!
Unfortunately, the event ordering problems remain. Unless we stop the system and ensure that all transactions have committed, we can't really tell whether events with sequence numbers smaller than the biggest one seen so far are still waiting to be committed. We can use heuristics — delaying processing of the event stream — but that's still not a guarantee.
Because of this, we might still want to use advisory locks, to ensure that sequence numbers correspond to commit order for any events that are somehow related to each other. We don't really need a total order of events — just for events that are "related" and might be used together in projections. What this really means is vague and use case specific, and due to this, the construction might be brittle and a source of bugs. Care must be taken when defining such projections.
That's a tradeoff that we have to live with — we are using a general-purpose database and this imposes some restrictions. The situation is different if we use storage that has streaming built-in, such as EventStore, or even Kafka. There, the stream/log is a central component and events are appended to it one by one, clearly defining a sequence. However, these systems don't give us the transactional and relational benefits, which we can use here. Tradeoffs, tradeoffs!
Kafka doesn't define a single totally-ordered stream of events/messages, but guarantees order only within a partition. That way, the storage system can scale and accommodate vast amounts of data. However, the number of partitions is typically fixed and much smaller than the number of streams in an event-sourced system (for example, you might have 100 partitions, but 1000000 users). When creating projections, this also needs to be taken into account.
Eventually consistent read models/projections are in fact a special case of event listeners, which are discussed next. Let's proceed then!
Event listeners
Almost every system needs to interact with the outside world. So far, we've only worked with a database. What if, as part of processing an event, we need to make an API call, send an email, or create an (eventually consistent) projection?
We have two choices:
- run the side effect within the transaction. If the side effect fails (e.g. an HTTP API call throws a connection exception), the transaction will fail as well, and the data sent as part of the side effect might be invalid if it included anything that was supposed to be persisted in the transaction. On the other hand, running the side effect within the transaction means we might obtain information necessary to complete the DB transaction and create the events.
- run the side effect after the transaction. Here we might settle for a simple after-transaction-completion callback; however, this isn't guaranteed to run if the application crashes in between. If a guarantee is required, we need another mechanism.
Let's focus on the second scenario, as it's where more work is needed. In order to guarantee that an asynchronous event listener is run for an event (unless, of course, the database catastrophically crashes and loses data), we need to put the event on some sort of queue. This needs to be part of the original transaction!
We have again two choices: run a distributed transaction, which spans our PostgreSQL database and some other queueing system; or create a queue inside Postgres, so that enqueueing amounts to inserting data into a table. While the performance of such a solution is no match for dedicated queueing systems — we are working with a general-purpose database, not a specialized streaming or queueing system — it might be just good enough.
We could also enqueue events non-transactionally in an external system before the transaction commits, thus ensuring that if the commit is successful, the data will already be enqueued. However, this has its own subtleties: an event might be dequeued before the transaction commits; then, we have to postpone processing. The queue might also contain an event for which the transaction failed — hence, after a timeout, such an event must be discarded. Finally, reasoning about ordering is even harder in such a scenario.
Using a PostgreSQL queue
Here's a very basic structure of a to-be-processed event queue:
CREATE TABLE event_queue
(
id SERIAL PRIMARY KEY,
event_id INT NOT NULL REFERENCES events(id)
);
After an event is persisted, we should insert the event id into that queue — in the same transaction. As the sequence-number ordering follows commit order for same-stream events or related events (synchronised using advisory locks), the same holds for entries in this queue: order will be maintained for same-stream or related events. Other events, however, might be stored out-of-order when we sort by event_queue.id.
The event processor then needs to run (again using pseudo-code):
(id, event_id) := SELECT * FROM event_queue ORDER BY id ASC LIMIT 1
-- process the event
DELETE FROM event_queue WHERE id = :id
To avoid active polling of the event_queue table, we can use the listen/notify mechanism of PostgreSQL. Or we might just poll every 1 second, for example, if such delays are acceptable. Moreover, we can quite easily implement batching, that is getting a batch of event ids to process in one go.
To maintain order, the above process should run on a single thread in the system. If keeping order of event processing isn't important, this might be run in multiple copies concurrently.
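For the concurrent case, one option (a sketch, and by no means the only approach) is to combine batching with PostgreSQL's FOR UPDATE SKIP LOCKED, so that consumers don't block each other or pick the same entries:
BEGIN;
-- grab a batch of queue entries, skipping ones locked by other consumers
batch := SELECT id, event_id FROM event_queue
         ORDER BY id ASC LIMIT 100
         FOR UPDATE SKIP LOCKED;
-- process the events from the batch
DELETE FROM event_queue WHERE id IN (:batch_ids);
COMMIT;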
Processing an event might be an arbitrary action. Note that if processing fails, the whole transaction will fail as well, and the event will be reprocessed on the next loop. In other words, the above implements at least once processing. When you integrate two distinct systems, you always get either at least once or at most once processing; "exactly-once" is impossible to implement.
The processing might amount to enqueueing the event on yet another event queue, e.g. RabbitMQ or Kafka. What do we gain by this extra level of indirection? First, we know that the event has been processed successfully, and that its data is committed to the database. We might still get duplicates on the target queue (if the application running the event listener fails), but we won't ever get event ids which are not yet committed or for which a transaction failed.
You might have several queues, for event listeners performing different tasks; some might implement strict ordering by retrying the processing of a single event until it is successful; others might delay processing of events that initially failed. As enqueueing is as simple as an insert, we might enqueue a single event to a couple of "queues", or use another queue, as described above, to implement the fan-out.
How do others deal with the same problem? In the case of Akka Persistence, where events can be stored in multiple storage backends such as Cassandra or a relational database, a heuristic is used. Akka Persistence uses the database only as a log for events, without leveraging ACID capabilities. If a gap in the id sequence is found, Akka queries the database for up to 10 seconds, checking if the gap hasn't been filled. If it hasn't been by then, the gap is assumed to be "genuine". As all of the transactions are short-lived and fail only if there's a connection failure (there are no constraints which might fail), this rarely happens.
Introducing a new event listener or projection
What about event listeners that need to be initialized with a stream of past events? How to build new read models which take into account the whole history?
If we can stop the system (making it unavailable) for the time the new event listener is initialized or while the projection is initially built, we can simply query for all events ordered by the event sequence number, id (with the usual ordering guarantees).
However, if we'd like to perform an on-line rollout, to ensure that we don't miss any events, a more complicated procedure is needed. For example:
- add an event listener which consumes the event_queue table but suspends processing by storing all event ids in an in-memory, ordered buffer
- initialize the new event listener/projection by querying for all events ordered by id
- resume the event listener so that it consumes all buffered events and then processes new ones as they come in. This switch can be done atomically, as we are dealing with application code that runs in-memory in a single thread
Note that in the above scenarios some of the events might be processed twice, so if this is a problem, in-memory deduplication will be needed.
Using the WAL
Another way of obtaining an event stream and feeding it into event listeners is using the so-called Change Data Capture (CDC). In PostgreSQL, this amounts to listening to entries that are written to the WAL (write-ahead log). Each committed transaction ends up in this log in commit order.
Working directly with the WAL might be difficult, but there are tools, such as Debezium, which make it more accessible. Note, however, that not every PostgreSQL installation will allow you to set the required wal_level=logical option, so this might not always be available.
The output obtained from Debezium (which itself uses Kafka) can be directed to another queue or persistent log, such as Kafka, Pulsar, Artemis, RabbitMQ or another database, thus creating an event stream, which can be consumed by other parties.
Note that when initializing a new event listener with historical data, whatever the source of the event ids, we will always need to take into account the fact that new events might have been processed while the listener was being initialized. Some systems might make this easier, e.g. by exposing a high-water mark, or log position, which uniquely defines how much of the stream has been consumed. Such a marker is not available in PostgreSQL, hence the need for the in-memory buffering described in the previous section.
Other metadata
In our model, we've been storing only the bare minimum to make things work. In practice, you can associate and persist much more event metadata (an extended table structure is sketched after the list). This can include:
- event creation wall-clock time
- event type (e.g. name of the class to which the event should be deserialized)
- stream type (might correspond to the type of the entity/aggregate, e.g. user/product)
- principal id: currently logged in user who triggered the event
- transaction id: grouping events persisted in a single transaction
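An extended events table accommodating such metadata might look as follows (a sketch — the exact columns and types will vary from system to system):
CREATE TABLE events
(
  id             SERIAL PRIMARY KEY,
  stream_id      BIGINT NOT NULL,
  stream_type    TEXT NOT NULL,
  version        BIGINT NOT NULL,
  event_type     TEXT NOT NULL,
  data           JSONB NOT NULL,
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  principal_id   BIGINT,
  transaction_id BIGINT,
  UNIQUE (stream_id, version)
);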
Is event sourcing really that hard? Event Sourcing vs CRUD
It might seem that while simple conceptually, event sourcing brings a lot of complexity compared to a "traditional" CRUD approach. But! First of all, we don't do all of this for nothing: we obtain a full audit log and we don't lose any information. In a CRUD application, on the other hand, any time you do an UPDATE or DELETE, information dies forever. Second, chances are high you will only need a fraction of what's described above — consistent single-stream-type projections and event listeners are usually enough to meet the requirements.
A lot of the complexity above is inherent: whenever you try to capture an audit log (using any method) and impose an ordering on events from disparate event streams, you will encounter problems. Keeping track of time and causality is a hard topic for any database — and here it's no exception. Finally, rolling upgrades and data migrations are always a challenge, however you model your data.
There is also some accidental complexity coming from the fact that we are using a general-purpose database for storing streams of events. We don't have an "ordered log" abstraction at our disposal, which is at the core of dedicated event sourcing storage systems. However, we do have the ACID guarantees, transactions, transaction isolation levels, SQL queries with joins, and all of the engineering that went towards making PostgreSQL a solid relational database. Hence, after all, the effort we've put in might not be for nothing.
Thanks to Andrzej Ludwikowski and Kaja Polachowska for their feedback on this article.