Event-Sourcing with NestJS and EventStoreDB (Part 1: commands and events)

July 31, 2023

Introduction

In this article, we will focus on implementing an Event-Sourcing architecture using NestJS and EventStoreDB for a restaurant table reservation application.

This study will be divided into two parts:

  • In the first part, we will solely focus on the data writing model. We will explore how to write data into EventStoreDB and how to read it.
  • In the second part, we will concentrate on the data reading model. We will learn how to read data from EventStoreDB and how to project it into a read database.

CQRS and Event-Sourcing: Architectural Patterns for Better Separation of Concerns

CQRS (Command and Query Responsibility Segregation)

CQRS is a powerful architectural pattern aimed at separating read operations (queries) from write operations (commands) within an application. This approach brings several significant advantages to the system's design and performance. It often integrates seamlessly with the Event-Sourcing architecture.

Event-Sourcing

Event-Sourcing architecture is a natural complement to CQRS. It involves storing business events in a dedicated storage system called "EventStore." The fundamental idea is to retain a record of all significant events that occur within the application.
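
As a minimal illustration of the idea (not code from this project), the current state of an entity can be derived by replaying its recorded events in order. The `TableEvent` union, the `TableState` shape, and the `applyEvent` and `replay` helpers below are hypothetical names chosen for this sketch.

```typescript
// Hypothetical event types, used only for illustration.
type TableEvent =
  | { type: 'table-added'; data: { id: string; seats: number } }
  | { type: 'table-removed'; data: { id: string } };

// Hypothetical state shape: null means the table does not exist.
type TableState = { id: string; seats: number } | null;

// Apply a single event to the previous state and return the next state.
function applyEvent(state: TableState, tableEvent: TableEvent): TableState {
  switch (tableEvent.type) {
    case 'table-added':
      return { id: tableEvent.data.id, seats: tableEvent.data.seats };
    case 'table-removed':
      return null;
  }
}

// Rebuild the current state by replaying every event from the beginning.
function replay(events: TableEvent[]): TableState {
  return events.reduce<TableState>(applyEvent, null);
}

const tableHistory: TableEvent[] = [
  { type: 'table-added', data: { id: '1', seats: 4 } },
];

const current = replay(tableHistory);
```

Because the state is never stored directly, appending a `table-removed` event to the same history and replaying it again yields `null`.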

Used together, CQRS and Event-Sourcing separate the write model from the read models, make concurrency conflicts easier to detect and handle, and keep a complete history of every change for traceability. Each side can then be maintained and scaled independently.

Project Overview

In our example project, we will create an API that manages table reservations in a restaurant. The restaurant manager will be able to add or remove tables, and customers will be able to make reservations based on a list of defined time slots.

Project Structure

The project is composed of two services: a booker service for managing table reservations and a table-manager service for managing restaurant tables.

The two services will communicate with each other using events, which will be stored in an EventStore.

Two libraries will be created in a shared folder to handle events and communication between the services. These libraries will be shared within a pnpm workspace.

Here is the project structure:

services
├── booker
├── table-manager
shared
├── event-sourcing
├── events

Event-Sourcing Module

To streamline communication between the services, we will create a shared EventSourcing module.

This module exports an EventStoreService that enables the following functionalities:

  • Publishing an event
  • Reading events from a stream
  • Subscribing to a stream

This service relies primarily on the EventStoreDB Node.js client, which is injected into it.

@Injectable()
export class EventStoreDbService implements EventStoreService {
  constructor(
    @Inject(EVENT_STORE_DB_CLIENT)
    private readonly client: EventStoreDBClient,
  ) {}
}

Github: Full code - EventStoreDbService

Publishing an Event

To publish an event, we use the appendToStream method of the EventStoreDB client.

  async publish<T, X>(event: Event<T, X>): Promise<void> {
    const jsonEvent = this.mapEventToJsonEvent(event);

    await this.client.appendToStream(event.streamName, jsonEvent);
  }

Github: Full code - EventStoreDbService

The mapEventToJsonEvent method allows converting an event into a JSON object that will be stored in EventStoreDB.
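
The body of `mapEventToJsonEvent` is not shown here. The sketch below is one plausible shape for such a mapping, assuming an event carries `type`, `data`, `version`, and optional `metadata`. The `DomainEvent` and `JsonEventPayload` interfaces and the `toJsonEventPayload` function are hypothetical, and carrying the version inside the metadata is an assumption. With the real client, an object of this shape would typically be passed to the `jsonEvent` helper exported by `@eventstore/db-client`.

```typescript
// Hypothetical shapes, for illustration only.
interface DomainEvent<D, T extends string> {
  type: T;
  data: D;
  version: number;
  streamName: string;
  metadata?: Record<string, unknown>;
}

interface JsonEventPayload<D, T extends string> {
  type: T;
  data: D;
  metadata: Record<string, unknown>;
}

// Convert a domain event into a plain, JSON-serializable payload.
// Assumption: the event version travels in the metadata.
function toJsonEventPayload<D, T extends string>(
  domainEvent: DomainEvent<D, T>,
): JsonEventPayload<D, T> {
  return {
    type: domainEvent.type,
    data: domainEvent.data,
    metadata: { ...domainEvent.metadata, version: domainEvent.version },
  };
}

const payload = toJsonEventPayload({
  type: 'table-added',
  data: { id: '1', seats: 4 },
  version: 1,
  streamName: 'table-1',
});
```

The stream name is deliberately left out of the payload, since it is passed separately to `appendToStream`.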

Reading Events from a Stream

To read events from a stream, we use the readStream method of the EventStoreDB client.

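The client's readStream method returns a readable stream of events. Our service wraps it in an Observable that emits each event, completes when the stream ends, and cancels the underlying read when the subscriber unsubscribes.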

  readStream(
    streamName: string,
    options?: {
      maxCount?: number;
    },
  ): Observable<EventStoreEvent> {
    const stream = this.client.readStream(streamName, options);

    const observable = new Observable<EventStoreEvent>((subscriber) => {
      stream.on('data', (event) => {
        subscriber.next(
          new EventStoreEvent({
            data: event.event?.data,
            metadata: event.event?.metadata,
            type: event.event?.type,
          }),
        );
      });

      stream.on('error', (error) => {
        subscriber.error(error);
      });

      stream.on('end', () => {
        subscriber.complete();
      });

      return () => {
        stream.cancel();
      };
    });

    return observable;
  }

Github: Full code - EventStoreDbService

Subscribing to a Stream

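To subscribe to a stream, we use a persistent subscription. We first check whether a subscription already exists for the given stream and group, and we create it if it doesn't. Then, we return an Observable that emits events from the stream as they arrive, each one carrying ack and nack callbacks.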

  async initPersistentSubscriptionToStream(
    streamName: string,
    groupName: string,
  ): Promise<Observable<AcknowledgeableEventStoreEvent>> {
    const currentSubscriptions =
      await this.client.listAllPersistentSubscriptions();

    const existingSubscription = currentSubscriptions.find(
      (sub) => sub.groupName === groupName && sub.eventSource === streamName,
    );

    if (!existingSubscription) {
      await this.client.createPersistentSubscriptionToStream(
        streamName,
        groupName,
        {
          checkPointAfter: 2000,
          checkPointLowerBound: 10,
          checkPointUpperBound: 1000,
          consumerStrategyName: 'RoundRobin',
          extraStatistics: true,
          historyBufferSize: 500,
          liveBufferSize: 500,
          maxRetryCount: 10,
          maxSubscriberCount: 'unbounded',
          messageTimeout: 30000,
          readBatchSize: 20,
          resolveLinkTos: true,
          startFrom: 'start',
        },
      );
    }

    const persistentSubscription =
      this.client.subscribeToPersistentSubscriptionToStream(
        streamName,
        groupName,
      );

    const observable = new Observable<AcknowledgeableEventStoreEvent>(
      (subscriber) => {
        persistentSubscription.on('data', (event) => {
          subscriber.next(
            new AcknowledgeableEventStoreEvent(
              {
                type: event.event?.type,
                data: event.event?.data,
                metadata: event.event?.metadata,
                retryCount: event.retryCount,
                revision: Number(event.event?.revision),
                createdAt: event.event?.created,
              },
              {
                ack: () => persistentSubscription.ack(event),
                nack: (action, message) =>
                  persistentSubscription.nack(action, message, event),
              },
            ),
          );
        });

        persistentSubscription.on('error', (error) => {
          subscriber.error(error);
        });

        persistentSubscription.on('end', () => {
          subscriber.complete();
        });

        return () => {
          persistentSubscription.unsubscribe();
        };
      },
    );

    return observable;
  }

Github: Full code - EventStoreDbService
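
How a consumer reacts to a failure is left to whoever calls `ack` and `nack`. As a sketch, one common policy is to retry a few times and then park the message. The `chooseNackAction` and `handleSafely` helpers, the `Acknowledgeable` interface, and the `maxRetries` parameter are hypothetical; `'retry'` and `'park'` are two of the nack actions accepted by EventStoreDB persistent subscriptions. The callbacks are synchronous here for brevity, whereas the real ones return promises.

```typescript
// Subset of the nack actions supported by persistent subscriptions.
type NackAction = 'retry' | 'park';

// Hypothetical policy: retry until maxRetries is reached, then park the
// event so that it can be inspected and replayed manually later.
function chooseNackAction(retryCount: number, maxRetries: number): NackAction {
  return retryCount < maxRetries ? 'retry' : 'park';
}

// Hypothetical, simplified view of an acknowledgeable event.
interface Acknowledgeable {
  retryCount: number;
  ack: () => void;
  nack: (action: NackAction, reason: string) => void;
}

// Run the handler, then ack on success or nack with the chosen action.
function handleSafely(
  received: Acknowledgeable,
  handler: () => void,
  maxRetries = 3,
): void {
  try {
    handler();
    received.ack();
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    received.nack(chooseNackAction(received.retryCount, maxRetries), reason);
  }
}
```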

Features implementation

Adding / Removing Tables in the Restaurant

Each table has a unique identifier and a number of seats. This unique identifier will allow us to form the stream name in which we will store events related to this table.

For example, if the table's identifier is 1, the stream's name will be table-1.

Table stream representation

The stream will contain events of adding and removing the table.

Add a table

Add table event modeling

When adding a table, we publish a TableAddedEvent event into the table's stream.

export class TableAddedEvent extends TableBaseEvent<
  TableAddedEventData,
  "table-added"
> {
  constructor(data: { id: string; seats: number }) {
    super({
      data,
      type: "table-added",
      version: 1,
    });
  }
}

Github: Full code - TableAddedEvent

The abstract class TableBaseEvent allows, among other things, defining the name of the stream in which the event will be stored.

export abstract class TableBaseEvent<Data extends { id: string }, Type> extends Event<
  Data,
  Type
> {
  static STREAM_PREFIX = "table";

  constructor({
    data,
    type,
    version,
    metadata,
  }: {
    data: Data;
    type: Type;
    version: number;
    metadata?: EventMetadata;
  }) {
    super({
      data,
      type,
      version,
      streamName: TableBaseEvent.buildStreamName(data.id),
      metadata,
    });
  }

  static buildStreamName(id: string): string {
    return `${TableBaseEvent.STREAM_PREFIX}-${id}`;
  }
}

Github: Full code - Table base event

Our controller will instantiate an AddTableCommand:

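  @Post('tables')
  async addTable(@Body() body: AddTableRequest): Promise<AddTableResponse> {
    const command = new AddTableCommand(body.id, body.seats);
    const table = await this.commandBus.execute<AddTableCommand, Table>(
      command,
    );

    // Assumed response shape; the actual mapping is in the full code.
    return { id: table.id, seats: table.seats };
  }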

Github: Full code - Add table controller

This command is processed by an AddTableHandler handler that will:

  • Instantiate a Table aggregate with the table's identifier
  • Call the add method of the aggregate to add the table
  • Publish the events generated by the aggregate to our EventStore.

  async execute(command: AddTableCommand): Promise<Table> {
    const table = new Table(command.id);
    table.add(command.seats);

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());

    return table;
  }

Github: Full code - Add table handler

By calling the add method, the aggregate will generate a TableAddedEvent and add it to the list of uncommitted events. The apply method allows applying the event to the aggregate by invoking the onTableAddedEvent method.

export class Table extends AggregateRoot<TableEvent> {
  public seats: number;

  constructor(public id: string) {
    if (id.length === 0) {
      throw new InvalidTableIdException(id);
    }

    super();
  }

  add(seats: number) {
    this.apply(
      new TableAddedEvent({
        id: this.id,
        seats,
      }),
    );
  }

  private onTableAddedEvent(event: TableAddedEvent) {
    this.id = event.data.id;
    this.seats = event.data.seats;
  }

  protected getEventHandler<T extends TableEvent>(
    event: T,
  ): Type<IEventHandler<T>> {
    switch (event.type) {
      case 'table-added':
        return this.onTableAddedEvent.bind(this);
      default:
        throw new Error(`No handler defined for event type ${event.type}`);
    }
  }
}

Github: Full code - Table aggregate
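
To make the apply mechanism described above more concrete, here is a framework-free sketch of what the base class does: it invokes the handler for the event and, unless the event comes from history, records it as uncommitted. `SimpleAggregateRoot`, `SimpleTable`, `DomainEvent`, and the `when` method are hypothetical stand-ins; the real `AggregateRoot` from `@nestjs/cqrs` offers more than this.

```typescript
// Hypothetical event shape, for illustration only.
interface DomainEvent {
  type: string;
  data: Record<string, unknown>;
}

// Minimal stand-in for the AggregateRoot base class of @nestjs/cqrs.
abstract class SimpleAggregateRoot {
  private readonly uncommitted: DomainEvent[] = [];

  // Mutate the state, and remember the event unless it comes from history.
  protected apply(domainEvent: DomainEvent, isFromHistory = false): void {
    this.when(domainEvent);
    if (!isFromHistory) {
      this.uncommitted.push(domainEvent);
    }
  }

  // Events that were applied but not yet published to the event store.
  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommitted];
  }

  protected abstract when(domainEvent: DomainEvent): void;
}

class SimpleTable extends SimpleAggregateRoot {
  seats = 0;

  constructor(readonly id: string) {
    super();
  }

  add(seats: number): void {
    this.apply({ type: 'table-added', data: { id: this.id, seats } });
  }

  protected when(domainEvent: DomainEvent): void {
    if (domainEvent.type === 'table-added') {
      this.seats = Number(domainEvent.data['seats']);
    }
  }
}

const demoTable = new SimpleTable('1');
demoTable.add(4);
```

After `add(4)`, the aggregate's state reflects the new seat count and `getUncommittedEvents()` holds the single event that the handler then publishes.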

To verify that a table with the same identifier does not already exist, you can read the events from the table's stream and apply them to the aggregate to reconstruct its state. If the aggregate already exists, you can raise an exception.

To apply these events, you can add a static method called fromEventsHistory to your aggregate.

  static fromEventsHistory(events: TableEvent[]) {
    const table = new Table(events[0].data.id);

    for (const event of events) {
      table.apply(event, true);
    }

    return table;
  }

Github: Full code - Table aggregate

This method will apply the events one by one, thus building the aggregate's state. It can be called from a repository responsible for retrieving events from a stream using the EventSourcing module.
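
The repository mentioned above could look roughly like the following sketch, which uses an in-memory map instead of EventStoreDB so that it runs on its own. `InMemoryTableRepository`, `RebuiltTable`, and `TableAdded` are hypothetical names. Note that with the real client a missing stream surfaces as an error rather than an empty list, so an actual repository would need to translate that case into "not found".

```typescript
// Hypothetical event shape, for illustration only.
interface TableAdded {
  type: 'table-added';
  data: { id: string; seats: number };
}

// Hypothetical, simplified aggregate rebuilt from its history.
class RebuiltTable {
  seats = 0;

  constructor(readonly id: string) {}

  applyFromHistory(tableEvent: TableAdded): void {
    this.seats = tableEvent.data.seats;
  }
}

// In-memory stand-in for the event store: stream name -> ordered events.
class InMemoryTableRepository {
  private readonly streams = new Map<string, TableAdded[]>();

  publish(events: TableAdded[]): void {
    for (const tableEvent of events) {
      const streamName = `table-${tableEvent.data.id}`;
      const stream = this.streams.get(streamName) ?? [];
      stream.push(tableEvent);
      this.streams.set(streamName, stream);
    }
  }

  // An empty or missing stream means the table was never added.
  findTableById(id: string): RebuiltTable | null {
    const events = this.streams.get(`table-${id}`) ?? [];
    const first = events[0];
    if (first === undefined) {
      return null;
    }

    const table = new RebuiltTable(first.data.id);
    for (const tableEvent of events) {
      table.applyFromHistory(tableEvent);
    }
    return table;
  }
}
```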

In this way, we can add a verification in our AddTableHandler handler:

  async execute(command: AddTableCommand): Promise<Table> {
    // Rebuild the aggregate from its stream to check whether it already exists.
    const existingTable = await this.tableEventStoreRepository.getById(
      command.id,
    );

    if (existingTable) {
      throw new TableAlreadyExistsException(command.id);
    }

    const table = new Table(command.id);
    table.add(command.seats);

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());

    return table;
  }

Github: Full code - Add table handler

Removing a Table

When removing a table, we publish a TableRemovedEvent event into the table's stream.

export class TableRemovedEvent extends TableBaseEvent<
  TableRemovedEventData,
  "table-removed"
> {
  constructor(data: { id: string }) {
    super({
      data,
      type: "table-removed",
      version: 1,
    });
  }
}

Github: Full code - Table removed event

The removal of a table is managed similarly to adding a table, with the difference that the aggregate will generate a TableRemovedEvent when calling the remove method.
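
The remove method itself is not shown in this article. As a sketch under stated assumptions, it could mirror the add method: apply a removal event, which flips a flag on the aggregate and is recorded as uncommitted. `RemovableTable`, `TableLifecycleEvent`, and the rule that a table cannot be removed twice are assumptions made for this example.

```typescript
// Hypothetical event types, for illustration only.
type TableLifecycleEvent =
  | { type: 'table-added'; data: { id: string; seats: number } }
  | { type: 'table-removed'; data: { id: string } };

// Hypothetical, framework-free version of the aggregate.
class RemovableTable {
  removed = false;
  readonly uncommittedEvents: TableLifecycleEvent[] = [];

  constructor(readonly id: string) {}

  remove(): void {
    // Assumption: removing an already removed table is rejected.
    if (this.removed) {
      throw new Error(`Table ${this.id} is already removed`);
    }
    this.apply({ type: 'table-removed', data: { id: this.id } });
  }

  // Update the state and record the event as uncommitted.
  private apply(tableEvent: TableLifecycleEvent): void {
    if (tableEvent.type === 'table-removed') {
      this.removed = true;
    }
    this.uncommittedEvents.push(tableEvent);
  }
}
```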

Here is the RemoveTableHandler handler:

  async execute(command: RemoveTableCommand): Promise<void> {
    const table = await this.tableEventStoreRepository.findTableById(
      command.id,
    );

    if (!table) {
      throw new TableNotFoundError(command.id);
    }

    table.remove();

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
  }

Github: Full code - Remove table handler

Booking a Table

Initiating a Reservation

From a schedule listing available time slots, the client can choose a slot and initiate a reservation. A command is created and sent to the booker service:

Initiating table booking event modeling

How can we ensure that the table has not been removed in the meantime? How can we ensure that the table will not be removed while it is reserved?

To address these questions, we will use a table locking system. When a table is reserved, it is locked. As long as it is locked, it cannot be removed.
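
A minimal sketch of this locking rule, leaving events aside, might look as follows. `LockableTable` and its methods are hypothetical, and identifying a time slot by a plain string key is an assumption. In the event-sourced version, each of these methods would emit an event instead of mutating a set directly.

```typescript
// Hypothetical lock bookkeeping; time slots are identified by a string key.
class LockableTable {
  private readonly lockedSlots = new Set<string>();

  constructor(readonly id: string) {}

  // Lock the table for one time slot; a slot can only be locked once.
  placeLock(timeSlot: string): void {
    if (this.lockedSlots.has(timeSlot)) {
      throw new Error(`Table ${this.id} is already locked for ${timeSlot}`);
    }
    this.lockedSlots.add(timeSlot);
  }

  removeLock(timeSlot: string): void {
    this.lockedSlots.delete(timeSlot);
  }

  // A table can only be removed when no lock remains.
  canBeRemoved(): boolean {
    return this.lockedSlots.size === 0;
  }
}
```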

Locking a Table and Confirming the Reservation

To handle these concurrency issues, we can use the Choreography-based saga pattern to place a lock on the table and confirm the reservation.

Choreography-based saga pattern documentation

Locking table and confirm booking event modeling

This way, we can ensure that the table is not removed in the meantime, and that the reservation is confirmed.
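
The flow can be simulated with a synchronous in-memory bus, which makes the choreography easier to follow: no central orchestrator exists, each service only reacts to the events it cares about. `InMemoryBus`, `SagaEvent`, and the event type strings are hypothetical; in the project the events travel through EventStoreDB subscriptions.

```typescript
// Hypothetical message shape: every event carries the correlation id of the
// booking that started the saga.
interface SagaEvent {
  type:
    | 'table-booking-initiated'
    | 'table-lock-placed'
    | 'table-booking-confirmed';
  correlationId: string;
  tableId: string;
}

type Listener = (sagaEvent: SagaEvent) => void;

// Synchronous in-memory bus standing in for EventStoreDB subscriptions.
class InMemoryBus {
  readonly log: SagaEvent[] = [];
  private readonly listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  publish(sagaEvent: SagaEvent): void {
    this.log.push(sagaEvent);
    for (const listener of this.listeners) {
      listener(sagaEvent);
    }
  }
}

const bus = new InMemoryBus();

// table-manager side: react to an initiated booking by placing a lock.
bus.subscribe((sagaEvent) => {
  if (sagaEvent.type === 'table-booking-initiated') {
    bus.publish({ ...sagaEvent, type: 'table-lock-placed' });
  }
});

// booker side: react to a placed lock by confirming the booking.
bus.subscribe((sagaEvent) => {
  if (sagaEvent.type === 'table-lock-placed') {
    bus.publish({ ...sagaEvent, type: 'table-booking-confirmed' });
  }
});

bus.publish({
  type: 'table-booking-initiated',
  correlationId: 'booking-42',
  tableId: '1',
});
```

The correlation id is copied from event to event, which is what lets the booker service match the confirmation to the booking that started the exchange.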

Implementation in the table-manager service

The TableLockingSaga listens to all events of type TableBookingInitiatedEvent. When it receives one, it executes a PlaceTableLockCommand, which places a lock on the table and publishes a TableLockPlacedEvent. If the table no longer exists, the event is acknowledged and no lock is placed.

  async onTableBookingInitiated(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);

    const eventMetadata = resolvedEvent.metadata as JSONMetadata;

    const command = new PlaceTableLockCommand(
      eventData.tableId,
      eventData.timeSlot,
      eventMetadata.$correlationId,
    );

    try {
      await this.commandBus.execute(command);
    } catch (error) {
      if (error instanceof TableNotFoundError) {
        await resolvedEvent.ack();
        return;
      }
      throw error;
    }

    await resolvedEvent.ack();
  }

Github: Full code - Table locking saga

Implementation in the booker service

The TableBookingSaga listens to all events of type TableLockPlacedEvent and confirms the reservation by publishing a TableBookingConfirmedEvent.

  async onTableLockPlaced(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventMetadata = resolvedEvent.metadata as JSONMetadata;

    const command = new ConfirmTableBookingCommand(
      eventMetadata.$correlationId,
    );

    await this.commandBus.execute(command);

    await resolvedEvent.ack();
  }

Github: Full code - Table booking saga

Unlocking a Table

When a reservation is canceled or the time slot has passed, the table is unlocked.

Scenario 1: Reservation is Canceled

Unlocking table on cancellation event modeling

Scenario 2: Time Slot has Passed

Unlocking table when time slot is passed event modeling
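
For the second scenario, something has to decide when a time slot is over. How this is triggered is not covered here; the sketch below only shows the check itself, assuming a time slot is described by ISO 8601 start and end instants. The `TimeSlot` shape and the `hasTimeSlotPassed` function are hypothetical.

```typescript
// Hypothetical time slot shape: ISO 8601 start and end instants.
interface TimeSlot {
  from: string;
  to: string;
}

// A slot has passed once its end instant is not in the future anymore.
function hasTimeSlotPassed(slot: TimeSlot, now: Date): boolean {
  return new Date(slot.to).getTime() <= now.getTime();
}

const dinnerSlot: TimeSlot = {
  from: '2023-07-31T19:00:00Z',
  to: '2023-07-31T20:00:00Z',
};
```

Passing the current time as a parameter, rather than reading the clock inside the function, keeps the check deterministic and easy to test.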

In this article, we have seen how to implement an Event-Sourcing architecture with NestJS and EventStoreDB. In the next part, we will learn how to read the events stored in EventStoreDB and project them into a read database.

The complete code for this project is available on GitHub: Github: Event-Sourcing with NestJS and EventStoreDB

Your feedback and comments are welcome. Feel free to open an issue on the GitHub repository if you have any questions or suggestions for improvement.



As a backend engineer and Node.js enthusiast, I find genuine satisfaction in harnessing my passion for programming to design robust and scalable solutions. I tackle every challenge with determination, striving to create high-performance applications that ensure an optimal user experience. You can contact me on LinkedIn.