Introduction
Dans cet article, nous allons nous intéresser à l'implémentation d'une architecture Event-Sourcing avec NestJS et EventStoreDB dans le cadre d'une application de réservation de tables dans un restaurant.
Cette étude sera composée de deux parties :
- Dans la première partie, nous allons nous intéresser uniquement au modèle d'écriture des données. Nous allons voir comment écrire les données dans EventStoreDB, et comment les lire.
- Dans la deuxième partie, nous allons nous intéresser au modèle de lecture des données. Nous allons voir comment lire les données dans EventStoreDB, et comment les projeter dans une base de données de lecture.
CQRS et Event-Sourcing : Des patterns d'architecture pour une meilleure séparation des responsabilités
CQRS (Command and Query Responsibility Segregation)
Le CQRS est un pattern d'architecture puissant qui vise à séparer les opérations de lecture (queries) des opérations d'écriture (commands) au sein d'une application. Cette approche présente plusieurs avantages majeurs pour la conception et la performance du système. Il s'intègre souvent harmonieusement avec l'architecture Event-Sourcing.
Event-Sourcing
L'architecture Event-Sourcing constitue un complément naturel au CQRS. Elle consiste à stocker les événements métiers dans un système de stockage dédié appelé "EventStore". L'idée fondamentale est de conserver un enregistrement de tous les événements significatifs qui se produisent dans l'application.
L'utilisation combinée du CQRS et de l'Event-Sourcing offre une architecture robuste, modulaire et performante, permettant une meilleure séparation des responsabilités, une gestion plus efficace des problèmes de concurrence, et une traçabilité accrue des changements dans l'application. Ces patterns sont devenus des choix privilégiés pour de nombreuses applications modernes, offrant des avantages significatifs en termes de maintenance, de flexibilité et de scalabilité.
Présentation du projet
Dans notre projet d'exemple, nous allons créer une API qui permet de gérer les réservations de tables dans un restaurant. Le gérant du restaurant pourra ajouter ou supprimer des tables, et les clients pourront réserver une table selon une liste de créneaux définis.
Arborescence du projet
Le projet est composé de deux services : un service booker
qui permet de gérer les réservations de tables, et un service table-manager
qui permet de gérer les tables du restaurant.
Les deux services communiqueront entre eux via des événements. Les événements seront stockés dans un EventStore
.
Deux librairies seront créées dans un dossier shared
pour gérer les événements et la communication entre les services. Ces librairies seront partagées dans un workspace pnpm.
Voici l'arborescence du projet :
services
├── booker
├── table-manager
shared
├── event-sourcing
├── events
Module Event-Sourcing
Afin de simplifier la communication entre les services, nous allons créer un module EventSourcing
qui permettra de gérer la communication entre les services.
Ce module exporte un service EventStoreService
qui permet de :
- Publier un événement
- Lire les événements d'un stream
- Souscrire à un stream
Ce service repose essentiellement sur le client NodeJS d'EventStoreDB qui est injecté dans le service.
@Injectable()
export class EventStoreDbService implements EventStoreService {
constructor(
@Inject(EVENT_STORE_DB_CLIENT)
private readonly client: EventStoreDBClient,
) {}
}
Github: Code complet - EventStoreDbService
Publier un événement
Pour publier un événement, on utilise la méthode appendToStream
du client EventStoreDB.
async publish<T, X>(event: Event<T, X>): Promise<void> {
const jsonEvent = this.mapEventToJsonEvent(event);
await this.client.appendToStream(event.streamName, jsonEvent);
}
Github: Code complet - EventStoreDbService
La méthode mapEventToJsonEvent
permet de convertir un événement en un objet JSON qui sera stocké dans EventStoreDB.
Lecture des événements d'un stream
Pour lire les événements d'un stream, on utilise la méthode readStream
du client EventStoreDB.
Cette méthode retourne un Observable qui permet de lire les événements du stream.
readStream(
streamName: string,
options?: {
maxCount?: number;
},
): Observable<EventStoreEvent> {
const stream = this.client.readStream(streamName, options);
const observable = new Observable<EventStoreEvent>((subscriber) => {
stream.on('data', (event) => {
subscriber.next(
new EventStoreEvent({
data: event.event?.data,
metadata: event.event?.metadata,
type: event.event?.type,
}),
);
});
stream.on('error', (error) => {
subscriber.error(error);
});
stream.on('end', () => {
subscriber.complete();
});
return () => {
stream.cancel();
};
});
return observable;
}
Github: Full code - EventStoreDbService
Souscrire à un stream
Pour souscrire à un stream, on vérifie si la souscription existe déjà, et on la crée si elle n'existe pas. Ensuite, on retourne un Observable qui permet de lire les événements du stream au fur et à mesure qu'ils arrivent.
async initPersistentSubscriptionToStream(
streamName: string,
groupName: string,
): Promise<Observable<AcknowledgeableEventStoreEvent>> {
const currentSubscriptions =
await this.client.listAllPersistentSubscriptions();
const existingSubscription = currentSubscriptions.find(
(sub) => sub.groupName === groupName && sub.eventSource === streamName,
);
if (!existingSubscription) {
await this.client.createPersistentSubscriptionToStream(
streamName,
groupName,
{
checkPointAfter: 2000,
checkPointLowerBound: 10,
checkPointUpperBound: 1000,
consumerStrategyName: 'RoundRobin',
extraStatistics: true,
historyBufferSize: 500,
liveBufferSize: 500,
maxRetryCount: 10,
maxSubscriberCount: 'unbounded',
messageTimeout: 30000,
readBatchSize: 20,
resolveLinkTos: true,
startFrom: 'start',
},
);
}
const persistentSubscription =
this.client.subscribeToPersistentSubscriptionToStream(
streamName,
groupName,
);
const observable = new Observable<AcknowledgeableEventStoreEvent>(
(subscriber) => {
persistentSubscription.on('data', (event) => {
subscriber.next(
new AcknowledgeableEventStoreEvent(
{
type: event.event?.type,
data: event.event?.data,
metadata: event.event?.metadata,
retryCount: event.retryCount,
revision: Number(event.event.revision),
createdAt: event.event?.created,
},
{
ack: () => persistentSubscription.ack(event),
nack: (action, message) =>
persistentSubscription.nack(action, message, event),
},
),
);
});
persistentSubscription.on('error', (error) => {
subscriber.error(error);
});
persistentSubscription.on('end', () => {
subscriber.complete();
});
return () => {
persistentSubscription.unsubscribe();
};
},
);
return observable;
}
Github: Full code - EventStoreDbService
Implémentation des fonctionnalités
Ajouter / supprimer des tables au restaurant
Chaque table possède un identifiant unique et un nombre de places. Cet identifiant unique va nous permettre de constituer le nom du stream dans lequel nous allons stocker les événements liés à cette table.
Par exemple, si l'identifiant de la table est table-1
, le nom du stream sera table-1
.
Le stream contiendra les événements d'ajout et de suppression de la table.
Ajouter une table
Lorsqu'on ajoute une table, on publie un événement TableAddedEvent
dans le stream de la table.
export class TableAddedEvent extends TableBaseEvent<
TableAddedEventData,
"table-added"
> {
constructor(data: { id: string; seats: number }) {
super({
data,
type: "table-added",
version: 1,
});
}
}
Github: Code complet - Table added event
La classe abstraite TableBaseEvent
permet notamment de définir le nom du stream dans lequel l'événement sera stocké.
export abstract class TableBaseEvent<Data extends { id: string }, Type> extends Event<
Data,
Type
> {
static STREAM_PREFIX = "table";
constructor({
data,
type,
version,
metadata,
}: {
data: Data;
type: Type;
version: number;
metadata?: EventMetadata;
}) {
super({
data,
type,
version,
streamName: TableBaseEvent.buildStreamName(data.id),
metadata,
});
}
static buildStreamName(id: string): string {
return `${TableBaseEvent.STREAM_PREFIX}-${id}`;
}
}
Github: Code complet - Table base event
Notre controller va instancier une commande AddTableCommand
:
@Post('tables')
async addTable(@Body() body: AddTableRequest): Promise<AddTableResponse> {
const command = new AddTableCommand(body.id, body.seats);
await this.commandBus.execute<AddTableCommand, Table>(
command,
);
}
Github: Code complet - Add table controller
Cette commande est traitée par un handler AddTableHandler
qui va :
- Instancier un aggregate
Table
avec l'identifiant de la table - Appeler la méthode
add
de l'aggregate pour ajouter la table - Publier les événements générés par l'aggregate vers notre EventStore
async execute(command: AddTableCommand): Promise<Table> {
const table = new Table(command.id);
table.add(command.seats);
await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
return table;
}
Github: Code complet - Add table handler
En appelant la méthode add, l'aggregate va générer un événement TableAddedEvent
et l'ajouter à la liste des événements non commités. La méthode apply permet d'appliquer l'événement à l'aggregate, en appelant la méthode onTableAddedEvent
.
export class Table extends AggregateRoot<TableEvent> {
public seats: number;
constructor(public id: string) {
if (id.length === 0) {
throw new InvalidTableIdException(id);
}
super();
}
add(seats: number) {
this.apply(
new TableAddedEvent({
id: this.id,
seats,
}),
);
}
private onTableAddedEvent(event: TableAddedEvent) {
this.id = event.data.id;
this.seats = event.data.seats;
}
protected getEventHandler<T extends TableEvent>(
event: T,
): Type<IEventHandler<T>> {
switch (event.type) {
case 'table-added':
return this.onTableAddedEvent.bind(this);
default:
throw new Error(`No handler defined for event type ${event.type}`);
}
}
}
Github: Code complet - Table aggregate
Comment vérifier qu'une table avec le même identifiant n'existe pas déjà ?
Il suffit de lire les événements du stream de la table et de les appliquer à l'aggregate pour reconstituer son état. Si l'aggregate existe déjà, on lève une exception.
Pour appliquer ces événements, nous pouvons ajouter une méthode statique fromEvents
à notre aggregate.
static fromEventsHistory(events: TableEvent[]) {
const table = new Table(events[0].data.id);
for (const event of events) {
table.apply(event, true);
}
return table;
}
Github: Code complet - Table aggregate
Cette méthode va appliquer les événements un à un, et donc constituer l'état de l'aggregate. Elle peut être appelée depuis un repository, chargé de récupérer les événements d'un stream grâce à au module EventSourcing
.
Ainsi, nous pouvons ajouter une vérification dans notre handler AddTableHandler
:
async execute(command: AddTableCommand): Promise<Table> {
const table = await this.tableEventStoreRepository.getById(command.id);
if (table) {
throw new TableAlreadyExistsException(command.id);
}
const table = new Table(command.id);
table.add(command.seats);
await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
return table;
}
Github: Code complet - Add table handler
Supprimer une table
Lorsqu'on supprime une table, on publie un événement TableRemovedEvent
dans le stream de la table.
export class TableRemovedEvent extends TableBaseEvent<
TableRemovedEventData,
"table-removed"
> {
constructor(data: { id: string }) {
super({
data,
type: "table-removed",
version: 1,
});
}
}
La suppression d'une table est gérée de la même manière que l'ajout d'une table, à la différence que l'aggregate va générer un événement TableRemovedEvent
lorsqu'on appelle la méthode remove
.
Voici le handler RemoveTableHandler
:
async execute(command: RemoveTableCommand): Promise<void> {
const table = await this.tableEventStoreRepository.findTableById(
command.id,
);
if (!table) {
throw new TableNotFoundError(command.id);
}
table.remove();
await this.tableEventStoreRepository.publish(table.getUncommittedEvents());
}
Github: Code complet - Remove table handler
Réserver une table
Initier une réservation
A partir d'un planning listant les créneaux disponibles, le client peut choisir un créneau et initier une réservation.
Une commande est créée et envoyée au service booker
:
Comment s'assurer que la table n'a pas été supprimée entre temps ? Comment s'assurer que la table ne va pas être supprimée alors qu'elle est réservée ?
Pour répondre à ces questions, nous allons utiliser un système de verrouillage de la table. Lorsqu'une table est réservée, elle est verrouillée. Tant qu'elle est verrouillée, elle ne peut pas être supprimée.
Verrouiller une table et confirmer la réservation
Afin de gérer ces problèmes de concurrence, nous pouvons utiliser le pattern Choreography-based saga
pour placer un verrou sur la table et confirmer la réservation.
Choerography-based saga pattern documentation
Ainsi, nous pouvons nous assurer que la table n'est pas supprimée entre temps, et que la réservation est confirmée.
Implémentation côté table-manager
Tous les événements de type TableBookingInitiatedEvent
sont écoutés par la saga TableLockingSaga
. Lorsqu'un événement de ce type est reçu, la saga va placer un verrou sur la table et publier un événement TableLockPlacedEvent
.
async onTableBookingInitiated(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);
const eventMetadata = resolvedEvent.metadata as JSONMetadata;
const command = new PlaceTableLockCommand(
eventData.tableId,
eventData.timeSlot,
eventMetadata.$correlationId,
);
try {
await this.commandBus.execute(command);
} catch (error) {
if (error instanceof TableNotFoundError) {
await resolvedEvent.ack();
return;
}
throw error;
}
await resolvedEvent.ack();
}
Github: Code complet - Table locking saga
Implémentation côté booker
Tous les événements de type TableLockPlacedEvent
sont écoutés par la saga TableBookingSaga
qui va confirmer la réservation en publiant un événement TableBookingConfirmedEvent
.
async onTableLockPlaced(resolvedEvent: AcknowledgeableEventStoreEvent) {
const eventMetadata = resolvedEvent.metadata as JSONMetadata;
const command = new ConfirmTableBookingCommand(
eventMetadata.$correlationId,
);
await this.commandBus.execute(command);
await resolvedEvent.ack();
}
Github: Code complet - Table booking saga
Déverouiller une table
Lorsqu'une réservation est annulée, où que le créneau est passé, la table est déverrouillée.
Scénario 1 : La réservation est annulée
Scénario 2 : Le créneau est passé
Dans cet article, nous avons vu comment implémenter une architecture Event-Sourcing avec NestJS et EventStoreDB. Dans la prochaine partie, nous allons voir comment lire les événements stockés dans EventStoreDB et les projeter dans une base de données de lecture.
Le code complet de ce projet est disponible sur GitHub : Github: Event-Sourcing with NestJS and EventStoreDB
Vos remarques et commentaires sont les bienvenus, n'hésitez pas à ouvrir une issue sur le repository GitHub si vous avez des questions ou des suggestions d'amélioration.