Introduction
In the first part of this article, we saw how to create a NestJS project with EventStoreDB. We also learned how to handle commands, publish events, and react to published events. In this second part, we will explore how to create projections with EventStoreDB and NestJS.
Projections
Projections are used to create read models optimized for the application's requirements. Projections are built from events stored in EventStoreDB. They are updated in real time whenever an event is published in EventStoreDB.
A common example is the balance of a bank account, which gets updated whenever a deposit or withdrawal is made. In this case, the account balance is a projection created from deposit and withdrawal events.
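To make the idea concrete, here is a minimal sketch of the bank account example as a fold over events. The event names and shapes (`deposit-made`, `withdrawal-made`, `amount`) are assumptions made up for this illustration; they do not come from the project.

```typescript
// Hypothetical event shapes for a bank account stream (illustrative names only).
type AccountEvent =
  | { type: 'deposit-made'; amount: number }
  | { type: 'withdrawal-made'; amount: number };

// The projection is a left fold: start from 0 and apply each event in order.
function projectBalance(events: readonly AccountEvent[]): number {
  return events.reduce((balance: number, event: AccountEvent): number => {
    switch (event.type) {
      case 'deposit-made':
        return balance + event.amount;
      case 'withdrawal-made':
        return balance - event.amount;
    }
  }, 0);
}

const balance = projectBalance([
  { type: 'deposit-made', amount: 100 },
  { type: 'withdrawal-made', amount: 30 },
  { type: 'deposit-made', amount: 5 },
]);
console.log(balance); // 75
```

In a real projection the running balance would be persisted in the read database and updated event by event instead of being recomputed from scratch.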
Creating a Projection from a Stream
To create a projection from a stream, we need to subscribe to that stream and consume the events published in it. For this purpose, we can use the initPersistentSubscriptionToStream method from the Event-Sourcing module created in the first part.
This method will create a new persistent subscription if it doesn't exist. It will then return an RxJS Observable that we can use to consume the events published in the stream.
Restaurant Table List
We will create an initial projection to retrieve the list of tables in the restaurant.
A projection can be initialized as follows:
async init() {
  this.logger.log('Initializing TableProjection');
  const streamName = '$ce-table'; // Subscribe to all events in the 'table' category (all streams starting with 'table-')
  const groupName = this.configService.get<string>(
    'TABLE_PROJECTION_GROUP_NAME',
  );
  const source$ =
    await this.eventStoreDbService.initPersistentSubscriptionToStream(
      streamName,
      groupName,
    );
  source$
    .pipe(
      groupBy((event) => (event.data as TableEvent['data']).id),
      mergeMap((group$) =>
        group$.pipe(concatMap((event) => this.handleEvent(event))),
      ),
    )
    .subscribe();
}
The use of RxJS here is convenient, especially for easily grouping events by table id. This is necessary when multiple events follow each other for the same table: we want to wait for the first event to be processed before handling the next one.
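The ordering guarantee that groupBy, mergeMap and concatMap give us can also be illustrated without RxJS. The sketch below is not the pipeline used in the project: it swaps in a plain per-key promise chain, with a hypothetical `KeyedQueue` helper, to show the same behaviour (sequential handling per table id, concurrent handling across tables).

```typescript
// Hypothetical helper (not part of the article's module): runs tasks with the
// same key one after another, while tasks with different keys may overlap.
class KeyedQueue<K> {
  private readonly tails = new Map<K, Promise<void>>();

  enqueue(key: K, task: () => Promise<void>): Promise<void> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start the task only once the previous task for this key has settled.
    // Errors are swallowed here so that one failed task does not block the key.
    const next = previous.then(task).catch((): void => undefined);
    this.tails.set(key, next);
    return next;
  }
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Simulates three events; the first 'table-1' event is the slowest to handle.
async function demo(): Promise<string[]> {
  const processed: string[] = [];
  const queue = new KeyedQueue<string>();
  const handler = (label: string, ms: number) => async () => {
    await sleep(ms);
    processed.push(label);
  };

  await Promise.all([
    queue.enqueue('table-1', handler('table-1:table-added', 30)),
    queue.enqueue('table-1', handler('table-1:table-removed', 1)),
    queue.enqueue('table-2', handler('table-2:table-added', 10)),
  ]);
  return processed;
}
```

Even though the second `table-1` handler is much faster than the first, it only starts once the first has finished, which is the role concatMap plays inside each group. The `table-2` event does not wait for either of them.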
Next, a handleEvent method is called to process each event.
private async handleEvent(resolvedEvent: AcknowledgeableEventStoreEvent) {
  this.logger.debug(`Handling event: ${resolvedEvent.type}`);
  try {
    switch (resolvedEvent.type as TableEventType) {
      case 'table-added':
        await this.onTableAdded(resolvedEvent);
        break;
      case 'table-removed':
        await this.onTableRemoved(resolvedEvent);
        break;
      case 'table-lock-placed':
        await this.onTableLockPlaced(resolvedEvent);
        break;
      case 'table-lock-removed':
        await this.onTableLockRemoved(resolvedEvent);
        break;
      default:
        this.logger.warn(`Unhandled event type: ${resolvedEvent.type}`);
    }
  } catch (error) {
    this.logger.error(error, error.stack);
  }
}
This method calls a specific method for each event type. For example, for the table-added event, the onTableAdded method is called.
private async onTableAdded(resolvedEvent: AcknowledgeableEventStoreEvent) {
  const eventData = parseTableAddedEventData(resolvedEvent.data);
  const currentTable = await this.db
    .select({
      id: tables.id,
    })
    .from(tables)
    .where(eq(tables.id, eventData.id));
  if (currentTable.length > 0) {
    await resolvedEvent.ack();
    return;
  }
  await this.db.insert(tables).values({
    id: eventData.id,
    seats: eventData.seats,
    revision: resolvedEvent.revision,
  });
  await resolvedEvent.ack();
}
When inserting a new table into the read database, we also store the event's revision. This lets us detect whether an event has already been processed and, if so, do nothing. The revision number is incremented each time an event is appended to the table's stream.
For the table-removed event, the onTableRemoved method is called.
private async onTableRemoved(resolvedEvent: AcknowledgeableEventStoreEvent) {
  const eventData = parseTableRemovedEventData(resolvedEvent.data);
  const tableResults = await this.db
    .select({
      revision: tables.revision,
    })
    .from(tables)
    .where(eq(tables.id, eventData.id));
  if (tableResults.length === 0) {
    this.logger.warn(`Table not found: ${eventData.id}`);
    await resolvedEvent.nack('retry', 'Table not found');
    return;
  }
  const table = tableResults[0];
  if (table.revision + 1 !== resolvedEvent.revision) {
    this.logger.warn('Table revision mismatch', {
      tableId: eventData.id,
      currentRevision: table.revision,
      eventRevision: resolvedEvent.revision,
    });
    await resolvedEvent.nack('retry', 'Table revision mismatch');
    return;
  }
  await this.db
    .update(tables)
    .set({
      removedAt: resolvedEvent.createdAt,
      revision: resolvedEvent.revision,
    })
    .where(eq(tables.id, eventData.id))
    .returning({
      updatedId: tables.id,
      removed: tables.removedAt,
    });
  await resolvedEvent.ack();
}
Note that we check whether the event's revision immediately follows the revision stored for the table. If it does not, we reject the event with the retry action so that it is redelivered later, because events must be applied in order.
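The revision check can be isolated in a small pure function, which makes the rule easier to reason about and to test. This is a sketch with assumptions: `decideOnRevision` is a hypothetical name, revisions are modelled as plain numbers for simplicity, and the `skip` outcome for an already-applied event is a variation that the handler above does not implement (it retries on any mismatch).

```typescript
type RevisionDecision = 'apply' | 'skip' | 'retry';

// currentRevision is the revision stored in the read model,
// or undefined when the row does not exist yet.
function decideOnRevision(
  currentRevision: number | undefined,
  eventRevision: number,
): RevisionDecision {
  // Row missing: the event that creates it has not been projected yet.
  if (currentRevision === undefined) return 'retry';
  // Event at or below the stored revision: it was already applied (variation).
  if (eventRevision <= currentRevision) return 'skip';
  // Event directly follows the stored revision: safe to apply.
  if (eventRevision === currentRevision + 1) return 'apply';
  // There is a gap: an earlier event is still missing, try again later.
  return 'retry';
}

console.log(decideOnRevision(2, 3)); // apply
console.log(decideOnRevision(1, 3)); // retry
```

A handler could map `apply` to an update followed by `ack()`, `skip` to a plain `ack()`, and `retry` to `nack('retry', ...)`.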
Our table-{tableId} stream also contains events related to locks placed on the tables. We can simply increment the table's revision number whenever a lock is placed or removed to maintain a consistent revision number.
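To see why the lock events have to bump the revision, here is a small in-memory simulation of one table stream. It is only a sketch: the `replay` function and the row shape are hypothetical, revisions are plain numbers, and rejected events are simply collected instead of being redelivered.

```typescript
type TableEventType =
  | 'table-added'
  | 'table-removed'
  | 'table-lock-placed'
  | 'table-lock-removed';

interface StreamEvent {
  type: TableEventType;
  revision: number;
}

interface TableRow {
  revision: number;
  removed: boolean;
}

// Replays one table stream into an in-memory row and returns the row
// together with the events that would have been rejected for retry.
function replay(events: readonly StreamEvent[], bumpOnLockEvents: boolean) {
  let row: TableRow | undefined;
  const retried: StreamEvent[] = [];
  for (const event of events) {
    if (event.type === 'table-added') {
      row = { revision: event.revision, removed: false };
      continue;
    }
    // Same guard as in onTableRemoved: the event must directly follow the row.
    if (row === undefined || row.revision + 1 !== event.revision) {
      retried.push(event);
      continue;
    }
    if (event.type === 'table-removed') {
      row = { revision: event.revision, removed: true };
    } else if (bumpOnLockEvents) {
      // Lock events change nothing in this read model except the revision.
      row = { ...row, revision: event.revision };
    }
  }
  return { row, retried };
}

const stream: StreamEvent[] = [
  { type: 'table-added', revision: 0 },
  { type: 'table-lock-placed', revision: 1 },
  { type: 'table-lock-removed', revision: 2 },
  { type: 'table-removed', revision: 3 },
];

const withBump = replay(stream, true);
const withoutBump = replay(stream, false);
console.log(withBump.row?.removed, withBump.retried.length); // true 0
console.log(withoutBump.row?.removed, withoutBump.retried.length); // false 2
```

Without the bump, the stored revision stays at 0, so the table-removed event at revision 3 never passes the check and would be retried indefinitely.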
Now, data about the tables is available in the read database and can be exposed through a REST API.
Booking Slot List
Continuing with the same approach, we will utilize projections to list available booking slots.
For a given number of people, we want to retrieve the following information:
- The start and end date and time of the slot
- The list of available tables
For this purpose, we can rely on two streams:
- The table-{tableId} stream contains events related to a table
- The table_booking-{bookingId} stream contains events related to a booking
By creating a projection for each of these streams, we can add two tables to our read database that will allow us to retrieve the necessary information.
The first projection to create is exactly the same as the one we created to retrieve the list of tables. It will allow the booker service to retrieve the list of tables.
The second projection aims to create a new table listing the status of each booking, so we can determine if a table is available or not.
Projection Initialization:
async init() {
  this.logger.log('Initializing BookingProjection');
  const streamName = '$ce-table_booking'; // Subscribe to all events in the 'table_booking' category (all streams starting with 'table_booking-')
  const groupName = this.configService.get<string>(
    'BOOKING_PROJECTION_GROUP_NAME',
  );
  const source$ =
    await this.eventStoreDbService.initPersistentSubscriptionToStream(
      streamName,
      groupName,
    );
  source$
    .pipe(
      groupBy((event) => (event.data as TableBookingEvent['data']).id),
      mergeMap((group$) =>
        group$.pipe(concatMap((event) => this.handleEvent(event))),
      ),
    )
    .subscribe();
}
Event Handling:
private async onTableBookingInitiated(
  resolvedEvent: AcknowledgeableEventStoreEvent,
) {
  const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);
  const currentBooking = await this.db
    .select({
      id: bookings.id,
    })
    .from(bookings)
    .where(eq(bookings.id, eventData.id));
  if (currentBooking.length > 0) {
    await resolvedEvent.ack();
    return;
  }
  await this.db.insert(bookings).values({
    id: eventData.id,
    tableId: eventData.tableId,
    timeSlotFrom: new Date(eventData.timeSlot.from),
    timeSlotTo: new Date(eventData.timeSlot.to),
    status: 'initiated',
    revision: resolvedEvent.revision,
  });
  await resolvedEvent.ack();
}
// More event handling methods for booking cancellation and confirmation...
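The handlers for confirmation and cancellation are not shown here. As a rough idea of what they need to do to the read model, the sketch below applies a follow-up event to an in-memory booking row. The event type names (`table-booking-confirmed`, `table-booking-cancelled`), the `confirmed` status and the `applyFollowUp` function are assumptions for illustration; the actual handlers may differ.

```typescript
type BookingStatus = 'initiated' | 'confirmed' | 'cancelled';

// Hypothetical event type names for the follow-up events of a booking.
type FollowUpEventType = 'table-booking-confirmed' | 'table-booking-cancelled';

interface BookingRow {
  id: string;
  status: BookingStatus;
  revision: number;
}

// Returns the updated row, or the row unchanged when the event does not
// directly follow the stored revision (a real handler would reject it for retry).
function applyFollowUp(
  row: BookingRow,
  eventType: FollowUpEventType,
  eventRevision: number,
): BookingRow {
  if (row.revision + 1 !== eventRevision) return row;
  const status: BookingStatus =
    eventType === 'table-booking-confirmed' ? 'confirmed' : 'cancelled';
  return { ...row, status, revision: eventRevision };
}

const initiated: BookingRow = { id: 'b1', status: 'initiated', revision: 0 };
const confirmed = applyFollowUp(initiated, 'table-booking-confirmed', 1);
console.log(confirmed.status, confirmed.revision); // confirmed 1
```

The revision guard is the same one used for tables, so a status change is only recorded once all earlier events of that booking have been projected.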
Now we have two tables available in our read database: tables and bookings.
Tables
- id
- seats
- revision
- removedAt
Bookings
- id
- tableId
- timeSlotFrom
- timeSlotTo
- status
- revision
Before we can generate a list of available slots, there's one more piece of information we need: the restaurant's opening hours!
Let's add a configuration file to our booker service to define these hours.
export const timeSlotConfiguration = {
  morning: {
    from: {
      hours: 12,
      minutes: 0,
    },
    to: {
      hours: 14,
      minutes: 0,
    },
  },
  evening: {
    from: {
      hours: 19,
      minutes: 0,
    },
    to: {
      hours: 21,
      minutes: 0,
    },
  },
  timezone: 'Europe/Paris',
} as const;
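A query usually needs these hours as time strings, not as objects. The sketch below shows one possible way to derive them; the `formatTime` helper and the shape of the resulting `config` object are assumptions for illustration, not necessarily how the booker service does it.

```typescript
interface TimeOfDay {
  hours: number;
  minutes: number;
}

// Same values as the configuration above, repeated to keep the sketch self-contained.
const timeSlotConfiguration = {
  morning: { from: { hours: 12, minutes: 0 }, to: { hours: 14, minutes: 0 } },
  evening: { from: { hours: 19, minutes: 0 }, to: { hours: 21, minutes: 0 } },
  timezone: 'Europe/Paris',
} as const;

// Formats a time of day as a zero-padded 'HH:MM' string.
function formatTime({ hours, minutes }: TimeOfDay): string {
  const pad = (value: number): string =>
    value < 10 ? `0${value}` : String(value);
  return `${pad(hours)}:${pad(minutes)}`;
}

// Hypothetical parameter object for a SQL query.
const config = {
  morningStartTime: formatTime(timeSlotConfiguration.morning.from),
  morningEndTime: formatTime(timeSlotConfiguration.morning.to),
  eveningStartTime: formatTime(timeSlotConfiguration.evening.from),
  eveningEndTime: formatTime(timeSlotConfiguration.evening.to),
};

console.log(config.morningStartTime, config.eveningEndTime); // 12:00 21:00
```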
All the necessary data is now available in our read database. We can start writing our query to obtain a list of available slots:
const rows: Array<{
  day: string;
  start_time: string;
  end_time: string;
  available_tables: string[];
}> = await this.db.execute(
  sql`
    -- 1. CTE holding the start and end dates of the search period
    WITH date_range AS (
      SELECT
        ${startDate}::date AS start_date,
        ${endDate}::date AS end_date
    ),
    -- 2. CTE producing one row per day, with the configured time slots
    time_slots AS (
      SELECT
        generate_series(start_date, end_date, INTERVAL '1 day') AS day,
        ${config.morningStartTime} AS morning_start,
        ${config.morningEndTime} AS morning_end,
        ${config.eveningStartTime} AS evening_start,
        ${config.eveningEndTime} AS evening_end
      FROM
        date_range
    ),
    -- 3. CTE listing, for each slot, the tables with enough seats and no active booking
    available_tables AS (
      SELECT
        ts.day,
        ts.morning_start AS start_time,
        ts.morning_end AS end_time,
        array_agg(${tables.id}) AS available_tables
      FROM
        time_slots ts
        CROSS JOIN ${tables}
        LEFT JOIN ${bookings} ON (
          ${bookings.tableId} = ${tables.id}
          AND ${bookings.status} != 'cancelled'
          AND ${bookings.timeSlotFrom} = (ts.day + ts.morning_start)::timestamp AT TIME ZONE ${timeZone}
          AND ${bookings.timeSlotTo} = (ts.day + ts.morning_end)::timestamp AT TIME ZONE ${timeZone}
        )
      WHERE
        ${tables.seats} >= ${query.people}
        AND ${bookings.id} IS NULL
      GROUP BY
        ts.day,
        ts.morning_start,
        ts.morning_end
      UNION ALL
      -- Similar query for evening slots...
    )
    -- 4. Select available slots
    SELECT
      to_char(a.day, 'YYYY-MM-DD') AS day,
      to_char(a.start_time, 'HH24:MI') AS start_time,
      to_char(a.end_time, 'HH24:MI') AS end_time,
      a.available_tables
    FROM
      available_tables a
    ORDER BY
      a.day,
      a.start_time;
  `,
);
The full code is available in ListAvailableBookingSlotsHandler.
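The core of the query (the CROSS JOIN with tables, the LEFT JOIN with bookings, and the `IS NULL` filter) may be easier to follow as plain code. The sketch below mirrors that logic in memory for a single slot. It is an illustration only: the row shapes are simplified, `listAvailableTableIds` is a hypothetical name, and the `confirmed` status is an assumption.

```typescript
interface TableRow {
  id: string;
  seats: number;
}

interface BookingRow {
  tableId: string;
  status: 'initiated' | 'confirmed' | 'cancelled';
  timeSlotFrom: Date;
  timeSlotTo: Date;
}

interface Slot {
  from: Date;
  to: Date;
}

// A table is available for a slot when it has enough seats and no
// non-cancelled booking covers exactly that slot.
function listAvailableTableIds(
  tables: readonly TableRow[],
  bookings: readonly BookingRow[],
  slot: Slot,
  people: number,
): string[] {
  const isTaken = (tableId: string): boolean =>
    bookings.some(
      (booking) =>
        booking.tableId === tableId &&
        booking.status !== 'cancelled' &&
        booking.timeSlotFrom.getTime() === slot.from.getTime() &&
        booking.timeSlotTo.getTime() === slot.to.getTime(),
    );
  return tables
    .filter((table) => table.seats >= people && !isTaken(table.id))
    .map((table) => table.id);
}

const slot: Slot = {
  from: new Date('2024-05-01T10:00:00Z'),
  to: new Date('2024-05-01T12:00:00Z'),
};
const available = listAvailableTableIds(
  [
    { id: 't1', seats: 2 },
    { id: 't2', seats: 4 },
    { id: 't3', seats: 6 },
  ],
  [
    { tableId: 't2', status: 'initiated', timeSlotFrom: slot.from, timeSlotTo: slot.to },
    { tableId: 't3', status: 'cancelled', timeSlotFrom: slot.from, timeSlotTo: slot.to },
  ],
  slot,
  3,
);
console.log(available); // [ 't3' ]
```

Here t1 is too small for three people, t2 has an active booking for the slot, and t3 is free again because its booking was cancelled.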
Note on Eventual Consistency
Please note that our read database is updated asynchronously. It should not be considered a source of truth and should only be used for reading purposes.
Conclusion
With the addition of these projections, we can provide endpoints to display the list of restaurant tables within the table-manager service and the list of available slots within the booker service.
With this type of architecture, it's possible to create as many projections as needed to fulfill the application's reading requirements.
We can easily address needs like:
- How many bookings have been made during a given period?
- How many bookings have been canceled during a given period?
- How many people have been served during a given period?
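As an illustration of the first two questions, here is a minimal in-memory sketch that counts bookings per status over a period. It interprets "during a given period" as "the booked slot starts within the period", which is an assumption, as are the `countBookingsByStatus` name and the `confirmed` status.

```typescript
type BookingStatus = 'initiated' | 'confirmed' | 'cancelled';

interface BookingRow {
  status: BookingStatus;
  timeSlotFrom: Date;
}

// Counts bookings per status whose slot starts in the half-open period [from, to).
function countBookingsByStatus(
  bookings: readonly BookingRow[],
  from: Date,
  to: Date,
): Record<BookingStatus, number> {
  const counts: Record<BookingStatus, number> = {
    initiated: 0,
    confirmed: 0,
    cancelled: 0,
  };
  for (const booking of bookings) {
    const start = booking.timeSlotFrom.getTime();
    if (start >= from.getTime() && start < to.getTime()) {
      counts[booking.status] += 1;
    }
  }
  return counts;
}

const counts = countBookingsByStatus(
  [
    { status: 'confirmed', timeSlotFrom: new Date('2024-05-01T10:00:00Z') },
    { status: 'cancelled', timeSlotFrom: new Date('2024-05-02T17:00:00Z') },
    { status: 'confirmed', timeSlotFrom: new Date('2024-06-01T10:00:00Z') },
  ],
  new Date('2024-05-01T00:00:00Z'),
  new Date('2024-06-01T00:00:00Z'),
);
console.log(counts); // { initiated: 0, confirmed: 1, cancelled: 1 }
```

In practice such a report would more likely be its own projection or a SQL aggregate over the bookings table.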
The entire project is available on GitHub.
Your feedback and comments are welcome. Feel free to open an issue on the GitHub repository if you have any questions or improvement suggestions.