Event-Sourcing avec NestJS et EventStoreDB (Partie 2: projections)

August 10, 2023

Introduction

Dans la première partie de cet article, nous avons vu comment créer un projet NestJS avec EventStoreDB. Nous avons également vu comment traiter des commandes, publier des événements et réagir aux événements publiés. Dans cette deuxième partie, nous allons voir comment créer des projections avec EventStoreDB et NestJS.

Projections

Les projections sont utilisées pour créer des modèles de lecture optimisés pour les besoins de l'application. Les projections sont créées à partir des événements stockés dans EventStoreDB. Elles sont mises à jour en temps réel à chaque fois qu'un événement est publié dans EventStoreDB.

Un exemple commun est celui du solde d'un compte bancaire, qui est mis à jour à chaque fois qu'un dépôt ou un retrait est effectué sur le compte. Dans ce cas, le solde du compte est une projection créée à partir des événements de dépôt et de retrait.

Exemple d'une projection pour le solde d'un compte

Créer une projection à partir d'un stream

Pour créer une projection à partir d'un stream, nous devons nous abonner à ce stream et consommer les événements publiés dans ce stream. Pour ce faire, nous pouvons utiliser la méthode initPersistentSubscriptionToStream du module Event-Sourcing créé dans la première partie.

Cette méthode va créer une nouvelle souscription persistante dans le cas où elle n'existe pas. Elle va ensuite retourner un Observable RxJS qui va nous servira à consommer les événements publiés dans le stream.

La liste des tables du restaurant

Nous allons créer une première projection pour récupérer la liste des tables du restaurant.

Projection des événements pour les tables

Une projection peut être initialisée de cette manière

  async init() {
    this.logger.log('Initializing TableProjection');

    const streamName = '$ce-table'; // Abonnement à tous les événements de la catégorie 'table' (tous les streams commençant par 'table-')

    const groupName = this.configService.get<string>(
      'TABLE_PROJECTION_GROUP_NAME',
    );

    const source$ =
      await this.eventStoreDbService.initPersistentSubscriptionToStream(
        streamName,
        groupName,
      );

    source$
      .pipe(
        groupBy((event) => (event.data as TableEvent['data']).id), 
        mergeMap((group$) =>
          group$.pipe(concatMap((event) => this.handleEvent(event))),
        ),
      )
      .subscribe();
  }

Code complet - TableProjection

L'utilisation de RxJS est pratique ici, elle nous permet notamment de grouper facilement les événements par id de table. C'est nécessaire ici dans le cas où plusieurs événements se suivent pour une même table : on souhaite attendre que le premier événement soit traité avant de traiter le suivant.

Ensuite, une méthode handleEvent est appelée pour traiter chaque événement.

private async handleEvent(resolvedEvent: AcknowledgeableEventStoreEvent) {
    this.logger.debug(`Handling event: ${resolvedEvent.type}`);

    try {
      switch (resolvedEvent.type as TableEventType) {
        case 'table-added':
          await this.onTableAdded(resolvedEvent);
          break;
        case 'table-removed':
          await this.onTableRemoved(resolvedEvent);
          break;
        case 'table-lock-placed':
          await this.onTableLockPlaced(resolvedEvent);
          break;
        case 'table-lock-removed':
          await this.onTableLockRemoved(resolvedEvent);
          break;
        default:
          this.logger.warn(`Unhandled event type: ${resolvedEvent.type}`);
      }
    } catch (error) {
      this.logger.error(error, error.stack);
    }
  }

Code complet - TableProjection

Cette méthode va appeler une méthode spécifique pour chaque type d'événement. Par exemple, pour l'événement table-added, la méthode onTableAdded est appelée.

private async onTableAdded(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableAddedEventData(resolvedEvent.data);

    const currentTable = await this.db
      .select({
        id: tables.id,
      })
      .from(tables)
      .where(eq(tables.id, eventData.id));

    if (currentTable.length > 0) {
      await resolvedEvent.ack();
      return;
    }

    await this.db.insert(tables).values({
      id: eventData.id,
      seats: eventData.seats,
      revision: resolvedEvent.revision,
    });

    await resolvedEvent.ack();
  }

Code complet - TableProjection

En insérant une nouvelle table dans la base de données de lecture, on ajoute également la révision de l'événement. Cela va nous permettre de vérifier si l'événement a déjà été traité ou non. Si c'est le cas, on ne fait rien. Ce numéro de révision est incrémenté à chaque fois qu'un événement est publié dans EventStoreDB.

Pour l'événement table-removed, la méthode onTableRemoved est appelée.

private async onTableRemoved(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableRemovedEventData(resolvedEvent.data);

    const tableResults = await this.db
      .select({
        revision: tables.revision,
      })
      .from(tables)
      .where(eq(tables.id, eventData.id));

    if (tableResults.length === 0) {
      this.logger.warn(`Table not found: ${eventData.id}`);
      await resolvedEvent.nack('retry', 'Table not found');
      return;
    }

    const table = tableResults[0];

    if (table.revision + 1 !== resolvedEvent.revision) {
      this.logger.warn('Table revision mismatch', {
        tableId: eventData.id,
        currentRevision: table.revision,
        eventRevision: resolvedEvent.revision,
      });
      await resolvedEvent.nack('retry', 'Table revision mismatch');
      return;
    }

    await this.db
      .update(tables)
      .set({
        removedAt: resolvedEvent.createdAt,
        revision: resolvedEvent.revision,
      })
      .where(eq(tables.id, eventData.id))
      .returning({
        updatedId: tables.id,
        removed: tables.removedAt,
      });

    await resolvedEvent.ack();
  }

Code complet - TableProjection

Particularité : on vérifie si la révision de l'événement correspond à la révision de la table. Si ce n'est pas le cas, on rejette l'événement et on le met dans une file d'attente pour le traiter plus tard, les événements étant traités dans l'ordre.

Notre stream table-{tableId} contient également des événements concernant les verrous placés sur les tables. Nous pouvons simplement incrémenter le numéro de révision de la table à chaque fois qu'un verrou est placé ou retiré pour conserver un numéro de révision cohérent.

Désormais, les données concernant les tables sont disponibles dans la base de données de lecture et peuvent être exposées via une API REST.

La liste des créneaux de réservation

Toujours dans le même principe, nous allons utiliser les projections pour lister les créneaux de réservation disponibles.

Les informations que nous souhaitons récupérer sont les suivantes pour un nombre donné de personnes :

  • La date et l'heure du début et de la fin du créneau
  • La liste des tables disponibles

Pour cela, nous pouvons nous appuyer sur deux streams :

  • Le stream table-{tableId} contient les événements concernant une table
  • Le stream booking-{bookingId} contient les événements concernant une réservation

En créant une projection pour chacun de ces streams, nous pouvons ajouter deux tables dans notre base de données de lecture qui nous permettront de récupérer les informations nécessaires.

La première projection à créer est exactement la même que celle que nous avons créée pour récupérer la liste des tables. Elle va permettre au service booker de récupérer la liste des tables.

La seconde projection a pour but de créer une nouvelle table listant le statut de chaque réservation, ainsi nous pouvons savoir si une table est disponible ou non.

Projection des événements pour les réservations

Initialisation de la projection :

 async init() {
    this.logger.log('Initializing BookingProjection');

    const streamName = '$ce-table_booking'; // Abonnement à tous les événements de la catégorie 'table_booking' (tous les streams commençant par 'table_booking-')
    const groupName = this.configService.get<string>(
      'BOOKING_PROJECTION_GROUP_NAME',
    );

    const source$ =
      await this.eventStoreDbService.initPersistentSubscriptionToStream(
        streamName,
        groupName,
      );

    source$
      .pipe(
        groupBy((event) => (event.data as TableBookingEvent['data']).id),
        mergeMap((group$) =>
          group$.pipe(concatMap((event) => this.handleEvent(event))),
        ),
      )
      .subscribe();
  }

Code complet - BookingProjection

Traitement des événements :

private async onTableBookingInitiated(
    resolvedEvent: AcknowledgeableEventStoreEvent,
  ) {
    const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);

    const currentBooking = await this.db
      .select({
        id: bookings.id,
      })
      .from(bookings)
      .where(eq(bookings.id, eventData.id));

    if (currentBooking.length > 0) {
      await resolvedEvent.ack();
      return;
    }

    await this.db.insert(bookings).values({
      id: eventData.id,
      tableId: eventData.tableId,
      timeSlotFrom: new Date(eventData.timeSlot.from),
      timeSlotTo: new Date(eventData.timeSlot.to),
      status: 'initiated',
      revision: resolvedEvent.revision,
    });

    await resolvedEvent.ack();
  }

  // Plusieurs méthodes pour les autres événements ...

Code complet - BookingProjection

Nous avons à présent deux tables disponibles dans notre base de données de lecture : tables et bookings.

Tables

  • id
  • seats
  • revision
  • removedAt

Bookings

  • id
  • tableId
  • timeSlotFrom
  • timeSlotTo
  • status
  • revision

Il manque une dernière information avant de pouvoir générer une liste de créneaux disponibles : les horaires d'ouverture du restaurant !

Ajoutons un fichier de configuration à notre service booker pour définir ces horaires.

export const timeSlotConfiguration = {
  morning: {
    from: {
      hours: 12,
      minutes: 0,
    },
    to: {
      hours: 14,
      minutes: 0,
    },
  },
  evening: {
    from: {
      hours: 19,
      minutes: 0,
    },
    to: {
      hours: 21,
      minutes: 0,
    },
  },
  timezone: 'Europe/Paris',
} as const;

Toutes les données nécessaires sont à présent disponibles dans notre base de données de lecture. Nous pouvons donc commencer à écrire notre requête dans le but d'obtenir une liste de créneaux disponibles :

const rows: Array<{
      day: string;
      start_time: string;
      end_time: string;
      available_tables: string[];
    }> = await this.db.execute(
      sql`
      -- 1. Création d'une table temporaire contenant les dates de début et de fin de la période de recherche

      WITH date_range AS (
        SELECT
          ${startDate}::date AS start_date,
          ${endDate}::date AS end_date
      ),

      -- 2. Création d'une table temporaire contenant les créneaux horaires

      time_slots AS (
        SELECT
          generate_series(start_date, end_date, INTERVAL '1 day') AS day,
           ${config.morningStartTime} AS morning_start,
          ${config.morningEndTime} AS morning_end,
          ${config.eveningStartTime} AS evening_start,
          ${config.eveningEndTime} AS evening_end
        FROM
          date_range
      ),

      -- 3. Création d'une table temporaire contenant les tables disponibles

      available_tables AS (
        SELECT
          ts.day,
          ts.morning_start AS start_time,
          ts.morning_end AS end_time,
          array_agg(${tables.id}) AS available_tables
        FROM
          time_slots ts
          CROSS JOIN ${tables}
          LEFT JOIN ${bookings} ON (
            ${bookings.tableId} = ${tables.id}
            AND ${bookings.status} != 'cancelled'
            AND ${bookings.timeSlotFrom} = (ts.day + ts.morning_start)::timestamp AT TIME ZONE ${timeZone}
            AND ${bookings.timeSlotTo} = (ts.day + ts.morning_end)::timestamp AT TIME ZONE ${timeZone}
          )
        WHERE
          ${tables.seats} >= ${query.people}
          AND ${bookings.id} IS NULL
        GROUP BY
          ts.day,
          ts.morning_start,
          ts.morning_end
        UNION ALL
        SELECT
          ts.day,
          ts.evening_start AS start_time,
          ts.evening_end AS end_time,
          array_agg(${tables.id}) AS available_tables
        FROM
          time_slots ts
          CROSS JOIN ${tables}
          LEFT JOIN ${bookings} ON (
            ${bookings.tableId} = ${tables.id}
            AND ${bookings.status} != 'cancelled'
            AND ${bookings.timeSlotFrom} = (ts.day + ts.evening_start)::timestamp AT TIME ZONE ${timeZone}
            AND ${bookings.timeSlotTo} = (ts.day + ts.evening_end)::timestamp AT TIME ZONE ${timeZone}
          )
        WHERE
          ${tables.seats} >= ${query.people}
          AND ${bookings.id} IS NULL
        GROUP BY
          ts.day,
          ts.evening_start,
          ts.evening_end
      )

      -- 4. Sélection des créneaux disponibles

      SELECT
        to_char(a.day, 'YYYY-MM-DD') AS day,
        to_char(a.start_time, 'HH24:MI') AS start_time,
        to_char(a.end_time, 'HH24:MI') AS end_time,
        a.available_tables
      FROM
        available_tables a
      ORDER BY
        a.day,
        a.start_time;
      `,
    );

Code complet - ListAvailableBookingSlotsHandler

Note sur la consistance éventuelle

Attention, notre base de données de lecture est mise à jour de manière asynchrone. Elle ne doit pas être considérée comme une source de vérité et ne doit être utilisée que pour des besoins de lecture.

Conclusion

Avec l'ajout de ces projections, nous pouvons mettre à disposition des endpoints pour afficher la liste des tables du restaurant au sein du service table-manager et la liste des créneaux disponibles au sein du service booker.

Grâce à ce type d'architecture, il est possible de créer autant de projections que nécessaire pour répondre aux besoins de lecture de l'application.

Nous pouvons imaginer répondre facilement à des besoins tels que :

  • Combien de réservations ont été effectuées sur une période donnée ?
  • Combien de réservations ont été annulées sur une période donnée ?
  • Combien de personnes ont été servies sur une période donnée ?

L'ensemble de ce projet est disponible sur Github

Vos remarques et commentaires sont les bienvenus, n'hésitez pas à ouvrir une issue sur le repository GitHub si vous avez des questions ou des suggestions d'amélioration.


Profile picture

En tant qu'ingénieur backend Node.js, je trouve une réelle satisfaction à exploiter ma passion pour la programmation afin de concevoir des solutions solides et évolutives. Je relève chaque défi avec détermination, m'efforçant de créer des applications haute performance qui garantissent une expérience utilisateur optimale. Vous pouvez me contacter sur LinkedIn.