Introduction
In this article, we will focus on implementing an Event-Sourcing architecture using NestJS and EventStoreDB for a restaurant table reservation application.
This study will be divided into two parts:
- In the first part, we will solely focus on the data writing model. We will explore how to write data into EventStoreDB and how to read it.
- In the second part, we will concentrate on the data reading model. We will learn how to read data from EventStoreDB and how to project it into a read database.
CQRS and Event-Sourcing: Architectural Patterns for Better Separation of Concerns
CQRS (Command and Query Responsibility Segregation)
CQRS is an architectural pattern that separates read operations (queries) from write operations (commands) within an application. Because the two sides no longer share a single model, each can be designed, optimized, and scaled independently. It pairs naturally with an Event-Sourcing architecture.
Event-Sourcing
Event-Sourcing is a natural complement to CQRS. Instead of persisting only the current state, the application stores every significant business event in a dedicated, append-only storage system called an "EventStore." The current state of an entity is then derived by replaying the events recorded for it.
Used together, CQRS and Event-Sourcing give a modular architecture with a clear separation of concerns: the write side validates commands and appends events, while the read side builds query-optimized views from those events. The event log also provides full traceability of changes and a natural place to detect concurrent writes. These properties make the two patterns a common choice for applications where maintainability, flexibility, and scalability matter.
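To make the idea of deriving state from events concrete, here is a minimal, dependency-free sketch. The event shapes and the names evolve and replay are assumptions chosen for illustration; they are not taken from the project's code.

```typescript
// Hypothetical event shapes, for illustration only.
type TableEvent =
  | { type: "table-added"; data: { id: string; seats: number } }
  | { type: "table-removed"; data: { id: string } };

// Current state is never stored directly; it is derived from the events.
interface TableState {
  id: string | null;
  seats: number;
  removed: boolean;
}

const initialState: TableState = { id: null, seats: 0, removed: false };

// Pure reducer: given the previous state and one event, return the next state.
function evolve(state: TableState, event: TableEvent): TableState {
  switch (event.type) {
    case "table-added":
      return { id: event.data.id, seats: event.data.seats, removed: false };
    case "table-removed":
      return { ...state, removed: true };
  }
}

// Replaying the full history reconstructs the current state.
function replay(events: TableEvent[]): TableState {
  return events.reduce(evolve, initialState);
}

const history: TableEvent[] = [
  { type: "table-added", data: { id: "1", seats: 4 } },
  { type: "table-removed", data: { id: "1" } },
];

const current = replay(history);
```

Because evolve is a pure function, the same history always yields the same state, which is what makes the event log usable as the source of truth.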
Project Overview
In our example project, we will create an API that manages table reservations in a restaurant. The restaurant manager will be able to add or remove tables, and customers will be able to make reservations based on a list of defined time slots.
Project Structure
The project is composed of two services: a booker service for managing table reservations and a table-manager service for managing restaurant tables.
The two services communicate with each other through events, which are stored in an EventStore.
Two libraries, placed in a shared folder, handle events and communication between the services. They are shared within a pnpm workspace.
Here is the project structure:
services
├── booker
├── table-manager
shared
├── event-sourcing
├── events
Event-Sourcing Module
To streamline communication between the services, we will create an EventSourcing module.
This module exports an EventStoreService that provides the following functionality:
- Publishing an event
- Reading events from a stream
- Subscribing to a stream
This service relies primarily on the EventStoreDB Node.js client, which is injected into it.
@Injectable()
export class EventStoreDbService implements EventStoreService {
constructor(
@Inject(EVENT_STORE_DB_CLIENT)
private readonly client: EventStoreDBClient,
) {}
}
Github: Full code - EventStoreDbService
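For unit tests, or simply to see the contract in isolation, the three capabilities listed above can be sketched with an in-memory stand-in. This is an illustrative assumption, not part of the project: the class name InMemoryEventStore and the simplified StoredEvent shape are hypothetical, and plain callbacks replace the RxJS Observable so the example has no dependencies.

```typescript
// Hypothetical, simplified event shape (the article's Event class carries more).
interface StoredEvent {
  streamName: string;
  type: string;
  data: unknown;
}

type Listener = (event: StoredEvent) => void;

// In-memory stand-in offering the same three capabilities:
// publish an event, read a stream, subscribe to a stream.
class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();
  private readonly listeners = new Map<string, Set<Listener>>();

  publish(event: StoredEvent): void {
    const stream = this.streams.get(event.streamName) ?? [];
    stream.push(event);
    this.streams.set(event.streamName, stream);
    // Notify live subscribers once the event is stored.
    this.listeners.get(event.streamName)?.forEach((listener) => listener(event));
  }

  readStream(streamName: string, maxCount?: number): StoredEvent[] {
    const events = this.streams.get(streamName) ?? [];
    return maxCount === undefined ? [...events] : events.slice(0, maxCount);
  }

  // Returns a function that cancels the subscription.
  subscribe(streamName: string, listener: Listener): () => void {
    const set = this.listeners.get(streamName) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(streamName, set);
    return () => set.delete(listener);
  }
}

const store = new InMemoryEventStore();
const received: string[] = [];
const unsubscribe = store.subscribe("table-1", (e) => received.push(e.type));

store.publish({ streamName: "table-1", type: "table-added", data: { id: "1", seats: 4 } });
unsubscribe();
store.publish({ streamName: "table-1", type: "table-removed", data: { id: "1" } });
```

After the second publish, the stream holds both events while the cancelled subscriber has only seen the first one.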
Publishing an Event
To publish an event, we use the appendToStream method of the EventStoreDB client.
async publish<T, X>(event: Event<T, X>): Promise<void> {
const jsonEvent = this.mapEventToJsonEvent(event);
await this.client.appendToStream(event.streamName, jsonEvent);
}
Github: Full code - EventStoreDbService
The mapEventToJsonEvent method converts an event into a JSON object that will be stored in EventStoreDB.
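The mapping itself is not shown here, so the following is only a sketch of what such a conversion could look like. The DomainEvent and JsonEventLike shapes are assumptions, as is the choice to carry the schema version in the metadata; the real method presumably builds on the jsonEvent helper shipped with the EventStoreDB Node.js client rather than on a plain object.

```typescript
// Hypothetical shape of the article's Event class, reduced to what the mapping needs.
interface DomainEvent<Data, Type extends string> {
  streamName: string;
  type: Type;
  version: number;
  data: Data;
  metadata?: Record<string, unknown>;
}

// Plain object in the spirit of what the store persists:
// an event type, a JSON payload, and metadata.
interface JsonEventLike {
  type: string;
  data: unknown;
  metadata: Record<string, unknown>;
}

function mapEventToJsonEvent<Data, Type extends string>(
  event: DomainEvent<Data, Type>,
): JsonEventLike {
  return {
    type: event.type,
    // A JSON round trip drops functions and undefined fields,
    // leaving only a serializable payload.
    data: JSON.parse(JSON.stringify(event.data)),
    // Assumption: the schema version travels in the metadata.
    metadata: { ...event.metadata, version: event.version },
  };
}

const json = mapEventToJsonEvent({
  streamName: "table-1",
  type: "table-added",
  version: 1,
  data: { id: "1", seats: 4 },
  metadata: { $correlationId: "abc" },
});
```

Note that the stream name is not part of the stored event: it is passed separately to appendToStream, as in the publish method above.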
Reading Events from a Stream
To read events from a stream, we use the readStream method of the EventStoreDB client.
Our wrapper converts the readable stream returned by the client into an Observable that emits the events of the stream and completes when the end of the stream is reached.
readStream(
streamName: string,
options?: {
maxCount?: number;
},
): Observable<EventStoreEvent> {
const stream = this.client.readStream(streamName, options);
const observable = new Observable<EventStoreEvent>((subscriber) => {
stream.on('data', (event) => {
subscriber.next(
new EventStoreEvent({
data: event.event?.data,
metadata: event.event?.metadata,
type: event.event?.type,
}),
);
});
stream.on('error', (error) => {
subscriber.error(error);
});
stream.on('end', () => {
subscriber.complete();
});
return () => {
stream.cancel();
};
});
return observable;
}
Github: Full code - EventStoreDbService
Subscribing to a Stream
To subscribe to a stream, we check if the subscription already exists, and we create it if it doesn't. Then, we return an Observable that allows reading events from the stream as they arrive.
async initPersistentSubscriptionToStream(
streamName: string,
groupName: string,
): Promise<Observable<AcknowledgeableEventStoreEvent>> {
const currentSubscriptions =
await this.client.listAllPersistentSubscriptions();
const existingSubscription = currentSubscriptions.find(
(sub) => sub.groupName === groupName && sub.eventSource === streamName,
);
if (!existingSubscription) {
await this.client.createPersistentSubscriptionToStream(
streamName,
groupName,
{
checkPointAfter: 2000,
checkPointLowerBound: 10,
checkPointUpperBound: 1000,
consumerStrategyName: 'RoundRobin',
extraStatistics: true,
historyBufferSize: 500,
liveBufferSize: 500,
maxRetryCount: 10,
maxSubscriberCount: 'unbounded',
messageTimeout: 30000,
readBatchSize: 20,
resolveLinkTos: true,
startFrom: 'start',
},
);
}
const persistentSubscription =
this.client.subscribeToPersistentSubscriptionToStream(
streamName,
groupName,
);
const observable = new Observable<AcknowledgeableEventStoreEvent>(
(subscriber) => {
persistentSubscription.on('data', (event) => {
subscriber.next(
new AcknowledgeableEventStoreEvent(
{
type: event.event?.type,
data: event.event?.data,
metadata: event.event?.metadata,
retryCount: event.retryCount,
revision: Number(event.event?.revision),
createdAt: event.event?.created,
},
{
ack: () => persistentSubscription.ack(event),
nack: (action, message) =>
persistentSubscription.nack(action, message, event),
},
),
);
});
persistentSubscription.on('error', (error) => {
subscriber.error(error);
});
persistentSubscription.on('end', () => {
subscriber.complete();
});
return () => {
persistentSubscription.unsubscribe();
};
},
);
return observable;
}
Github: Full code - EventStoreDbService
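Consumers of this Observable have to settle each event with ack or nack. The sketch below shows one possible policy, under stated assumptions: the Acknowledgeable interface is a simplified, synchronous stand-in for AcknowledgeableEventStoreEvent, and the helper names chooseNackAction and settle are hypothetical. The four action names are the ones accepted by the EventStoreDB Node.js client for persistent subscriptions.

```typescript
// Nack actions accepted by EventStoreDB persistent subscriptions.
type NackAction = "park" | "retry" | "skip" | "stop";

// Hypothetical policy: retry transient failures a few times, then park the
// event so that it can be inspected and replayed later.
function chooseNackAction(retryCount: number, maxRetries: number): NackAction {
  return retryCount < maxRetries ? "retry" : "park";
}

// Minimal synchronous stand-in for the ack/nack pair exposed by
// AcknowledgeableEventStoreEvent (the real calls are asynchronous).
interface Acknowledgeable {
  retryCount: number;
  ack(): void;
  nack(action: NackAction, message: string): void;
}

// Runs a handler and settles the event exactly once.
function settle(event: Acknowledgeable, handler: () => void, maxRetries = 3): void {
  try {
    handler();
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    event.nack(chooseNackAction(event.retryCount, maxRetries), message);
    return;
  }
  event.ack();
}
```

Keeping the ack call outside the try block ensures that a failure while acknowledging is not mistaken for a handler failure and answered with a nack.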
Feature Implementation
Adding / Removing Tables in the Restaurant
Each table has a unique identifier and a number of seats. The identifier is used to build the name of the stream in which the events related to this table are stored.
For example, if the table's identifier is 1, the stream's name will be table-1.
The stream contains the events recording that the table was added or removed.
Adding a Table
When adding a table, we publish a TableAddedEvent into the table's stream.
export class TableAddedEvent extends TableBaseEvent<
TableAddedEventData,
"table-added"
> {
constructor(data: { id: string; seats: number }) {
super({
data,
type: "table-added",
version: 1,
});
}
}
Github: Full code - TableAddedEvent
The abstract class TableBaseEvent defines, among other things, the name of the stream in which the event will be stored.
export abstract class TableBaseEvent<Data extends { id: string }, Type> extends Event<
Data,
Type
> {
static STREAM_PREFIX = "table";
constructor({
data,
type,
version,
metadata,
}: {
data: Data;
type: Type;
version: number;
metadata?: EventMetadata;
}) {
super({
data,
type,
version,
streamName: TableBaseEvent.buildStreamName(data.id),
metadata,
});
}
static buildStreamName(id: string): string {
return `${TableBaseEvent.STREAM_PREFIX}-${id}`;
}
}
Github: Full code - Table base event
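Our controller instantiates an AddTableCommand and executes it on the command bus:
@Post('tables')
async addTable(@Body() body: AddTableRequest): Promise<AddTableResponse> {
  const command = new AddTableCommand(body.id, body.seats);
  const table = await this.commandBus.execute<AddTableCommand, Table>(command);
  // Assumption: AddTableResponse exposes the table's id and seats.
  return { id: table.id, seats: table.seats };
}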
Github: Full code - Add table controller
This command is processed by an AddTableHandler that will:
- Instantiate a Table aggregate with the table's identifier
- Call the add method of the aggregate to add the table
- Publish the events generated by the aggregate to our EventStore.
async execute(command: AddTableCommand): Promise<Table> {
const table = new Table(command.id);
table.add(command.seats);
await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
return table;
}
Github: Full code - Add table handler
When the add method is called, the aggregate generates a TableAddedEvent and adds it to its list of uncommitted events. The apply method then applies the event to the aggregate by invoking the onTableAddedEvent method.
export class Table extends AggregateRoot<TableEvent> {
public seats: number;
constructor(public id: string) {
if (id.length === 0) {
throw new InvalidTableIdException(id);
}
super();
}
add(seats: number) {
this.apply(
new TableAddedEvent({
id: this.id,
seats,
}),
);
}
private onTableAddedEvent(event: TableAddedEvent) {
this.id = event.data.id;
this.seats = event.data.seats;
}
protected getEventHandler<T extends TableEvent>(
event: T,
): Type<IEventHandler<T>> {
switch (event.type) {
case 'table-added':
return this.onTableAddedEvent.bind(this);
default:
throw new Error(`No handler defined for event type ${event.type}`);
}
}
}
Github: Full code - Table aggregate
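The bookkeeping behind apply and getUncommittedEvents can be illustrated with a stripped-down base class. This is a sketch of the general idea, not the @nestjs/cqrs implementation: the names SketchAggregateRoot, SketchTable, and when are hypothetical, and commit only clears the list instead of publishing anything.

```typescript
interface DomainEvent {
  type: string;
}

// Simplified illustration of what an event-sourced aggregate base class does.
abstract class SketchAggregateRoot<E extends DomainEvent> {
  private uncommitted: E[] = [];

  // Subclasses mutate their own state here.
  protected abstract when(event: E): void;

  // New events are applied and remembered; replayed events are only applied.
  apply(event: E, isFromHistory = false): void {
    this.when(event);
    if (!isFromHistory) {
      this.uncommitted.push(event);
    }
  }

  getUncommittedEvents(): E[] {
    return [...this.uncommitted];
  }

  // Called once the events have been appended to the store.
  commit(): void {
    this.uncommitted = [];
  }
}

type SeatEvent = { type: "table-added"; seats: number };

class SketchTable extends SketchAggregateRoot<SeatEvent> {
  seats = 0;

  protected when(event: SeatEvent): void {
    this.seats = event.seats;
  }
}

// Replayed from history: state changes, nothing new to publish.
const replayed = new SketchTable();
replayed.apply({ type: "table-added", seats: 4 }, true);

// New change: state changes and the event awaits publication.
const fresh = new SketchTable();
fresh.apply({ type: "table-added", seats: 2 });
```

The isFromHistory flag is what lets the same apply method serve both purposes: recording a new decision and rebuilding state from past ones.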
To verify that a table with the same identifier does not already exist, you can read the events from the table's stream and apply them to the aggregate to reconstruct its state. If the aggregate already exists, you can raise an exception.
To apply these events, you can add a static method called fromEventsHistory to your aggregate.
static fromEventsHistory(events: TableEvent[]) {
const table = new Table(events[0].data.id);
for (const event of events) {
table.apply(event, true);
}
return table;
}
Github: Full code - Table aggregate
This method applies the events one by one, thus rebuilding the aggregate's state. It can be called from a repository responsible for retrieving events from a stream using the EventSourcing module.
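A repository along these lines might look like the following sketch. Everything in it is an assumption made for illustration: SketchTable and SketchTableRepository are hypothetical names, the streams live in a Map instead of EventStoreDB, and the lookup is synchronous whereas the real one would await the events read from the store.

```typescript
// Hypothetical, dependency-free stand-ins for the article's types.
type TableEvent =
  | { type: "table-added"; data: { id: string; seats: number } }
  | { type: "table-removed"; data: { id: string } };

class SketchTable {
  seats = 0;
  removed = false;

  constructor(public readonly id: string) {}

  static fromEventsHistory(events: TableEvent[]): SketchTable {
    const table = new SketchTable(events[0].data.id);
    for (const event of events) {
      if (event.type === "table-added") table.seats = event.data.seats;
      if (event.type === "table-removed") table.removed = true;
    }
    return table;
  }
}

// Repository sketch: an empty stream means the table was never added.
class SketchTableRepository {
  constructor(private readonly streams: Map<string, TableEvent[]>) {}

  getById(id: string): SketchTable | null {
    const events = this.streams.get(`table-${id}`) ?? [];
    if (events.length === 0) {
      return null; // also guards the events[0] access in fromEventsHistory
    }
    return SketchTable.fromEventsHistory(events);
  }
}

const streams = new Map<string, TableEvent[]>();
streams.set("table-1", [{ type: "table-added", data: { id: "1", seats: 4 } }]);

const repository = new SketchTableRepository(streams);
const found = repository.getById("1");
const missing = repository.getById("2");
```

With the real client, reading a stream that does not exist is, as far as I know, reported as an error rather than as an empty list, so an actual repository would need to translate that case into "not found" as well.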
In this way, we can add a check to our AddTableHandler:
async execute(command: AddTableCommand): Promise<Table> {
  // Rebuild the aggregate from its stream to detect an existing table.
  const existingTable = await this.tableEventStoreRepository.getById(command.id);
  if (existingTable) {
    throw new TableAlreadyExistsException(command.id);
  }
  const table = new Table(command.id);
  table.add(command.seats);
  await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
  return table;
}
Github: Full code - Add table handler
Removing a Table
When removing a table, we publish a TableRemovedEvent into the table's stream.
export class TableRemovedEvent extends TableBaseEvent<
TableRemovedEventData,
"table-removed"
> {
constructor(data: { id: string }) {
super({
data,
type: "table-removed",
version: 1,
});
}
}
Github: Full code - Table removed event
Removing a table is handled much like adding one, except that the aggregate generates a TableRemovedEvent when its remove method is called.
Here is the RemoveTableHandler:
async execute(command: RemoveTableCommand): Promise<void> {
const table = await this.tableEventStoreRepository.findTableById(
command.id,
);
if (!table) {
throw new TableNotFoundError(command.id);
}
table.remove();
await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
}
Github: Full code - Remove table handler
Booking a Table
Initiating a Reservation
From a schedule listing the available time slots, the client can choose a slot and initiate a reservation. A command is created and sent to the booker service.
How can we ensure that the table has not been removed in the meantime? How can we ensure that the table will not be removed while it is reserved?
To address these questions, we will use a table locking system. When a table is reserved, it is locked. As long as it is locked, it cannot be removed.
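The locking rule can be expressed directly in the aggregate. The sketch below is a simplified assumption of how that might look: LockableTable, TableLockedError, and the method names are hypothetical, and state is mutated directly instead of going through events to keep the example short.

```typescript
// Hypothetical sketch: names and rules are illustrative, not taken from the repository.
class TableLockedError extends Error {
  constructor(tableId: string) {
    super(`Table ${tableId} is locked and cannot be removed`);
    this.name = "TableLockedError";
  }
}

class LockableTable {
  // One lock per time slot, keyed by the slot label and holding
  // the correlation id of the booking that owns it.
  private readonly locks = new Map<string, string>();
  removed = false;

  constructor(public readonly id: string) {}

  placeLock(timeSlot: string, correlationId: string): void {
    if (this.removed) {
      throw new Error(`Table ${this.id} has been removed`);
    }
    if (this.locks.has(timeSlot)) {
      throw new Error(`Time slot ${timeSlot} is already locked`);
    }
    this.locks.set(timeSlot, correlationId);
  }

  releaseLock(timeSlot: string): void {
    this.locks.delete(timeSlot);
  }

  remove(): void {
    // The invariant from the text: a locked table cannot be removed.
    if (this.locks.size > 0) {
      throw new TableLockedError(this.id);
    }
    this.removed = true;
  }
}
```

Because the check lives inside the aggregate, both questions above are answered in one place: a removed table refuses new locks, and a locked table refuses removal.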
Locking a Table and Confirming the Reservation
To handle these concurrency issues, we can use the choreography-based saga pattern to place a lock on the table and confirm the reservation.
Choreography-based saga pattern documentation
This way, we can ensure that the table is not removed in the meantime, and that the reservation is confirmed.
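In a choreography-based saga there is no central coordinator: each service reacts to the events it cares about and publishes its own. The following sketch shows that flow with a tiny synchronous in-memory bus standing in for the EventStoreDB subscriptions. The event type strings and payload fields are simplified assumptions, and InMemoryBus is a hypothetical helper.

```typescript
// Illustrative events mirroring the article's names; payloads are simplified assumptions.
type SagaEvent =
  | { type: "table-booking-initiated"; correlationId: string; tableId: string; timeSlot: string }
  | { type: "table-lock-placed"; correlationId: string; tableId: string; timeSlot: string }
  | { type: "table-booking-confirmed"; correlationId: string };

type Handler = (event: SagaEvent) => void;

// Tiny synchronous bus standing in for EventStoreDB subscriptions.
class InMemoryBus {
  readonly log: SagaEvent[] = [];
  private readonly handlers: Handler[] = [];

  subscribe(handler: Handler): void {
    this.handlers.push(handler);
  }

  publish(event: SagaEvent): void {
    this.log.push(event);
    this.handlers.forEach((handler) => handler(event));
  }
}

const bus = new InMemoryBus();

// table-manager side: reacts to an initiated booking by locking the table.
bus.subscribe((event) => {
  if (event.type === "table-booking-initiated") {
    bus.publish({
      type: "table-lock-placed",
      correlationId: event.correlationId,
      tableId: event.tableId,
      timeSlot: event.timeSlot,
    });
  }
});

// booker side: reacts to the lock by confirming the booking.
bus.subscribe((event) => {
  if (event.type === "table-lock-placed") {
    bus.publish({ type: "table-booking-confirmed", correlationId: event.correlationId });
  }
});

bus.publish({
  type: "table-booking-initiated",
  correlationId: "booking-1",
  tableId: "1",
  timeSlot: "19:00-21:00",
});
```

The correlation id is carried from event to event, which is what allows the booker side to know which reservation a given lock belongs to.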
Implementation in the table-manager service
All events of type TableBookingInitiatedEvent are listened to by the TableLockingSaga. When such an event is received, the saga places a lock on the table and publishes a TableLockPlacedEvent.
async onTableBookingInitiated(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);
const eventMetadata = resolvedEvent.metadata as JSONMetadata;
const command = new PlaceTableLockCommand(
eventData.tableId,
eventData.timeSlot,
eventMetadata.$correlationId,
);
try {
await this.commandBus.execute(command);
} catch (error) {
if (error instanceof TableNotFoundError) {
await resolvedEvent.ack();
return;
}
throw error;
}
await resolvedEvent.ack();
}
Github: Full code - Table locking saga
Implementation in the booker service
All events of type TableLockPlacedEvent are listened to by the TableBookingSaga, which confirms the reservation by publishing a TableBookingConfirmedEvent.
async onTableLockPlaced(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventMetadata = resolvedEvent.metadata as JSONMetadata;
const command = new ConfirmTableBookingCommand(
eventMetadata.$correlationId,
);
await this.commandBus.execute(command);
await resolvedEvent.ack();
}
Github: Full code - Table booking saga
Unlocking a Table
When a reservation is canceled or the time slot has passed, the table is unlocked.
Scenario 1: Reservation is Canceled
Scenario 2: Time Slot has Passed
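Both scenarios boil down to deciding which locks to release. As a rough sketch under stated assumptions (the TableLock shape and both function names are hypothetical, and the real services would express these releases as events rather than array filters), the two rules could look like this:

```typescript
// Hypothetical lock record; field names are assumptions for illustration.
interface TableLock {
  correlationId: string;
  slotEnd: Date;
}

// Scenario 1: the booking identified by correlationId was canceled,
// so its lock is released and all others are kept.
function releaseForCanceledBooking(locks: TableLock[], correlationId: string): TableLock[] {
  return locks.filter((lock) => lock.correlationId !== correlationId);
}

// Scenario 2: every lock whose time slot ended at or before `now` is released.
function releaseExpired(locks: TableLock[], now: Date): TableLock[] {
  return locks.filter((lock) => lock.slotEnd.getTime() > now.getTime());
}
```

Both functions return the locks that remain, leaving the input untouched, which keeps them easy to test in isolation.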
In this article, we have seen how to implement an Event-Sourcing architecture with NestJS and EventStoreDB. In the next part, we will learn how to read the events stored in EventStoreDB and project them into a read database.
The complete code for this project is available on GitHub: Event-Sourcing with NestJS and EventStoreDB
Your feedback and comments are welcome. Feel free to open an issue on the GitHub repository if you have any questions or suggestions for improvement.