Event-Sourcing avec NestJS et EventStoreDB (Partie 1: commandes et événements)

July 31, 2023

Introduction

Dans cet article, nous allons nous intéresser à l'implémentation d'une architecture Event-Sourcing avec NestJS et EventStoreDB dans le cadre d'une application de réservation de tables dans un restaurant.

Cette étude sera composée de deux parties :

  • Dans la première partie, nous allons nous intéresser uniquement au modèle d'écriture des données. Nous allons voir comment écrire les données dans EventStoreDB, et comment les lire.
  • Dans la deuxième partie, nous allons nous intéresser au modèle de lecture des données. Nous allons voir comment lire les données dans EventStoreDB, et comment les projeter dans une base de données de lecture.

CQRS et Event-Sourcing : Des patterns d'architecture pour une meilleure séparation des responsabilités

CQRS (Command and Query Responsibility Segregation)

Le CQRS est un pattern d'architecture puissant qui vise à séparer les opérations de lecture (queries) des opérations d'écriture (commands) au sein d'une application. Cette approche présente plusieurs avantages majeurs pour la conception et la performance du système. Il s'intègre souvent harmonieusement avec l'architecture Event-Sourcing.

Event-Sourcing

L'architecture Event-Sourcing constitue un complément naturel au CQRS. Elle consiste à stocker les événements métiers dans un système de stockage dédié appelé "EventStore". L'idée fondamentale est de conserver un enregistrement de tous les événements significatifs qui se produisent dans l'application.

L'utilisation combinée du CQRS et de l'Event-Sourcing offre une architecture robuste, modulaire et performante, permettant une meilleure séparation des responsabilités, une gestion plus efficace des problèmes de concurrence, et une traçabilité accrue des changements dans l'application. Ces patterns sont devenus des choix privilégiés pour de nombreuses applications modernes, offrant des avantages significatifs en termes de maintenance, de flexibilité et de scalabilité.

Présentation du projet

Dans notre projet d'exemple, nous allons créer une API qui permet de gérer les réservations de tables dans un restaurant. Le gérant du restaurant pourra ajouter ou supprimer des tables, et les clients pourront réserver une table selon une liste de créneaux définis.

Arborescence du projet

Le projet est composé de deux services : un service booker qui permet de gérer les réservations de tables, et un service table-manager qui permet de gérer les tables du restaurant.

Les deux services communiqueront entre eux via des événements. Les événements seront stockés dans un EventStore.

Deux librairies seront créées dans un dossier shared pour gérer les événements et la communication entre les services. Ces librairies seront partagées dans un workspace pnpm.

Voici l'arborescence du projet :

services
├── booker
├── table-manager
shared
├── event-sourcing
├── events

Module Event-Sourcing

Afin de simplifier la communication entre les services, nous allons créer un module EventSourcing qui permettra de gérer la communication entre les services.

Ce module exporte un service EventStoreService qui permet de :

  • Publier un événement
  • Lire les événements d'un stream
  • Souscrire à un stream

Ce service repose essentiellement sur le client NodeJS d'EventStoreDB qui est injecté dans le service.

@Injectable()
export class EventStoreDbService implements EventStoreService {
  constructor(
    @Inject(EVENT_STORE_DB_CLIENT)
    private readonly client: EventStoreDBClient,
  ) {}
}

Github: Code complet - EventStoreDbService

Publier un événement

Pour publier un événement, on utilise la méthode appendToStream du client EventStoreDB.

async publish<T, X>(event: Event<T, X>): Promise<void> {
    const jsonEvent = this.mapEventToJsonEvent(event);

    await this.client.appendToStream(event.streamName, jsonEvent);
  }

Github: Code complet - EventStoreDbService

La méthode mapEventToJsonEvent permet de convertir un événement en un objet JSON qui sera stocké dans EventStoreDB.

Lecture des événements d'un stream

Pour lire les événements d'un stream, on utilise la méthode readStream du client EventStoreDB.

Cette méthode retourne un Observable qui permet de lire les événements du stream.

readStream(
    streamName: string,
    options?: {
      maxCount?: number;
    },
  ): Observable<EventStoreEvent> {
    const stream = this.client.readStream(streamName, options);

    const observable = new Observable<EventStoreEvent>((subscriber) => {
      stream.on('data', (event) => {
        subscriber.next(
          new EventStoreEvent({
            data: event.event?.data,
            metadata: event.event?.metadata,
            type: event.event?.type,
          }),
        );
      });

      stream.on('error', (error) => {
        subscriber.error(error);
      });

      stream.on('end', () => {
        subscriber.complete();
      });

      return () => {
        stream.cancel();
      };
    });

    return observable;
  }

Github: Full code - EventStoreDbService

Souscrire à un stream

Pour souscrire à un stream, on vérifie si la souscription existe déjà, et on la crée si elle n'existe pas. Ensuite, on retourne un Observable qui permet de lire les événements du stream au fur et à mesure qu'ils arrivent.

async initPersistentSubscriptionToStream(
    streamName: string,
    groupName: string,
  ): Promise<Observable<AcknowledgeableEventStoreEvent>> {
    const currentSubscriptions =
      await this.client.listAllPersistentSubscriptions();

    const existingSubscription = currentSubscriptions.find(
      (sub) => sub.groupName === groupName && sub.eventSource === streamName,
    );

    if (!existingSubscription) {
      await this.client.createPersistentSubscriptionToStream(
        streamName,
        groupName,
        {
          checkPointAfter: 2000,
          checkPointLowerBound: 10,
          checkPointUpperBound: 1000,
          consumerStrategyName: 'RoundRobin',
          extraStatistics: true,
          historyBufferSize: 500,
          liveBufferSize: 500,
          maxRetryCount: 10,
          maxSubscriberCount: 'unbounded',
          messageTimeout: 30000,
          readBatchSize: 20,
          resolveLinkTos: true,
          startFrom: 'start',
        },
      );
    }

    const persistentSubscription =
      this.client.subscribeToPersistentSubscriptionToStream(
        streamName,
        groupName,
      );

    const observable = new Observable<AcknowledgeableEventStoreEvent>(
      (subscriber) => {
        persistentSubscription.on('data', (event) => {
          subscriber.next(
            new AcknowledgeableEventStoreEvent(
              {
                type: event.event?.type,
                data: event.event?.data,
                metadata: event.event?.metadata,
                retryCount: event.retryCount,
                revision: Number(event.event.revision),
                createdAt: event.event?.created,
              },
              {
                ack: () => persistentSubscription.ack(event),
                nack: (action, message) =>
                  persistentSubscription.nack(action, message, event),
              },
            ),
          );
        });

        persistentSubscription.on('error', (error) => {
          subscriber.error(error);
        });

        persistentSubscription.on('end', () => {
          subscriber.complete();
        });

        return () => {
          persistentSubscription.unsubscribe();
        };
      },
    );

    return observable;
  }

Github: Full code - EventStoreDbService

Implémentation des fonctionnalités

Ajouter / supprimer des tables au restaurant

Chaque table possède un identifiant unique et un nombre de places. Cet identifiant unique va nous permettre de constituer le nom du stream dans lequel nous allons stocker les événements liés à cette table.

Par exemple, si l'identifiant de la table est table-1, le nom du stream sera table-1.

Table stream representation

Le stream contiendra les événements d'ajout et de suppression de la table.

Ajouter une table

Add table event modeling

Lorsqu'on ajoute une table, on publie un événement TableAddedEvent dans le stream de la table.

export class TableAddedEvent extends TableBaseEvent<
  TableAddedEventData,
  "table-added"
> {
  constructor(data: { id: string; seats: number }) {
    super({
      data,
      type: "table-added",
      version: 1,
    });
  }
}

Github: Code complet - Table added event

La classe abstraite TableBaseEvent permet notamment de définir le nom du stream dans lequel l'événement sera stocké.

export abstract class TableBaseEvent<Data extends { id: string }, Type> extends Event<
  Data,
  Type
> {
  static STREAM_PREFIX = "table";

  constructor({
    data,
    type,
    version,
    metadata,
  }: {
    data: Data;
    type: Type;
    version: number;
    metadata?: EventMetadata;
  }) {
    super({
      data,
      type,
      version,
      streamName: TableBaseEvent.buildStreamName(data.id),
      metadata,
    });
  }

  static buildStreamName(id: string): string {
    return `${TableBaseEvent.STREAM_PREFIX}-${id}`;
  }
}

Github: Code complet - Table base event

Notre controller va instancier une commande AddTableCommand :

 @Post('tables')
  async addTable(@Body() body: AddTableRequest): Promise<AddTableResponse> {
    const command = new AddTableCommand(body.id, body.seats);
    await this.commandBus.execute<AddTableCommand, Table>(
        command,
      );
  }

Github: Code complet - Add table controller

Cette commande est traitée par un handler AddTableHandler qui va :

  • Instancier un aggregate Table avec l'identifiant de la table
  • Appeler la méthode add de l'aggregate pour ajouter la table
  • Publier les événements générés par l'aggregate vers notre EventStore
async execute(command: AddTableCommand): Promise<Table> {
    const table = new Table(command.id);
    table.add(command.seats);

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());

    return table;
  }

Github: Code complet - Add table handler

En appelant la méthode add, l'aggregate va générer un événement TableAddedEvent et l'ajouter à la liste des événements non commités. La méthode apply permet d'appliquer l'événement à l'aggregate, en appelant la méthode onTableAddedEvent.

export class Table extends AggregateRoot<TableEvent> {
  public seats: number;

  constructor(public id: string) {
    if (id.length === 0) {
      throw new InvalidTableIdException(id);
    }

    super();
  }

  add(seats: number) {
    this.apply(
      new TableAddedEvent({
        id: this.id,
        seats,
      }),
    );
  }

  private onTableAddedEvent(event: TableAddedEvent) {
    this.id = event.data.id;
    this.seats = event.data.seats;
  }

  protected getEventHandler<T extends TableEvent>(
    event: T,
  ): Type<IEventHandler<T>> {
    switch (event.type) {
      case 'table-added':
        return this.onTableAddedEvent.bind(this);
      default:
        throw new Error(`No handler defined for event type ${event.type}`);
    }
  }
}

Github: Code complet - Table aggregate

Comment vérifier qu'une table avec le même identifiant n'existe pas déjà ?

Il suffit de lire les événements du stream de la table et de les appliquer à l'aggregate pour reconstituer son état. Si l'aggregate existe déjà, on lève une exception.

Pour appliquer ces événements, nous pouvons ajouter une méthode statique fromEvents à notre aggregate.

static fromEventsHistory(events: TableEvent[]) {
    const table = new Table(events[0].data.id);

    for (const event of events) {
      table.apply(event, true);
    }

    return table;
  }

Github: Code complet - Table aggregate

Cette méthode va appliquer les événements un à un, et donc constituer l'état de l'aggregate. Elle peut être appelée depuis un repository, chargé de récupérer les événements d'un stream grâce à au module EventSourcing.

Ainsi, nous pouvons ajouter une vérification dans notre handler AddTableHandler :

async execute(command: AddTableCommand): Promise<Table> {
    const table = await this.tableEventStoreRepository.getById(command.id);

    if (table) {
      throw new TableAlreadyExistsException(command.id);
    }

    const table = new Table(command.id);
    table.add(command.seats);

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());

    return table;
  }

Github: Code complet - Add table handler

Supprimer une table

Lorsqu'on supprime une table, on publie un événement TableRemovedEvent dans le stream de la table.

export class TableRemovedEvent extends TableBaseEvent<
  TableRemovedEventData,
  "table-removed"
> {
  constructor(data: { id: string }) {
    super({
      data,
      type: "table-removed",
      version: 1,
    });
  }
}

La suppression d'une table est gérée de la même manière que l'ajout d'une table, à la différence que l'aggregate va générer un événement TableRemovedEvent lorsqu'on appelle la méthode remove.

Voici le handler RemoveTableHandler :

async execute(command: RemoveTableCommand): Promise<void> {
    const table = await this.tableEventStoreRepository.findTableById(
      command.id,
    );

    if (!table) {
      throw new TableNotFoundError(command.id);
    }

    table.remove();

    await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
  }

Github: Code complet - Remove table handler

Réserver une table

Initier une réservation

A partir d'un planning listant les créneaux disponibles, le client peut choisir un créneau et initier une réservation. Une commande est créée et envoyée au service booker :

Initiating table booking event modeling

Comment s'assurer que la table n'a pas été supprimée entre temps ? Comment s'assurer que la table ne va pas être supprimée alors qu'elle est réservée ?

Pour répondre à ces questions, nous allons utiliser un système de verrouillage de la table. Lorsqu'une table est réservée, elle est verrouillée. Tant qu'elle est verrouillée, elle ne peut pas être supprimée.

Verrouiller une table et confirmer la réservation

Afin de gérer ces problèmes de concurrence, nous pouvons utiliser le pattern Choreography-based saga pour placer un verrou sur la table et confirmer la réservation.

Choerography-based saga pattern documentation

Locking table and confirm booking event modeling

Ainsi, nous pouvons nous assurer que la table n'est pas supprimée entre temps, et que la réservation est confirmée.

Implémentation côté table-manager

Tous les événements de type TableBookingInitiatedEvent sont écoutés par la saga TableLockingSaga. Lorsqu'un événement de ce type est reçu, la saga va placer un verrou sur la table et publier un événement TableLockPlacedEvent.

async onTableBookingInitiated(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);

    const eventMetadata = resolvedEvent.metadata as JSONMetadata;

    const command = new PlaceTableLockCommand(
      eventData.tableId,
      eventData.timeSlot,
      eventMetadata.$correlationId,
    );

    try {
      await this.commandBus.execute(command);
    } catch (error) {
      if (error instanceof TableNotFoundError) {
        await resolvedEvent.ack();
        return;
      }
      throw error;
    }

    await resolvedEvent.ack();
  }

Github: Code complet - Table locking saga

Implémentation côté booker

Tous les événements de type TableLockPlacedEvent sont écoutés par la saga TableBookingSaga qui va confirmer la réservation en publiant un événement TableBookingConfirmedEvent.

async onTableLockPlaced(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventMetadata = resolvedEvent.metadata as JSONMetadata;

    const command = new ConfirmTableBookingCommand(
      eventMetadata.$correlationId,
    );

    await this.commandBus.execute(command);

    await resolvedEvent.ack();
  }

Github: Code complet - Table booking saga

Déverouiller une table

Lorsqu'une réservation est annulée, où que le créneau est passé, la table est déverrouillée.

Scénario 1 : La réservation est annulée

Unlocking table on cancellation event modeling

Scénario 2 : Le créneau est passé

Unlocking table when time slot is passed event modeling

Dans cet article, nous avons vu comment implémenter une architecture Event-Sourcing avec NestJS et EventStoreDB. Dans la prochaine partie, nous allons voir comment lire les événements stockés dans EventStoreDB et les projeter dans une base de données de lecture.

Le code complet de ce projet est disponible sur GitHub : Github: Event-Sourcing with NestJS and EventStoreDB

Vos remarques et commentaires sont les bienvenus, n'hésitez pas à ouvrir une issue sur le repository GitHub si vous avez des questions ou des suggestions d'amélioration.


Profile picture

En tant qu'ingénieur backend Node.js, je trouve une réelle satisfaction à exploiter ma passion pour la programmation afin de concevoir des solutions solides et évolutives. Je relève chaque défi avec détermination, m'efforçant de créer des applications haute performance qui garantissent une expérience utilisateur optimale. Vous pouvez me contacter sur LinkedIn.