/**
 *
 * EXTENSIONS
 *
 */
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Current time as whole seconds since the Unix epoch.
-- Registered below as the hypertable's "integer now" function.
CREATE OR REPLACE FUNCTION unix_now() RETURNS BIGINT
LANGUAGE SQL STABLE AS $$
    SELECT extract(epoch FROM now())::BIGINT
$$;

/**
 *
 * CUSTOM TYPES
 *
 */
CREATE TYPE payload_encoding AS ENUM ('json', 'cbor');

-- One event as passed to append_to_store().
-- The payload travels either in "bytes" or in "json" (JSON given as text).
CREATE TYPE append_event_data AS (
    bytes   BYTEA,
    json    TEXT,
    meta_id INTEGER,
    time    BIGINT
);

/**
 *
 * TABLES
 *
 */
CREATE TABLE aggregate_type (
    id   SERIAL PRIMARY KEY,
    name TEXT NOT NULL UNIQUE
);

CREATE TABLE event_meta (
    id                SERIAL PRIMARY KEY,
    name              TEXT NOT NULL,
    version           TEXT,
    aggregate_type_id INTEGER NOT NULL,
    encoding          payload_encoding NOT NULL,
    -- "version" is nullable and NULLs never collide in a UNIQUE constraint,
    -- so this constraint alone does not prevent duplicate (type, name, NULL) rows.
    UNIQUE (aggregate_type_id, name, version),
    -- Remove all event_meta when aggregate type is deleted.
    FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_type ON DELETE CASCADE
);

CREATE TABLE event (
    time              BIGINT NOT NULL,
    id                BIGSERIAL,
    aggregate_id      UUID NOT NULL,
    aggregate_type_id INTEGER NOT NULL REFERENCES aggregate_type,
    meta_id           INTEGER NOT NULL REFERENCES event_meta,
    json              JSONB,
    bytes             BYTEA,
    -- aggregate root events are sorted by time
    PRIMARY KEY (aggregate_type_id, aggregate_id, time)
);

-- Partition by time, plus 5 space partitions on aggregate_type_id.
-- 604800000000000 = 604800 s (one week) * 10^9, i.e. one week when "time" is
-- expressed in nanoseconds.
-- NOTE(review): unix_now() below returns whole seconds, which does not match a
-- nanosecond chunk interval -- confirm the unit callers store in event.time.
SELECT create_hypertable(
    'event',
    'time',
    'aggregate_type_id',
    5,
    chunk_time_interval => 604800000000000,
    create_default_indexes => false
);

SELECT set_integer_now_func('event', 'unix_now');

-- Aggregate events are sorted by id
-- Index used by consumers
CREATE INDEX ON event (aggregate_type_id, id);

CREATE TABLE aggregate_consumer (
    name              TEXT PRIMARY KEY,
    aggregate_type_id INTEGER NOT NULL,
    aggregate_id      UUID,
    -- id of the last event checked out by this consumer; NULL until the first checkout
    "offset"          BIGINT,
    -- Remove all consumers in case the aggregate type is deleted.
    FOREIGN KEY (aggregate_type_id) REFERENCES aggregate_type ON DELETE CASCADE
    -- foreign keys referencing hypertables not supported by timescaledb at the moment
    -- see https://github.com/timescale/timescaledb/issues/498
    -- FOREIGN KEY (offset) REFERENCES event(time)
);

/**
 *
 * PROCEDURES
 *
 */

-- Appends a batch of events to one aggregate root and notifies listeners.
--
--   _aggregate_type_id    aggregate type the root belongs to
--   _aggregate_id         aggregate root receiving the events
--   event_data_array      events to insert, in order
--   orderly               when TRUE, enforce the optimistic-concurrency check below
--   root_last_known_time  time of the last root event the caller has seen
--                         (NULL when the caller believes the root has no events)
--   appended_ids (OUT)    ids of the inserted events, in insertion order
--
-- Raises when "orderly" is TRUE and the latest stored event time of the root
-- differs from root_last_known_time.
CREATE OR REPLACE FUNCTION append_to_store(
    _aggregate_type_id INTEGER,
    _aggregate_id UUID,
    event_data_array append_event_data[],
    orderly BOOLEAN,
    root_last_known_time BIGINT,
    OUT appended_ids BIGINT[]
) AS $$
DECLARE
    prior_root_event_time BIGINT;
    event_data            append_event_data;
    -- event.id is BIGSERIAL: this must be BIGINT, an INTEGER overflows past 2^31 - 1
    last_inserted_id      BIGINT;
    prior_event_id        BIGINT;
    notification_payload  TEXT;
BEGIN
    appended_ids := ARRAY[]::BIGINT[];

    -- Make sure order is retained for aggregate type by not allowing concurrent writes
    PERFORM pg_advisory_xact_lock(_aggregate_type_id);

    -- Retrieve the last event for the aggregate type.
    -- prior_event_id will be used by consumers to track they have not missed any events
    SELECT id
    INTO prior_event_id
    FROM event
    WHERE aggregate_type_id = _aggregate_type_id
    ORDER BY id DESC
    LIMIT 1;

    -- Retrieve the time of the last event for the aggregate root.
    -- Ordering is guaranteed per aggregate root only.
    -- Stays NULL when the root has no events yet.
    SELECT time
    INTO prior_root_event_time
    FROM event
    WHERE aggregate_type_id = _aggregate_type_id
      AND aggregate_id = _aggregate_id
    ORDER BY time DESC
    LIMIT 1;

    -- Check order is right against the last root event (not the aggregate)
    IF orderly AND prior_root_event_time IS DISTINCT FROM root_last_known_time THEN
        RAISE EXCEPTION 'invalid aggregate time provided: %, expected: %', root_last_known_time, prior_root_event_time;
    END IF;

    FOREACH event_data IN ARRAY event_data_array
    LOOP
        -- The text payload is cast straight to the column type (JSONB).
        INSERT INTO event (time, aggregate_id, aggregate_type_id, meta_id, json, bytes)
        VALUES (
            event_data.time,
            _aggregate_id,
            _aggregate_type_id,
            event_data.meta_id,
            event_data.json::jsonb,
            event_data.bytes
        )
        RETURNING id INTO last_inserted_id;

        -- Comma-separated payload:
        --   prior event id, aggregate id, event id, time, base64(bytes), json
        -- NOTE(review): the whole event body is embedded here; PostgreSQL limits
        -- the size of a notification payload, so a large event would make the
        -- append fail -- confirm the expected event sizes.
        notification_payload := ''
            || coalesce(prior_event_id::TEXT, '') || ','
            || _aggregate_id || ','
            || last_inserted_id || ','
            || event_data.time || ','
            || coalesce(encode(event_data.bytes, 'base64'), '') || ','
            || coalesce(event_data.json, '');

        -- Wake up listeners of _aggregate_type_id channel
        PERFORM pg_notify(_aggregate_type_id::TEXT, notification_payload);
        -- Wake up listeners of aggregate root channel
        PERFORM pg_notify(_aggregate_type_id::TEXT || '::' || _aggregate_id::TEXT, notification_payload);

        appended_ids := array_append(appended_ids, last_inserted_id);
        -- The event just written is the predecessor of the next one in the batch.
        prior_event_id := last_inserted_id;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

-- Returns the id of the aggregate type called _name, creating it when missing.
-- The select/insert pair is retried in a loop: when a concurrent transaction
-- wins the insert, ON CONFLICT DO NOTHING returns no row and the next
-- iteration picks up the row through the SELECT.
CREATE OR REPLACE FUNCTION aggregate_type_id(
    _name TEXT,
    OUT _id INT
) AS $$
BEGIN
    LOOP
        SELECT id
        INTO _id
        FROM aggregate_type
        WHERE name = _name;
        EXIT WHEN FOUND;

        INSERT INTO aggregate_type (name)
        VALUES (_name)
        ON CONFLICT (name) DO NOTHING
        RETURNING id INTO _id;
        EXIT WHEN FOUND;
    END LOOP;
END
$$ LANGUAGE plpgsql;

-- Returns the id of the event_meta row for (_aggregate_type_id, _name, _version),
-- creating it with _encoding when missing.
--
-- _version may be NULL. The lookup uses IS NOT DISTINCT FROM so that a NULL
-- version matches the existing NULL-version row; a plain "=" never matches
-- NULL and would insert a fresh duplicate row on every call.
CREATE OR REPLACE FUNCTION event_meta_id(
    _aggregate_type_id INTEGER,
    _name TEXT,
    _version TEXT,
    _encoding payload_encoding,
    OUT _id INT
) AS $$
BEGIN
    LOOP
        -- ORDER BY keeps the answer stable should NULL-version duplicates
        -- already exist (the UNIQUE constraint does not reject them).
        SELECT id
        INTO _id
        FROM event_meta
        WHERE aggregate_type_id = _aggregate_type_id
          AND name = _name
          AND version IS NOT DISTINCT FROM _version
        ORDER BY id
        LIMIT 1;
        EXIT WHEN FOUND;

        INSERT INTO event_meta (aggregate_type_id, name, version, encoding)
        VALUES (_aggregate_type_id, _name, _version, _encoding)
        ON CONFLICT (aggregate_type_id, name, version) DO NOTHING
        RETURNING id INTO _id;
        EXIT WHEN FOUND;
    END LOOP;
END
$$ LANGUAGE plpgsql;

-- Returns the stored offset of consumer _name, registering the consumer for
-- (_aggregate_type_id, _aggregate_id) when it does not exist yet.
-- A newly registered consumer has a NULL offset. For an existing consumer the
-- _aggregate_type_id / _aggregate_id arguments are not used.
CREATE OR REPLACE FUNCTION consumer_offset(
    _name TEXT,
    _aggregate_type_id INTEGER,
    _aggregate_id UUID,
    OUT _offset BIGINT
) AS $$
BEGIN
    LOOP
        SELECT "offset"
        INTO _offset
        FROM aggregate_consumer
        WHERE name = _name;
        EXIT WHEN FOUND;

        INSERT INTO aggregate_consumer (name, aggregate_type_id, aggregate_id)
        VALUES (_name, _aggregate_type_id, _aggregate_id)
        ON CONFLICT (name) DO NOTHING
        RETURNING "offset" INTO _offset;
        EXIT WHEN FOUND;
    END LOOP;
END
$$ LANGUAGE plpgsql;

-- Advances the offset of consumer _name to event id _offset.
--
-- Raises when:
--   * _offset is not greater than the committed offset,
--   * no event with id _offset exists for the consumer's aggregate type
--     (this is also the error raised for an unknown consumer name, because
--     its aggregate type resolves to NULL),
--   * the event preceding _offset within the aggregate type is not the
--     committed offset, i.e. the consumer skipped an event.
--
-- NOTE(review): ordering is checked across the whole aggregate type;
-- aggregate_consumer.aggregate_id is not taken into account -- confirm this is
-- intended for consumers bound to a single aggregate root.
CREATE OR REPLACE FUNCTION try_consumer_checkout(
    _name TEXT,
    _offset BIGINT
) RETURNS void AS $$
DECLARE
    _last_committed_offset BIGINT;
    _aggregate_type_id     INT;
    _anterior_offset       BIGINT;
BEGIN
    -- Lock the consumer row so concurrent checkouts are serialized.
    SELECT "offset", aggregate_type_id
    INTO _last_committed_offset, _aggregate_type_id
    FROM aggregate_consumer
    WHERE name = _name
    FOR UPDATE;

    IF _last_committed_offset IS NOT NULL AND _offset <= _last_committed_offset THEN
        RAISE EXCEPTION 'checkpoint behind current offset. Received %, expected higher than %', _offset, _last_committed_offset;
    END IF;

    -- Consumer table can't reference "event" hypertable
    -- Perform faux FOREIGN key check here.
    -- Queried directly instead of through a temp table: a temp table created
    -- with ON COMMIT DROP makes a second checkout in the same transaction fail
    -- because the relation already exists.
    IF NOT EXISTS (
        SELECT 1
        FROM event
        WHERE aggregate_type_id = _aggregate_type_id
          AND id = _offset
    ) THEN
        RAISE EXCEPTION 'update or delete on table aggregate_consumer violates foreign key constraint. No event found for offset: %', _offset;
    END IF;

    -- Do a constraint check for order: the event right before _offset must be
    -- the committed one (both NULL for the first event of the type).
    SELECT id
    INTO _anterior_offset
    FROM event
    WHERE aggregate_type_id = _aggregate_type_id
      AND id < _offset
    ORDER BY id DESC
    LIMIT 1;

    IF _last_committed_offset IS DISTINCT FROM _anterior_offset THEN
        RAISE EXCEPTION 'checkpoint out of order. Offset % does not follow event with timestamp %', _offset, _anterior_offset;
    END IF;

    UPDATE aggregate_consumer
    SET "offset" = _offset
    WHERE "name" = _name;
END;
$$ LANGUAGE plpgsql;