Introduction
Dans la première partie de cet article, nous avons vu comment créer un projet NestJS avec EventStoreDB. Nous avons également vu comment traiter des commandes, publier des événements et réagir aux événements publiés. Dans cette deuxième partie, nous allons voir comment créer des projections avec EventStoreDB et NestJS.
Projections
Les projections sont utilisées pour créer des modèles de lecture optimisés pour les besoins de l'application. Les projections sont créées à partir des événements stockés dans EventStoreDB. Elles sont mises à jour en temps réel à chaque fois qu'un événement est publié dans EventStoreDB.
Un exemple commun est celui du solde d'un compte bancaire, qui est mis à jour à chaque fois qu'un dépôt ou un retrait est effectué sur le compte. Dans ce cas, le solde du compte est une projection créée à partir des événements de dépôt et de retrait.
Créer une projection à partir d'un stream
Pour créer une projection à partir d'un stream, nous devons nous abonner à ce stream et consommer les événements publiés dans ce stream. Pour ce faire, nous pouvons utiliser la méthode initPersistentSubscriptionToStream
du module Event-Sourcing créé dans la première partie.
Cette méthode va créer une nouvelle souscription persistante dans le cas où elle n'existe pas. Elle va ensuite retourner un Observable RxJS qui va nous servira à consommer les événements publiés dans le stream.
La liste des tables du restaurant
Nous allons créer une première projection pour récupérer la liste des tables du restaurant.
Une projection peut être initialisée de cette manière
async init() {
this.logger.log('Initializing TableProjection');
const streamName = '$ce-table'; // Abonnement à tous les événements de la catégorie 'table' (tous les streams commençant par 'table-')
const groupName = this.configService.get<string>(
'TABLE_PROJECTION_GROUP_NAME',
);
const source$ =
await this.eventStoreDbService.initPersistentSubscriptionToStream(
streamName,
groupName,
);
source$
.pipe(
groupBy((event) => (event.data as TableEvent['data']).id),
mergeMap((group$) =>
group$.pipe(concatMap((event) => this.handleEvent(event))),
),
)
.subscribe();
}
Code complet - TableProjection
L'utilisation de RxJS est pratique ici, elle nous permet notamment de grouper facilement les événements par id de table. C'est nécessaire ici dans le cas où plusieurs événements se suivent pour une même table : on souhaite attendre que le premier événement soit traité avant de traiter le suivant.
Ensuite, une méthode handleEvent est appelée pour traiter chaque événement.
private async handleEvent(resolvedEvent: AcknowledgeableEventStoreEvent) {
this.logger.debug(`Handling event: ${resolvedEvent.type}`);
try {
switch (resolvedEvent.type as TableEventType) {
case 'table-added':
await this.onTableAdded(resolvedEvent);
break;
case 'table-removed':
await this.onTableRemoved(resolvedEvent);
break;
case 'table-lock-placed':
await this.onTableLockPlaced(resolvedEvent);
break;
case 'table-lock-removed':
await this.onTableLockRemoved(resolvedEvent);
break;
default:
this.logger.warn(`Unhandled event type: ${resolvedEvent.type}`);
}
} catch (error) {
this.logger.error(error, error.stack);
}
}
Code complet - TableProjection
Cette méthode va appeler une méthode spécifique pour chaque type d'événement. Par exemple, pour l'événement table-added
, la méthode onTableAdded
est appelée.
private async onTableAdded(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventData = parseTableAddedEventData(resolvedEvent.data);
const currentTable = await this.db
.select({
id: tables.id,
})
.from(tables)
.where(eq(tables.id, eventData.id));
if (currentTable.length > 0) {
await resolvedEvent.ack();
return;
}
await this.db.insert(tables).values({
id: eventData.id,
seats: eventData.seats,
revision: resolvedEvent.revision,
});
await resolvedEvent.ack();
}
Code complet - TableProjection
En insérant une nouvelle table dans la base de données de lecture, on ajoute également la révision de l'événement. Cela va nous permettre de vérifier si l'événement a déjà été traité ou non. Si c'est le cas, on ne fait rien. Ce numéro de révision est incrémenté à chaque fois qu'un événement est publié dans EventStoreDB.
Pour l'événement table-removed
, la méthode onTableRemoved
est appelée.
private async onTableRemoved(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventData = parseTableRemovedEventData(resolvedEvent.data);
const tableResults = await this.db
.select({
revision: tables.revision,
})
.from(tables)
.where(eq(tables.id, eventData.id));
if (tableResults.length === 0) {
this.logger.warn(`Table not found: ${eventData.id}`);
await resolvedEvent.nack('retry', 'Table not found');
return;
}
const table = tableResults[0];
if (table.revision + 1 !== resolvedEvent.revision) {
this.logger.warn('Table revision mismatch', {
tableId: eventData.id,
currentRevision: table.revision,
eventRevision: resolvedEvent.revision,
});
await resolvedEvent.nack('retry', 'Table revision mismatch');
return;
}
await this.db
.update(tables)
.set({
removedAt: resolvedEvent.createdAt,
revision: resolvedEvent.revision,
})
.where(eq(tables.id, eventData.id))
.returning({
updatedId: tables.id,
removed: tables.removedAt,
});
await resolvedEvent.ack();
}
Code complet - TableProjection
Particularité : on vérifie si la révision de l'événement correspond à la révision de la table. Si ce n'est pas le cas, on rejette l'événement et on le met dans une file d'attente pour le traiter plus tard, les événements étant traités dans l'ordre.
Notre stream table-{tableId}
contient également des événements concernant les verrous placés sur les tables. Nous pouvons simplement incrémenter le numéro de révision de la table à chaque fois qu'un verrou est placé ou retiré pour conserver un numéro de révision cohérent.
Désormais, les données concernant les tables sont disponibles dans la base de données de lecture et peuvent être exposées via une API REST.
La liste des créneaux de réservation
Toujours dans le même principe, nous allons utiliser les projections pour lister les créneaux de réservation disponibles.
Les informations que nous souhaitons récupérer sont les suivantes pour un nombre donné de personnes :
- La date et l'heure du début et de la fin du créneau
- La liste des tables disponibles
Pour cela, nous pouvons nous appuyer sur deux streams :
- Le stream
table-{tableId}
contient les événements concernant une table - Le stream
booking-{bookingId}
contient les événements concernant une réservation
En créant une projection pour chacun de ces streams, nous pouvons ajouter deux tables dans notre base de données de lecture qui nous permettront de récupérer les informations nécessaires.
La première projection à créer est exactement la même que celle que nous avons créée pour récupérer la liste des tables.
Elle va permettre au service booker
de récupérer la liste des tables.
La seconde projection a pour but de créer une nouvelle table listant le statut de chaque réservation, ainsi nous pouvons savoir si une table est disponible ou non.
Initialisation de la projection :
async init() {
this.logger.log('Initializing BookingProjection');
const streamName = '$ce-table_booking'; // Abonnement à tous les événements de la catégorie 'table_booking' (tous les streams commençant par 'table_booking-')
const groupName = this.configService.get<string>(
'BOOKING_PROJECTION_GROUP_NAME',
);
const source$ =
await this.eventStoreDbService.initPersistentSubscriptionToStream(
streamName,
groupName,
);
source$
.pipe(
groupBy((event) => (event.data as TableBookingEvent['data']).id),
mergeMap((group$) =>
group$.pipe(concatMap((event) => this.handleEvent(event))),
),
)
.subscribe();
}
Code complet - BookingProjection
Traitement des événements :
private async onTableBookingInitiated(
resolvedEvent: AcknowledgeableEventStoreEvent,
) {
const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);
const currentBooking = await this.db
.select({
id: bookings.id,
})
.from(bookings)
.where(eq(bookings.id, eventData.id));
if (currentBooking.length > 0) {
await resolvedEvent.ack();
return;
}
await this.db.insert(bookings).values({
id: eventData.id,
tableId: eventData.tableId,
timeSlotFrom: new Date(eventData.timeSlot.from),
timeSlotTo: new Date(eventData.timeSlot.to),
status: 'initiated',
revision: resolvedEvent.revision,
});
await resolvedEvent.ack();
}
// Plusieurs méthodes pour les autres événements ...
Code complet - BookingProjection
Nous avons à présent deux tables disponibles dans notre base de données de lecture : tables
et bookings
.
Tables
- id
- seats
- revision
- removedAt
Bookings
- id
- tableId
- timeSlotFrom
- timeSlotTo
- status
- revision
Il manque une dernière information avant de pouvoir générer une liste de créneaux disponibles : les horaires d'ouverture du restaurant !
Ajoutons un fichier de configuration à notre service booker
pour définir ces horaires.
export const timeSlotConfiguration = {
morning: {
from: {
hours: 12,
minutes: 0,
},
to: {
hours: 14,
minutes: 0,
},
},
evening: {
from: {
hours: 19,
minutes: 0,
},
to: {
hours: 21,
minutes: 0,
},
},
timezone: 'Europe/Paris',
} as const;
Toutes les données nécessaires sont à présent disponibles dans notre base de données de lecture. Nous pouvons donc commencer à écrire notre requête dans le but d'obtenir une liste de créneaux disponibles :
const rows: Array<{
day: string;
start_time: string;
end_time: string;
available_tables: string[];
}> = await this.db.execute(
sql`
-- 1. Création d'une table temporaire contenant les dates de début et de fin de la période de recherche
WITH date_range AS (
SELECT
${startDate}::date AS start_date,
${endDate}::date AS end_date
),
-- 2. Création d'une table temporaire contenant les créneaux horaires
time_slots AS (
SELECT
generate_series(start_date, end_date, INTERVAL '1 day') AS day,
${config.morningStartTime} AS morning_start,
${config.morningEndTime} AS morning_end,
${config.eveningStartTime} AS evening_start,
${config.eveningEndTime} AS evening_end
FROM
date_range
),
-- 3. Création d'une table temporaire contenant les tables disponibles
available_tables AS (
SELECT
ts.day,
ts.morning_start AS start_time,
ts.morning_end AS end_time,
array_agg(${tables.id}) AS available_tables
FROM
time_slots ts
CROSS JOIN ${tables}
LEFT JOIN ${bookings} ON (
${bookings.tableId} = ${tables.id}
AND ${bookings.status} != 'cancelled'
AND ${bookings.timeSlotFrom} = (ts.day + ts.morning_start)::timestamp AT TIME ZONE ${timeZone}
AND ${bookings.timeSlotTo} = (ts.day + ts.morning_end)::timestamp AT TIME ZONE ${timeZone}
)
WHERE
${tables.seats} >= ${query.people}
AND ${bookings.id} IS NULL
GROUP BY
ts.day,
ts.morning_start,
ts.morning_end
UNION ALL
SELECT
ts.day,
ts.evening_start AS start_time,
ts.evening_end AS end_time,
array_agg(${tables.id}) AS available_tables
FROM
time_slots ts
CROSS JOIN ${tables}
LEFT JOIN ${bookings} ON (
${bookings.tableId} = ${tables.id}
AND ${bookings.status} != 'cancelled'
AND ${bookings.timeSlotFrom} = (ts.day + ts.evening_start)::timestamp AT TIME ZONE ${timeZone}
AND ${bookings.timeSlotTo} = (ts.day + ts.evening_end)::timestamp AT TIME ZONE ${timeZone}
)
WHERE
${tables.seats} >= ${query.people}
AND ${bookings.id} IS NULL
GROUP BY
ts.day,
ts.evening_start,
ts.evening_end
)
-- 4. Sélection des créneaux disponibles
SELECT
to_char(a.day, 'YYYY-MM-DD') AS day,
to_char(a.start_time, 'HH24:MI') AS start_time,
to_char(a.end_time, 'HH24:MI') AS end_time,
a.available_tables
FROM
available_tables a
ORDER BY
a.day,
a.start_time;
`,
);
Code complet - ListAvailableBookingSlotsHandler
Note sur la consistance éventuelle
Attention, notre base de données de lecture est mise à jour de manière asynchrone. Elle ne doit pas être considérée comme une source de vérité et ne doit être utilisée que pour des besoins de lecture.
Conclusion
Avec l'ajout de ces projections, nous pouvons mettre à disposition des endpoints pour afficher la liste des tables du restaurant au sein du service table-manager
et la liste des créneaux disponibles au sein du service booker
.
Grâce à ce type d'architecture, il est possible de créer autant de projections que nécessaire pour répondre aux besoins de lecture de l'application.
Nous pouvons imaginer répondre facilement à des besoins tels que :
- Combien de réservations ont été effectuées sur une période donnée ?
- Combien de réservations ont été annulées sur une période donnée ?
- Combien de personnes ont été servies sur une période donnée ?
L'ensemble de ce projet est disponible sur Github
Vos remarques et commentaires sont les bienvenus, n'hésitez pas à ouvrir une issue sur le repository GitHub si vous avez des questions ou des suggestions d'amélioration.