tobyhede

postgresql-event-sourcing

Experiment using PostgreSQL as a native event sourcing database.

Uses triggers and functions to manage projections transactionally.

The basic flow of action is:

event -> after insert trigger -> trigger function -> projection function -> projection

The advantage of this model is that triggers ensure the projections are always up to date, but we do not lose the ability to replay the event stream with the same logic.

Events

Event Sourcing ensures that all changes to application state are stored as a sequence of events.

Events are stored in an `events` table.

We assume that all objects/entities in the system have a globally unique identifier.

| Column | Details |
|---------|-------------------------|
| id | Primary Key |
| uuid | Unique ID of the entity the event references |
| type | The event type, used when building projections |
| body | Event data as JSON |
| inserted_at | Timestamp of event insert |

CREATE TABLE "events" (
  "id" serial primary key not null,
  "uuid" uuid NOT NULL,
  "type" text NOT NULL,
  "body" jsonb NOT NULL,
  "inserted_at" timestamp(6) NOT NULL DEFAULT statement_timestamp()
);

An example event, tracking an update to the name of the user identified by a uuid:

insert into events (type, uuid, body)
values ('user_update', '11111111-1111-1111-1111-111111111111', '{"name": "blah"}');
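
Because every change is an appended row, the history of a single entity can be read straight back from the `events` table. The query below is a minimal sketch that reuses the example uuid above. Ordering by `id` as a tie-breaker and the index name `events_uuid_idx` are assumptions for illustration, not part of the original schema.

```sql
-- Hypothetical index to keep per-entity lookups cheap as the table grows.
create index events_uuid_idx on events (uuid);

-- List the full event history for one entity, oldest first.
-- id breaks ties between events that share the same inserted_at value.
select id, type, body, inserted_at
from events
where uuid = '11111111-1111-1111-1111-111111111111'
order by inserted_at asc, id asc;
```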

Projection Triggers

Use `after insert` triggers on the `events` table to handle the incoming event actions.

In order to replay the events outside of the trigger mechanism, we wrap a general projection function inside the trigger. This will make more sense in a moment.

Below we create a trigger function and a trigger to execute. The trigger uses a conditional to only fire when the appropriate event type has been inserted.

create or replace function fn_trigger_user_create() returns trigger
  security definer
  language plpgsql
as $$
  begin
    perform fn_project_user_create(new.uuid, new.body);
    return new;
  end;
$$;

create trigger event_insert_user_create after insert on events
  for each row
  when (new.type = 'user_create')
  execute procedure fn_trigger_user_create();

Projection Functions

A projection function does the actual work of handling the event data and mapping to the appropriate projection.

Multiple triggers and multiple functions can be added to handle different aspects of the same event type if required.

Assuming a `users` table with a `name` and `uuid`, the following function inserts a new user record into the table based on the `user_create` event.

create or replace function fn_project_user_create(uuid uuid, body jsonb) returns integer
  security definer
  language plpgsql as $$
  declare result int;
  begin
    insert into users(uuid, name, inserted_at, updated_at)
      values(uuid, body->>'name', NOW(), NOW())
      returning id into result;
    return result;
  end;
$$;

JSON can be referenced using the native operators in PostgreSQL 9.5.

`body->>'name'` extracts the value of the `name` field from the body JSON as text.

Any constraints on the table will also be enforced, ensuring referential integrity.
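
To see the whole flow end to end, the sketch below defines one possible `users` table and inserts a `user_create` event. The table definition is an assumption: the column list matches what `fn_project_user_create` inserts, but the types and constraints are illustrative only.

```sql
-- Assumed shape of the projection table (not given in the original text).
create table if not exists users (
  id serial primary key,
  uuid uuid not null unique,
  name text not null,
  inserted_at timestamp(6) not null,
  updated_at timestamp(6) not null
);

-- Inserting the event fires the trigger, which calls the projection function.
insert into events (type, uuid, body)
values ('user_create', '11111111-1111-1111-1111-111111111111', '{"name": "blah"}');

-- The projected row is written in the same transaction as the event.
select uuid, name from users;
```

With constraints like these, an event that violates them (for example a `user_create` body without a `name`) raises an error inside the trigger, so the event insert itself is rolled back as well.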

Replay Event Stream

Using projection functions means that at any point the events can be replayed, simply by calling the function and passing the correct identifier and data.

The following code replays all `user_create` events in order:

do language plpgsql $$
  declare
    e record;
  begin
    for e in select uuid, body from events where type = 'user_create' order by inserted_at asc loop
      perform fn_project_user_create(e.uuid, e.body);
    end loop;
  end;
$$;

Any valid query can be used as the basis for the replay loop, and any combination of valid events.
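
A replay into a table that already holds projected rows would insert duplicates or hit unique constraints, so a rebuild usually starts from an empty projection. The sketch below assumes the `users` table is derived only from events and can be safely emptied. A complete rebuild would replay every event type the projection depends on, not just `user_create`.

```sql
begin;

-- Clear the projection and reset its id sequence before replaying.
truncate table users restart identity;

do language plpgsql $$
  declare
    e record;
  begin
    -- id is used as a tie-breaker for events with the same timestamp.
    for e in select uuid, body from events
             where type = 'user_create'
             order by inserted_at asc, id asc loop
      perform fn_project_user_create(e.uuid, e.body);
    end loop;
  end;
$$;

commit;
```

Wrapping the truncate and the replay in one transaction means other sessions never observe a half-rebuilt projection.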

The following code replays all events for the user identified by the specified uuid:

do language plpgsql $$
  declare
    e record;
  begin
    for e in select type, uuid, body from events where uuid = '11111111-1111-1111-1111-111111111111' order by inserted_at asc loop
      case e.type
        when 'user_create' then
          perform fn_project_user_create(e.uuid, e.body);
        when 'user_update' then
          perform fn_project_user_update(e.uuid, e.body);
        else
          -- skip event types without a projection; a plpgsql case with no else raises an error when nothing matches
          null;
      end case;
    end loop;
  end;
$$;

All of these functions will be executed in the same transaction block. This doesn't particularly matter in an event sourced system, but it is good to know.
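
The replay above calls `fn_project_user_update`, which is not defined in this document. The following is one hypothetical way to write it, together with a matching trigger. The names `fn_trigger_user_update` and `event_insert_user_update` are assumptions that follow the naming pattern used for `user_create`.

```sql
-- Hypothetical projection function for user_update events.
create or replace function fn_project_user_update(uuid uuid, body jsonb) returns void
  security definer
  language plpgsql as $$
  begin
    -- The parameter is qualified with the function name because a bare
    -- "uuid" would be ambiguous between the column and the parameter.
    update users
      set name = body->>'name', updated_at = NOW()
      where users.uuid = fn_project_user_update.uuid;
  end;
$$;

-- Hypothetical trigger function wrapping the projection function.
create or replace function fn_trigger_user_update() returns trigger
  security definer
  language plpgsql
as $$
  begin
    perform fn_project_user_update(new.uuid, new.body);
    return new;
  end;
$$;

create trigger event_insert_user_update after insert on events
  for each row
  when (new.type = 'user_update')
  execute procedure fn_trigger_user_update();
```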

PostgreSQL is not limited to processing events iteratively.

Below is an example of using a materialized view to project the user data.

create materialized view users_view as
  with t as (
      select *, row_number() over(partition by uuid order by inserted_at desc) as row_number
      from events
      where type = 'user_update'
  )
  select uuid, body->>'name' as name, inserted_at from t where row_number = 1;

select * from users_view;

In this case we assume that the most recent update event contains the correct user data, and we query to find the most recent `user_update` event for each user identified.
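
Unlike the trigger-based projections, a materialized view is not updated when new events arrive; it holds the result of its query as of the last refresh. A minimal sketch of keeping `users_view` current follows. The index name `users_view_uuid_idx` is an assumption.

```sql
-- Re-run the view's query and replace its contents.
refresh materialized view users_view;

-- Optional: with a unique index on the view, the refresh can run
-- without blocking concurrent reads of the view.
create unique index users_view_uuid_idx on users_view (uuid);
refresh materialized view concurrently users_view;
```

The unique index on `uuid` is valid here because the view keeps exactly one row per uuid.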
