| Field | Value |
|---|---|
| Crates.io | drasi-source-postgres |
| lib.rs | drasi-source-postgres |
| version | 0.1.2 |
| created_at | 2026-01-14 23:15:52.315436+00 |
| updated_at | 2026-01-23 06:14:18.571083+00 |
| description | PostgreSQL source plugin for Drasi |
| repository | https://github.com/drasi-project/drasi-core |
| id | 2044145 |
| size | 241,646 |
The PostgreSQL Replication Source is a Change Data Capture (CDC) plugin for Drasi that streams data changes from PostgreSQL databases in real-time using logical replication and the Write-Ahead Log (WAL). It captures INSERT, UPDATE, and DELETE operations as they occur and transforms them into Drasi SourceChange events for continuous query processing.
Key Capabilities:
Use Cases:
The PostgreSQL source consists of several specialized modules:
- `connection.rs`: Manages the PostgreSQL replication protocol connection, authentication (including SCRAM-SHA-256), and message exchange
- `stream.rs`: Handles the continuous WAL streaming loop, message processing, and transaction coordination
- `decoder.rs`: Decodes binary pgoutput messages into structured WAL events, mapping column types by OID
- `protocol.rs`: Implements PostgreSQL wire protocol encoding/decoding
- `scram.rs`: SCRAM-SHA-256 authentication implementation
- `types.rs`: Type definitions for PostgreSQL values and WAL messages

Note: Bootstrap functionality is provided by the separate `drasi-bootstrap-postgres` crate via the pluggable bootstrap provider pattern.
Streaming Flow:

```text
PostgreSQL WAL → Connection → Decoder → Stream → SourceChange Events
                                          ↓
                                     Transaction
                                       Grouping
                                          ↓
                                 Dispatcher → Queries
```
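The Transaction Grouping stage in the flow above holds changes back until the enclosing transaction commits, so queries never see a half-applied transaction. The sketch below illustrates that idea under simplifying assumptions: `WalEvent`, `TxBuffer`, and `Batch` are hypothetical stand-ins, not the crate's actual types, and a change is reduced to a string.

```rust
/// Hypothetical, simplified WAL events; not the crate's real types.
#[derive(Debug, Clone, PartialEq)]
pub enum WalEvent {
    Begin { xid: u32 },
    /// Stands in for a decoded Insert/Update/Delete change.
    Change(String),
    Commit { lsn: u64 },
}

/// A committed transaction: xid, commit LSN, and its changes in WAL order.
pub type Batch = (u32, u64, Vec<String>);

/// Hypothetical buffer that holds changes between Begin and Commit so a
/// transaction is handed to the dispatcher as one batch.
#[derive(Debug, Default)]
pub struct TxBuffer {
    current: Option<(u32, Vec<String>)>,
}

impl TxBuffer {
    /// Feeds one event and returns a complete batch when a Commit arrives.
    pub fn push(&mut self, event: WalEvent) -> Option<Batch> {
        match event {
            WalEvent::Begin { xid } => {
                // Start a fresh buffer for the new transaction.
                self.current = Some((xid, Vec::new()));
                None
            }
            WalEvent::Change(change) => {
                // Changes outside Begin/Commit are ignored in this sketch.
                if let Some((_, changes)) = self.current.as_mut() {
                    changes.push(change);
                }
                None
            }
            // Release everything buffered for the transaction at once.
            WalEvent::Commit { lsn } => self
                .current
                .take()
                .map(|(xid, changes)| (xid, lsn, changes)),
        }
    }
}
```

Nothing reaches the dispatcher until `Commit`, which is what makes each transaction's changes arrive together.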
Bootstrap Flow (via pluggable bootstrap provider):
```text
Bootstrap Request → Bootstrap Provider → SourceChange Events
                                                ↓
                                     Coordinate with Streaming
```
The PostgreSQL database must be configured for logical replication:
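PostgreSQL Version: PostgreSQL 10 or later (requires the `pgoutput` logical decoding plugin, which is built into PostgreSQL 10+)

Configuration Parameters (`postgresql.conf`; changing `wal_level` requires a server restart):

```conf
wal_level = logical
max_replication_slots = 10  # At least 1 per source
max_wal_senders = 10        # At least 1 per source
```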
Database User Permissions:

```sql
-- Grant replication privilege
ALTER USER drasi_user WITH REPLICATION;
-- Grant table access
GRANT SELECT ON ALL TABLES IN SCHEMA public TO drasi_user;
GRANT USAGE ON SCHEMA public TO drasi_user;
```
Publication Setup:

```sql
-- Create a publication for specific tables
CREATE PUBLICATION drasi_publication FOR TABLE users, orders, products;
-- Or for all tables (requires superuser)
CREATE PUBLICATION drasi_publication FOR ALL TABLES;
```
Replication Slot: The source automatically creates a replication slot with the configured name. If it exists, it will be reused.
Replica Identity (recommended for full UPDATE/DELETE data):

```sql
-- For tables without primary keys or needing full row data
ALTER TABLE your_table REPLICA IDENTITY FULL;
-- Default behavior (uses primary key)
ALTER TABLE your_table REPLICA IDENTITY DEFAULT;
```
The builder pattern provides type-safe configuration with sensible defaults:
```rust
use drasi_source_postgres::PostgresReplicationSource;
use drasi_lib::config::common::{SslMode, TableKeyConfig};

let source = PostgresReplicationSource::builder("postgres-source-1")
    .with_host("db.example.com")
    .with_port(5432)
    .with_database("production_db")
    .with_user("drasi_user")
    .with_password("secure_password")
    .with_tables(vec!["users".to_string(), "orders".to_string()])
    .with_slot_name("drasi_production_slot")
    .with_publication_name("drasi_publication")
    .with_ssl_mode(SslMode::Require)
    .add_table_key(TableKeyConfig {
        table: "users".to_string(),
        key_columns: vec!["user_id".to_string()],
    })
    .with_dispatch_mode(drasi_lib::channels::DispatchMode::Channel)
    .with_dispatch_buffer_capacity(2000)
    .with_auto_start(true)
    .build()?;
```
Alternatively, construct the config struct directly:
```rust
use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceConfig};
use drasi_lib::config::common::{SslMode, TableKeyConfig};

let config = PostgresSourceConfig {
    host: "db.example.com".to_string(),
    port: 5432,
    database: "production_db".to_string(),
    user: "drasi_user".to_string(),
    password: "secure_password".to_string(),
    tables: vec!["users".to_string(), "orders".to_string()],
    slot_name: "drasi_production_slot".to_string(),
    publication_name: "drasi_publication".to_string(),
    ssl_mode: SslMode::Require,
    table_keys: vec![
        TableKeyConfig {
            table: "users".to_string(),
            key_columns: vec!["user_id".to_string()],
        },
    ],
};

let source = PostgresReplicationSource::new("postgres-source-1", config)?;
```
| Option | Type | Default | Description |
|---|---|---|---|
| `id` | `String` | (Required) | Unique identifier for the source instance |
| `host` | `String` | `"localhost"` | PostgreSQL server hostname or IP address |
| `port` | `u16` | `5432` | PostgreSQL server port number |
| `database` | `String` | (Required) | Database name to connect to |
| `user` | `String` | (Required) | Database user with replication privileges |
| `password` | `String` | `""` | Database password (supports SCRAM-SHA-256 and cleartext) |
| `tables` | `Vec<String>` | `[]` | List of tables to monitor (empty = all tables in publication) |
| `slot_name` | `String` | `"drasi_slot"` | Replication slot name (created if it doesn't exist) |
| `publication_name` | `String` | `"drasi_publication"` | PostgreSQL publication to subscribe to |
| `ssl_mode` | `SslMode` | `SslMode::Prefer` | SSL mode: `Disable`, `Prefer`, or `Require` |
| `table_keys` | `Vec<TableKeyConfig>` | `[]` | Manual primary key configuration (see below) |
| `dispatch_mode` | `Option<DispatchMode>` | `None` | Channel dispatch mode (builder only) |
| `dispatch_buffer_capacity` | `Option<usize>` | `None` | Dispatch buffer size (builder only) |
| `auto_start` | `bool` | `true` | Whether to start automatically when added to DrasiLib |
`TableKeyConfig` manually specifies primary key columns for element ID generation:

| Field | Type | Description |
|---|---|---|
| `table` | `String` | Table name (e.g., `"users"` or `"schema.table"`) |
| `key_columns` | `Vec<String>` | Column names to use as primary key |
Example:

```rust
use drasi_lib::config::common::TableKeyConfig;

// Single-column key
let users_key = TableKeyConfig {
    table: "users".to_string(),
    key_columns: vec!["user_id".to_string()],
};

// Composite key
let order_items_key = TableKeyConfig {
    table: "order_items".to_string(),
    key_columns: vec!["order_id".to_string(), "item_id".to_string()],
};
```

Note: User-configured keys override automatically detected primary keys.
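The override rule in the note above amounts to a simple lookup order. The following is a minimal sketch, assuming detected primary keys are available as a map from table name to key columns; `resolve_key_columns` is a hypothetical helper and the local `TableKeyConfig` only mirrors the documented fields, so neither is the crate's real implementation.

```rust
use std::collections::HashMap;

/// Local stand-in mirroring the documented `TableKeyConfig` fields.
pub struct TableKeyConfig {
    pub table: String,
    pub key_columns: Vec<String>,
}

/// Hypothetical helper: a user-configured key for `table` wins; otherwise
/// fall back to the primary key detected from PostgreSQL, if any.
pub fn resolve_key_columns(
    table: &str,
    configured: &[TableKeyConfig],
    detected: &HashMap<String, Vec<String>>,
) -> Option<Vec<String>> {
    configured
        .iter()
        .find(|k| k.table == table)
        .map(|k| k.key_columns.clone())
        .or_else(|| detected.get(table).cloned())
}
```

Table names are compared exactly here; how the crate matches `"users"` against `"public.users"` is not covered by this sketch.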
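The source consumes WAL messages in the pgoutput binary format.

WAL Message Types:

- `B` (Begin): Transaction start
- `C` (Commit): Transaction commit
- `R` (Relation): Table metadata (schema, columns, types)
- `I` (Insert): Row insertion
- `U` (Update): Row update (may include old tuple)
- `D` (Delete): Row deletion
- `T` (Truncate): Table truncate (not implemented)

Relation Metadata includes:

- Relation OID, schema (namespace) name, and table name
- Replica identity setting
- Per column: flags (marking key columns), name, type OID, and type modifier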
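Each pgoutput message starts with the one-byte tag listed above, so dispatch begins by classifying that byte. A minimal sketch, assuming the input is the pgoutput payload of a single message; `WalMessageKind` and `classify` are hypothetical names, and a real decoder would go on to parse the message body.

```rust
/// Hypothetical classification of pgoutput message tags (first byte).
#[derive(Debug, PartialEq)]
pub enum WalMessageKind {
    Begin,
    Commit,
    Relation,
    Insert,
    Update,
    Delete,
    Truncate,
    /// Tags this sketch does not recognize (a candidate for a warn! log).
    Unknown(u8),
}

/// Returns `None` for an empty payload, otherwise the message kind.
pub fn classify(message: &[u8]) -> Option<WalMessageKind> {
    let tag = *message.first()?;
    Some(match tag {
        b'B' => WalMessageKind::Begin,
        b'C' => WalMessageKind::Commit,
        b'R' => WalMessageKind::Relation,
        b'I' => WalMessageKind::Insert,
        b'U' => WalMessageKind::Update,
        b'D' => WalMessageKind::Delete,
        b'T' => WalMessageKind::Truncate, // listed as not implemented by the source
        other => WalMessageKind::Unknown(other),
    })
}
```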
Tuple Data encoding:

- Int16 column count
- Per column: a kind byte (`n` = null, `u` = unchanged TOAST, `t` = text) + length + data (length and data are present only for `t`)

The decoder supports PostgreSQL's built-in types via OID mapping:
| PostgreSQL Type | OID | Decoded As |
|---|---|---|
| `boolean` | 16 | `PostgresValue::Bool` |
| `int2` (smallint) | 21 | `PostgresValue::Int2` |
| `int4` (integer) | 23 | `PostgresValue::Int4` |
| `int8` (bigint) | 20 | `PostgresValue::Int8` |
| `float4` (real) | 700 | `PostgresValue::Float4` |
| `float8` (double) | 701 | `PostgresValue::Float8` |
| `numeric` / `decimal` | 1700 | `PostgresValue::Numeric` |
| `text` | 25 | `PostgresValue::Text` |
| `varchar` | 1043 | `PostgresValue::Varchar` |
| `char` / `bpchar` | 1042 | `PostgresValue::Char` |
| `uuid` | 2950 | `PostgresValue::Uuid` |
| `timestamp` | 1114 | `PostgresValue::Timestamp` |
| `timestamptz` | 1184 | `PostgresValue::TimestampTz` |
| `date` | 1082 | `PostgresValue::Date` |
| `time` | 1083 | `PostgresValue::Time` |
| `json` | 114 | `PostgresValue::Json` |
| `jsonb` | 3802 | `PostgresValue::Jsonb` |
| `bytea` | 17 | `PostgresValue::Bytea` |
| Unknown | - | `PostgresValue::Text` (fallback) |
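The tuple-data layout and the OID table can be combined into a small decoder. This is a sketch under stated assumptions: `Value` is a hypothetical, much-reduced stand-in for `PostgresValue`, only a few OIDs are converted (others, such as `numeric`, stay text), the column OIDs are assumed to come from a prior Relation message, and the binary column kind that newer servers can send is rejected rather than decoded.

```rust
/// Hypothetical, reduced value type (the crate's `PostgresValue` is richer).
#[derive(Debug, PartialEq)]
pub enum Value {
    Null,
    UnchangedToast,
    Bool(bool),
    Int(i64),
    Float(f64),
    Text(String),
}

/// Converts a column's text form using a subset of the OID table;
/// unknown OIDs fall back to text, as the table describes.
pub fn convert(oid: u32, text: &str) -> Value {
    match oid {
        16 => Value::Bool(text == "t"),
        20 | 21 | 23 => text
            .parse::<i64>()
            .map(Value::Int)
            .unwrap_or_else(|_| Value::Text(text.to_string())),
        700 | 701 => text
            .parse::<f64>()
            .map(Value::Float)
            .unwrap_or_else(|_| Value::Text(text.to_string())),
        _ => Value::Text(text.to_string()),
    }
}

/// Reads `n` bytes at `*pos`, advancing it, or fails on truncated input.
fn take<'a>(buf: &'a [u8], pos: &mut usize, n: usize) -> Result<&'a [u8], String> {
    let end = (*pos)
        .checked_add(n)
        .filter(|&end| end <= buf.len())
        .ok_or("truncated tuple data")?;
    let slice = &buf[*pos..end];
    *pos = end;
    Ok(slice)
}

/// Decodes a TupleData section: Int16 column count, then per column a kind
/// byte; only `t` is followed by an Int32 length and that many bytes.
pub fn decode_tuple(buf: &[u8], column_oids: &[u32]) -> Result<Vec<Value>, String> {
    let mut pos = 0;
    let count = take(buf, &mut pos, 2)?;
    let count = u16::from_be_bytes([count[0], count[1]]) as usize;
    let mut values = Vec::with_capacity(count);
    for i in 0..count {
        let kind = take(buf, &mut pos, 1)?[0];
        let value = match kind {
            b'n' => Value::Null,
            b'u' => Value::UnchangedToast,
            b't' => {
                let len = take(buf, &mut pos, 4)?;
                let len = u32::from_be_bytes([len[0], len[1], len[2], len[3]]) as usize;
                let text = std::str::from_utf8(take(buf, &mut pos, len)?)
                    .map_err(|e| e.to_string())?;
                // OID 0 is not a real type, so it takes the text fallback.
                convert(column_oids.get(i).copied().unwrap_or(0), text)
            }
            other => return Err(format!("unsupported column kind {:?}", other as char)),
        };
        values.push(value);
    }
    Ok(values)
}
```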
```rust
use drasi_source_postgres::PostgresReplicationSource;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build the source
    let source = PostgresReplicationSource::builder("pg-source")
        .with_host("localhost")
        .with_database("myapp")
        .with_user("postgres")
        .with_password("password")
        .with_tables(vec!["users".to_string(), "orders".to_string()])
        .build()?;

    // Start streaming
    source.start().await?;

    // Source will now stream changes continuously

    // Stop when done
    tokio::signal::ctrl_c().await?;
    source.stop().await?;

    Ok(())
}
```
```rust
use drasi_source_postgres::PostgresReplicationSource;
use drasi_lib::config::common::TableKeyConfig;

let source = PostgresReplicationSource::builder("pg-source")
    .with_host("localhost")
    .with_database("myapp")
    .with_user("postgres")
    .add_table_key(TableKeyConfig {
        table: "events".to_string(),
        key_columns: vec!["event_id".to_string(), "timestamp".to_string()],
    })
    .build()?;

source.start().await?;
```
```rust
use drasi_source_postgres::PostgresReplicationSource;
use drasi_lib::config::common::SslMode;
use drasi_lib::channels::DispatchMode;

let source = PostgresReplicationSource::builder("pg-source")
    .with_host("db.example.com")
    .with_database("production")
    .with_user("drasi_user")
    .with_password("secure_password")
    .with_ssl_mode(SslMode::Require)
    .with_dispatch_mode(DispatchMode::Channel)
    .with_dispatch_buffer_capacity(5000)
    .build()?;

source.start().await?;
```
```rust
use drasi_source_postgres::{PostgresReplicationSource, PostgresSourceConfig};
use drasi_lib::config::common::SslMode;

let config = PostgresSourceConfig {
    host: "localhost".to_string(),
    port: 5432,
    database: "myapp".to_string(),
    user: "postgres".to_string(),
    password: "password".to_string(),
    tables: vec![], // All tables in publication
    slot_name: "drasi_slot".to_string(),
    publication_name: "drasi_publication".to_string(),
    ssl_mode: SslMode::Prefer,
    table_keys: vec![],
};

let source = PostgresReplicationSource::new("pg-source", config)?;
source.start().await?;
```
All PostgreSQL changes are transformed into Drasi SourceChange events:
Insert Event:

```rust
// Illustrative shape only: the property map is shown as `key => value`
// pairs for readability and is not literal Rust syntax.
SourceChange::Insert {
    element: Element::Node {
        metadata: ElementMetadata {
            reference: ElementReference::new("pg-source", "users:12345"),
            labels: Arc::from([Arc::from("users")]),
            effective_from: 1704067200000000000,
        },
        properties: ElementPropertyMap {
            "user_id" => ElementValue::Integer(12345),
            "username" => ElementValue::String(Arc::from("john_doe")),
            "email" => ElementValue::String(Arc::from("john@example.com")),
            "is_active" => ElementValue::Bool(true),
        }
    }
}
```

Update Event:

```rust
SourceChange::Update {
    element: Element::Node {
        metadata: ElementMetadata { /* same as insert */ },
        properties: ElementPropertyMap { /* new values */ }
    }
}
```

Delete Event:

```rust
SourceChange::Delete {
    metadata: ElementMetadata {
        reference: ElementReference::new("pg-source", "users:12345"),
        labels: Arc::from([Arc::from("users")]),
        effective_from: 1704240000000000000,
    }
}
```
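Element references in the events above, such as `"users:12345"`, combine the table name with the row's key values; for composite keys this source joins the values with `_` (e.g. `"order_items:1001_5"`). The sketch below builds such an ID from a row, assuming column values are already rendered as strings. `element_id` is a hypothetical helper, and returning `None` when a key column is missing is a choice of this sketch, not documented crate behavior.

```rust
use std::collections::HashMap;

/// Hypothetical helper following the documented ID format:
/// "table_name:value", with composite key values joined by '_'.
pub fn element_id(
    table: &str,
    key_columns: &[String],
    row: &HashMap<String, String>,
) -> Option<String> {
    // Look up every key column; a single missing column yields None.
    let values: Option<Vec<&str>> = key_columns
        .iter()
        .map(|c| row.get(c).map(String::as_str))
        .collect();
    let values = values?;
    if values.is_empty() {
        return None;
    }
    Some(format!("{}:{}", table, values.join("_")))
}
```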
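Element IDs are generated using the following priority:

1. User-configured key columns (from the `table_keys` config)
2. Primary key columns reported in the relation metadata sent by PostgreSQL

Format:

- `"table_name:value"` (e.g., `"users:12345"`)
- `"table_name:value1_value2"` for composite keys (e.g., `"order_items:1001_5"`)
- `"table_name:uuid"` (e.g., `"events:550e8400-e29b-41d4-a716-446655440000"`)

Labels: the table name becomes the node label (e.g., the `"users"` table → `["users"]` label).

Changes are grouped by PostgreSQL transaction and dispatched atomically.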
The PostgreSQL source supports pluggable bootstrap providers via the BootstrapProvider trait. Any bootstrap provider implementation can be used with this source:
```rust
use drasi_source_postgres::PostgresReplicationSource;

let source = PostgresReplicationSource::builder("pg-source")
    .with_host("localhost")
    .with_database("myapp")
    .with_user("postgres")
    .with_password("password")
    .with_bootstrap_provider(my_bootstrap_provider) // Any BootstrapProvider impl
    .build()?;
```
Common bootstrap provider options include:

- `PostgresBootstrapProvider` (`drasi-bootstrap-postgres`): Snapshots directly from PostgreSQL
- `ScriptFileBootstrapProvider` (`drasi-bootstrap-scriptfile`): Loads initial data from JSONL files
- `NoopBootstrapProvider` (`drasi-bootstrap-noop`): Skips bootstrap entirely
- Custom implementations of the `BootstrapProvider` trait

The source handles connection failures gracefully.
During WAL streaming, the source uses primary key information from the relation metadata provided by PostgreSQL's logical replication protocol. For bootstrap, the configured bootstrap provider is responsible for primary key detection.
When using `PostgresBootstrapProvider`, it queries PostgreSQL system catalogs:

```sql
SELECT n.nspname, c.relname, a.attname
FROM pg_constraint con
JOIN pg_class c ON con.conrelid = c.oid
JOIN pg_namespace n ON c.relnamespace = n.oid
JOIN pg_attribute a ON a.attrelid = c.oid
WHERE con.contype = 'p'
  AND a.attnum = ANY(con.conkey)
ORDER BY n.nspname, c.relname, array_position(con.conkey, a.attnum)
```
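Because the query orders rows by schema, table, and each column's position in the key, the result can be folded into an ordered list of key columns per table. A minimal sketch, assuming the rows have already been fetched as `(schema, table, column)` string tuples; `group_primary_keys` is a hypothetical helper, and keying by `"schema.table"` is this sketch's choice.

```rust
use std::collections::BTreeMap;

/// Groups (schema, table, column) rows, already in key-position order,
/// into "schema.table" -> ordered primary key columns.
pub fn group_primary_keys(rows: &[(&str, &str, &str)]) -> BTreeMap<String, Vec<String>> {
    let mut keys: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for (schema, table, column) in rows {
        // Appending preserves the array_position order from the query.
        keys.entry(format!("{}.{}", schema, table))
            .or_default()
            .push(column.to_string());
    }
    keys
}
```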
User-configured table_keys override automatically detected primary keys in both streaming and bootstrap.
Solution: Grant replication privilege

```sql
ALTER USER drasi_user WITH REPLICATION;
```

Solution: Configure PostgreSQL and restart

```conf
# postgresql.conf
wal_level = logical
```

```bash
sudo systemctl restart postgresql
```

Options:

- Drop the existing slot (it must not be in use by an active connection): `SELECT pg_drop_replication_slot('slot_name');`
- Use a different `slot_name` in config

Solution: Set replica identity to FULL

```sql
ALTER TABLE your_table REPLICA IDENTITY FULL;
```

Solution: Configure manual keys

```rust
.add_table_key(TableKeyConfig {
    table: "your_table".to_string(),
    key_columns: vec!["id".to_string()],
})
```

Solution: Adjust SSL mode

```rust
.with_ssl_mode(SslMode::Disable) // Try without SSL
// or
.with_ssl_mode(SslMode::Prefer)  // Prefer SSL but allow fallback
```
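On the wire, a client asks for TLS by sending an SSLRequest packet, and the server answers with a single byte: `S` to proceed with TLS or `N` to refuse. The sketch below shows how the three modes could react to that reply, following libpq's documented meaning of these mode names; the local `SslMode`, `Transport`, and both functions are hypothetical, and whether this crate behaves identically (for example, retrying after a failed TLS handshake under `Prefer`) is an assumption not covered here.

```rust
/// Local stand-in for the documented `SslMode` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SslMode {
    Disable,
    Prefer,
    Require,
}

#[derive(Debug, PartialEq)]
pub enum Transport {
    Plain,
    Tls,
}

/// SSLRequest packet: Int32 length (8) followed by Int32 code 80877103.
pub fn ssl_request() -> [u8; 8] {
    let mut packet = [0u8; 8];
    packet[..4].copy_from_slice(&8u32.to_be_bytes());
    packet[4..].copy_from_slice(&80877103u32.to_be_bytes());
    packet
}

/// Chooses the transport from the mode and the server's reply byte
/// (`None` when no SSLRequest was sent, as with `Disable`).
pub fn choose_transport(mode: SslMode, server_reply: Option<u8>) -> Result<Transport, String> {
    match (mode, server_reply) {
        (SslMode::Disable, _) => Ok(Transport::Plain),
        (_, Some(b'S')) => Ok(Transport::Tls),
        (SslMode::Prefer, Some(b'N')) => Ok(Transport::Plain), // fall back
        (SslMode::Require, Some(b'N')) => Err("server does not support SSL".to_string()),
        (_, other) => Err(format!("unexpected SSLRequest reply: {:?}", other)),
    }
}
```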
- Monitor replication slot lag via `pg_replication_slots`
- Set `max_slot_wal_keep_size` (PostgreSQL 13+) to limit WAL retention
- Increase `dispatch_buffer_capacity` for high volume
- Use `DispatchMode::Channel` for backpressure control

```sql
-- Check replication slots
SELECT * FROM pg_replication_slots;

-- Monitor replication lag
SELECT slot_name,
       pg_current_wal_lsn() AS current_lsn,
       confirmed_flush_lsn,
       pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;

-- Check active replication connections
SELECT * FROM pg_stat_replication;

-- View publications
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables WHERE pubname = 'drasi_publication';
```
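The `lag_bytes` column in the lag query is a plain subtraction of WAL positions. An LSN printed as `X/Y` is two 32-bit hexadecimal halves of a 64-bit byte position, so the same computation can be done client-side, for example when exporting metrics. A minimal sketch; `parse_lsn` and `lag_bytes` are hypothetical helper names.

```rust
/// Parses a textual LSN such as "16/B374D848" into a 64-bit WAL position.
pub fn parse_lsn(lsn: &str) -> Option<u64> {
    let (hi, lo) = lsn.split_once('/')?;
    let hi = u64::from_str_radix(hi, 16).ok()?;
    let lo = u64::from_str_radix(lo, 16).ok()?;
    // Each half must fit in 32 bits.
    if hi > u32::MAX as u64 || lo > u32::MAX as u64 {
        return None;
    }
    Some((hi << 32) | lo)
}

/// Byte difference between two LSNs, like pg_wal_lsn_diff(current, confirmed).
pub fn lag_bytes(current: &str, confirmed: &str) -> Option<i128> {
    Some(parse_lsn(current)? as i128 - parse_lsn(confirmed)? as i128)
}
```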
The source logs important events:
- `info!`: Connection events, transaction commits, bootstrap progress
- `warn!`: Missing primary keys, unknown message types, recoverable errors
- `error!`: Connection failures, protocol errors, unrecoverable errors
- `debug!`: Detailed message processing, WAL decoding

Enable debug logging:

```bash
RUST_LOG=drasi_source_postgres=debug cargo run
```
```bash
# Unit tests
cargo test -p drasi-source-postgres

# With PostgreSQL instance (detached, logical replication enabled)
docker run -d -e POSTGRES_PASSWORD=password -p 5432:5432 postgres:15 -c wal_level=logical
cargo test -p drasi-source-postgres -- --test-threads=1
```
Supported authentication methods:

- Cleartext password
- SCRAM-SHA-256 (`scram.rs`)

Note: MD5 authentication is explicitly not supported due to security concerns. If your PostgreSQL server requests MD5 authentication, you will receive an error instructing you to configure `scram-sha-256` in `pg_hba.conf`.
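The server picks the method by sending an AuthenticationRequest (`R`) message whose Int32 code identifies it; in the PostgreSQL protocol, 0 means authentication succeeded, 3 requests a cleartext password, 5 requests MD5, and 10 starts SASL with a list of mechanisms. The sketch below shows how the MD5 rejection described above fits into that dispatch. `AuthAction` and `next_auth_step` are hypothetical, and the error strings are illustrative rather than the crate's exact messages.

```rust
/// Hypothetical next step for the client after an AuthenticationRequest.
#[derive(Debug, PartialEq)]
pub enum AuthAction {
    Done,
    SendCleartextPassword,
    StartScram,
}

/// Maps the AuthenticationRequest code (plus, for SASL, the offered
/// mechanisms) to the next step; MD5 is refused outright.
pub fn next_auth_step(code: i32, sasl_mechanisms: &[&str]) -> Result<AuthAction, String> {
    match code {
        0 => Ok(AuthAction::Done),
        3 => Ok(AuthAction::SendCleartextPassword),
        5 => Err("MD5 authentication is not supported; configure scram-sha-256 in pg_hba.conf".to_string()),
        10 if sasl_mechanisms.contains(&"SCRAM-SHA-256") => Ok(AuthAction::StartScram),
        10 => Err(format!("no supported SASL mechanism in {:?}", sasl_mechanisms)),
        other => Err(format!("unsupported authentication request code {}", other)),
    }
}
```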
The source implements PostgreSQL wire protocol 3.0. See `protocol.rs` and `connection.rs` for implementation details.
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0. See LICENSE file for details.