Event-Sourcing with NestJS and EventStoreDB (Part 2: projections)

August 10, 2023

Introduction

In the first part of this article, we saw how to create a NestJS project with EventStoreDB. We also learned how to handle commands, publish events, and react to published events. In this second part, we will explore how to create projections with EventStoreDB and NestJS.

Projections

Projections are used to create optimized read models for application requirements. Projections are built from events stored in EventStoreDB. They are updated in real-time whenever an event is published in EventStoreDB.

A common example is the balance of a bank account, which gets updated whenever a deposit or withdrawal is made. In this case, the account balance is a projection created from deposit and withdrawal events.

Example of a projection for an account balance

Creating a Projection from a Stream

To create a projection from a stream, we need to subscribe to that stream and consume the events published in it. For this purpose, we can use the initPersistentSubscriptionToStream method from the Event-Sourcing module created in the first part.

This method will create a new persistent subscription if it doesn't exist. It will then return an RxJS Observable that we can use to consume the events published in the stream.

Restaurant Table List

We will create a initial projection to retrieve the list of tables in the restaurant.

Projection of events for restaurant tables

A projection can be initialized as follows:

async init() {
  this.logger.log('Initializing TableProjection');

  const streamName = '$ce-table'; // Subscribe to all events in the 'table' category (all streams starting with 'table-')

  const groupName = this.configService.get<string>(
    'TABLE_PROJECTION_GROUP_NAME',
  );

  const source$ =
    await this.eventStoreDbService.initPersistentSubscriptionToStream(
      streamName,
      groupName,
    );

  source$
    .pipe(
      groupBy((event) => (event.data as TableEvent['data']).id),
      mergeMap((group$) =>
        group$.pipe(concatMap((event) => this.handleEvent(event))),
      ),
    )
    .subscribe();
}

Full code - TableProjection

The use of RxJS here is convenient, especially for easily grouping events by table id. This is necessary when multiple events follow each other for the same table: we want to wait for the first event to be processed before handling the next one.

Next, a handleEvent method is called to process each event.

private async handleEvent(resolvedEvent: AcknowledgeableEventStoreEvent) {
    this.logger.debug(`Handling event: ${resolvedEvent.type}`);

    try {
      switch (resolvedEvent.type as TableEventType) {
        case 'table-added':
          await this.onTableAdded(resolvedEvent);
          break;
        case 'table-removed':
          await this.onTableRemoved(resolvedEvent);
          break;
        case 'table-lock-placed':
          await this.onTableLockPlaced(resolvedEvent);
          break;
        case 'table-lock-removed':
          await this.onTableLockRemoved(resolvedEvent);
          break;
        default:
          this.logger.warn(`Unhandled event type: ${resolvedEvent.type}`);
      }
    } catch (error) {
      this.logger.error(error, error.stack);
    }
  }

Full code - TableProjection

This method calls a specific method for each event type. For example, for the table-added event, the onTableAdded method is called.

private async onTableAdded(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableAddedEventData(resolvedEvent.data);

    const currentTable = await this.db
      .select({
        id: tables.id,
      })
      .from(tables)
      .where(eq(tables.id, eventData.id));

    if (currentTable.length > 0) {
      await resolvedEvent.ack();
      return;
    }

    await this.db.insert(tables).values({
      id: eventData.id,
      seats: eventData.seats,
      revision: resolvedEvent.revision,
    });

    await resolvedEvent.ack();
  }

Full code - TableProjection

By inserting a new table into the read database, we also add the event's revision. This allows us to check if the event has already been processed. If so, we do nothing. This revision number is incremented every time an event is published in EventStoreDB.

For the table-removed event, the onTableRemoved method is called.

private async onTableRemoved(resolvedEvent: AcknowledgeableEventStoreEvent) {
    const eventData = parseTableRemovedEventData(resolvedEvent.data);

    const tableResults = await this.db
      .select({
        revision: tables.revision,
      })
      .from(tables)
      .where(eq(tables.id, eventData.id));

    if (tableResults.length === 0) {
      this.logger.warn(`Table not found: ${eventData.id}`);
      await resolvedEvent.nack('retry', 'Table not found');
      return;
    }

    const table = tableResults[0];

    if (table.revision + 1 !== resolvedEvent.revision) {
      this.logger.warn('Table revision mismatch', {
        tableId: eventData.id,
        currentRevision: table.revision,
        eventRevision: resolvedEvent.revision,
      });
      await resolvedEvent.nack('retry', 'Table revision mismatch');
      return;
    }

    await this.db
      .update(tables)
      .set({
        removedAt: resolvedEvent.createdAt,
        revision: resolvedEvent.revision,
      })
      .where(eq(tables.id, eventData.id))
      .returning({
        updatedId: tables.id,
        removed: tables.removedAt,
      });

    await resolvedEvent.ack();
  }

Full code - TableProjection

A special note: we check if the event's revision matches the table's revision. If not, we reject the event and put it in a retry queue for later processing, as events are processed in order.

Our table-{tableId} stream also contains events related to locks placed on the tables. We can simply increment the table's revision number whenever a lock is placed or removed to maintain a consistent revision number.

Now, data about the tables is available in the read database and can be exposed through a REST API.

The booking slot list

Continuing with the same approach, we will utilize projections to list available booking slots.

The information we want to retrieve includes the following for a given number of people:

  • The start and end date and time of the slot
  • The list of available tables

For this purpose, we can rely on two streams:

  • The table-{tableId} stream contains events related to a table
  • The booking-{bookingId} stream contains events related to a booking

By creating a projection for each of these streams, we can add two tables to our read database that will allow us to retrieve the necessary information.

The first projection to create is exactly the same as the one we created to retrieve the list of tables. It will allow the booker service to retrieve the list of tables.

The second projection aims to create a new table listing the status of each booking, so we can determine if a table is available or not.

Projection of events for bookings

Projection Initialization:

async init() {
  this.logger.log('Initializing BookingProjection');

  const streamName = '$ce-table_booking'; // Subscribe to all events in the 'table_booking' category (all streams starting with 'table_booking-')
  const groupName = this.configService.get<string>(
    'BOOKING_PROJECTION_GROUP_NAME',
  );

  const source$ =
    await this.eventStoreDbService.initPersistentSubscriptionToStream(
      streamName,
      groupName,
    );

  source$
    .pipe(
      groupBy((event) => (event.data as TableBookingEvent['data']).id),
      mergeMap((group$) =>
        group$.pipe(concatMap((event) => this.handleEvent(event))),
      ),
    )
    .subscribe();
}

Full code - BookingProjection

Event Handling:

private async onTableBookingInitiated(
    resolvedEvent: AcknowledgeableEventStoreEvent,
  ) {
    const eventData = parseTableBookingInitiatedEventData(resolvedEvent.data);

    const currentBooking = await this.db
      .select({
        id: bookings.id,
      })
      .from(bookings)
      .where(eq(bookings.id, eventData.id));

    if (currentBooking.length > 0) {
      await resolvedEvent.ack();
      return;
    }

    await this.db.insert(bookings).values({
      id: eventData.id,
      tableId: eventData.tableId,
      timeSlotFrom: new Date(eventData.timeSlot.from),
      timeSlotTo: new Date(eventData.timeSlot.to),
      status: 'initiated',
      revision: resolvedEvent.revision,
    });

    await resolvedEvent.ack();
  }

  // More event handling methods for booking cancellation and confirmation...

Full code - BookingProjection

Now we have two tables available in our read database: tables and bookings.

Tables

  • id
  • seats
  • revision
  • removedAt

Bookings

  • id
  • tableId
  • timeSlotFrom
  • timeSlotTo
  • status
  • revision

Before we can generate a list of available slots, there's one more piece of information we need: the restaurant's opening hours!

Let's add a configuration file to our booker service to define these hours.

export const timeSlotConfiguration = {
  morning: {
    from: {
      hours: 12,
      minutes: 0,
    },
    to: {
      hours: 14,
      minutes: 0,
    },
  },
  evening: {
    from: {
      hours: 19,
      minutes: 0,
    },
    to: {
      hours: 21,
      minutes: 0,
    },
  },
  timezone: 'Europe/Paris',
} as const;

All the necessary data is now available in our read database. We can start writing our query to obtain a list of available slots:

const rows: Array<{
      day: string;
      start_time: string;
      end_time: string;
      available_tables: string[];
    }> = await this.db.execute(
      sql`
      -- 1. Create a temporary table containing the start and end dates of the search period

      WITH date_range AS (
        SELECT
          ${startDate}::date AS start_date,
          ${endDate}::date AS end_date
      ),

      -- 2. Create a temporary table containing time slots

      time_slots AS (
        SELECT
          generate_series(start_date, end_date, INTERVAL '1 day') AS day,
           ${config.morningStartTime} AS morning_start,
          ${config.morningEndTime} AS morning_end,
          ${config.eveningStartTime} AS evening_start,
          ${config.eveningEndTime} AS evening_end
        FROM
          date_range
      ),

      -- 3. Create a temporary table containing available tables

      available_tables AS (
        SELECT
          ts.day,
          ts.morning_start AS start_time,
          ts.morning_end AS end_time,
          array_agg(${tables.id}) AS available_tables
        FROM
          time_slots ts
          CROSS JOIN ${tables}
          LEFT JOIN ${bookings} ON (
            ${bookings.tableId} = ${tables.id}
            AND ${bookings.status} != 'cancelled'
            AND ${bookings.timeSlotFrom} = (ts.day + ts.morning_start)::timestamp AT TIME ZONE ${timeZone}
            AND ${bookings.timeSlotTo} = (ts.day + ts.morning_end)::timestamp AT TIME ZONE ${timeZone}
          )
        WHERE
          ${tables.seats} >= ${query.people}
          AND ${bookings.id} IS NULL
        GROUP BY
          ts.day,
          ts.morning_start,
          ts.morning_end
        UNION ALL
        -- Similar query for evening slots...
      )

      -- 4. Select available slots

      SELECT
        to_char(a.day, 'YYYY-MM-DD') AS day,
        to_char(a.start_time, 'HH24:MI') AS start_time,
        to_char(a.end_time, 'HH24:MI') AS end_time,
        a.available_tables
      FROM
        available_tables a
      ORDER BY
        a.day,
        a.start_time;
      `,
    );

Full code - ListAvailableBookingSlotsHandler

Note on Eventual Consistency

Please note that our read database is updated asynchronously. It should not be considered a source of truth and should only be used for reading purposes.

Conclusion

With the addition of these projections, we can provide endpoints to display the list of restaurant tables within the table-manager service and the list of available slots within the booker service.

With this type of architecture, it's possible to create as many projections as needed to fulfill the application's reading requirements.

We can easily address needs like:

  • How many bookings have been made during a given period?
  • How many bookings have been canceled during a given period?
  • How many people have been served during a given period?

The entire project is available on Github.

Your feedback and comments are welcome. Feel free to open an issue on the GitHub repository if you have any questions or improvement suggestions.


Profile picture

As a Node.js enthusiastic backend engineer, I find genuine satisfaction in harnessing my passion for programming to design robust and scalable solutions. I tackle every challenge with determination, striving to create high-performance applications that ensure an optimal user experience. You can contact me on LinkedIn.