| Crates.io | pgwire-replication |
| lib.rs | pgwire-replication |
| version | 0.1.2 |
| created_at | 2026-01-03 21:00:56.822225+00 |
| updated_at | 2026-01-17 00:41:07.424226+00 |
| description | Tokio-based Postgres wire-protocol logical replication client (pgoutput) with TLS and SCRAM. |
| homepage | https://github.com/vnvo/pgwire-replication |
| repository | https://github.com/vnvo/pgwire-replication |
| max_upload_size | |
| id | 2020777 |
| size | 323,514 |
A low-level, high-performance PostgreSQL logical replication client implemented directly on top of the PostgreSQL wire protocol (pgwire).
This crate is designed for CDC, change streaming, and WAL replay systems that require explicit control over replication state, deterministic restart behavior, and minimal runtime overhead.
pgwire-replication intentionally avoids libpq, tokio-postgres, and other higher-level PostgreSQL clients for the replication path. It interacts with a Postgres instance directly and relies on START_REPLICATION ... LOGICAL ... and the built-in pgoutput output plugin.
pgwire-replication exists to provide a low-level, explicit replication transport rather than a full-featured client; its scope is summarized below.
This crate was originally extracted from the Deltaforge CDC project and is maintained independently.
Add to your Cargo.toml:
[dependencies]
pgwire-replication = "0.1"
Or with specific features:
[dependencies]
pgwire-replication = { version = "0.1", default-features = false, features = ["tls-rustls"] }
This crate provides:
- pgoutput logical decoding support (transport-level)
- Explicit start position control (start_lsn)
- Bounded replay (stop_at_lsn)

This crate intentionally does not provide:
- pgoutput decoding into rows or events

These responsibilities belong in higher layers.
use pgwire_replication::{ReplicationClient, ReplicationEvent};

// `config` is a ReplicationConfig; see the full example below.
let mut repl = ReplicationClient::connect(config).await?;

while let Some(event) = repl.recv().await? {
    match event {
        ReplicationEvent::XLogData { wal_end, data, .. } => {
            process(data); // hand the payload off to your pgoutput decoder/sink
            repl.update_applied_lsn(wal_end);
        }
        ReplicationEvent::KeepAlive { .. } => {}
        ReplicationEvent::StoppedAt { reached } => break,
    }
}
// Clean end-of-stream
Check the Quick Start and Examples for more detailed use cases.
pgwire-replication is built around explicit WAL position control.
LSNs (Log Sequence Numbers) are treated as first-class inputs and outputs and are never hidden behind opaque offsets.
Every replication session begins at an explicit LSN:
ReplicationConfig {
    start_lsn: Lsn,
    ..
}
This enables deterministic restarts from a stored checkpoint and replay from a known WAL position.
The provided LSN is sent verbatim to PostgreSQL via START_REPLICATION.
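A minimal sketch of wiring this up, assuming you persist the last applied LSN yourself; load_checkpoint is a hypothetical helper standing in for your own checkpoint store, and the fallback value is illustrative:

use pgwire_replication::Lsn;

// Hypothetical helper: read the last durably stored LSN, if any.
fn load_checkpoint() -> Option<String> {
    std::fs::read_to_string("checkpoint.lsn").ok()
}

let start_lsn = match load_checkpoint() {
    // Resume exactly where the previous run left off.
    Some(s) => Lsn::parse(s.trim())?,
    // Otherwise use a position obtained out of band
    // (pg_current_wal_lsn() or the slot's confirmed_flush_lsn).
    None => Lsn::parse("0/16B6C50")?,
};
// start_lsn is then placed into ReplicationConfig and sent verbatim to START_REPLICATION.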
Replication can be bounded using stop_at_lsn:
ReplicationConfig {
    start_lsn,
    stop_at_lsn: Some(stop_lsn),
    ..
}
When configured, the client stops once the stop LSN is reached, emits ReplicationEvent::StoppedAt, and the stream then ends cleanly (recv() returns Ok(None)). This enables bounded, repeatable replay windows.
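A sketch of a bounded replay pass, assuming start_lsn and stop_lsn come from your own bookkeeping and base_config holds the remaining connection settings (both are illustrative); replay is a hypothetical consumer:

let cfg = ReplicationConfig {
    start_lsn,                   // beginning of the replay window
    stop_at_lsn: Some(stop_lsn), // stop once this position is reached
    ..base_config
};

let mut repl = ReplicationClient::connect(cfg).await?;
while let Some(event) = repl.recv().await? {
    match event {
        ReplicationEvent::XLogData { wal_end, data, .. } => {
            replay(data); // hypothetical: re-apply the change downstream
            repl.update_applied_lsn(wal_end);
        }
        ReplicationEvent::KeepAlive { .. } => {}
        // Emitted when the configured stop LSN is reached; the stream then ends.
        ReplicationEvent::StoppedAt { reached } => {
            println!("replay complete at {reached}");
        }
    }
}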
Progress is not auto-committed. Instead, the consumer explicitly reports progress:
repl.update_applied_lsn(lsn);
Calling update_applied_lsn indicates that all WAL up to lsn has been durably persisted by the consumer (for example, flushed to disk or a message queue).
This allows callers to control when progress is acknowledged to the server and, in turn, what the slot records as durably flushed.
Updates are monotonic: reporting an older LSN is a no-op.
Standby status updates are sent asynchronously by the worker using the
latest applied LSN, based on status_interval or server keepalive requests.
For CDC pipelines, progress should typically be reported at transaction commit boundaries, not for every message.
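A sketch of commit-boundary reporting under two assumptions: handle is a hypothetical stand-in for your decoder/sink and durably persists each payload before returning, and the one-byte pgoutput message tag at the start of the XLogData payload ('C' for Commit) is used here to detect commit boundaries; a real pipeline would rely on its pgoutput decoder for this:

while let Some(event) = repl.recv().await? {
    match event {
        ReplicationEvent::XLogData { wal_end, data, .. } => {
            handle(&data); // hypothetical: decode and durably persist the change
            // Only report progress once a Commit message has been persisted,
            // i.e. "everything up to this commit is safely stored".
            if data.first() == Some(&b'C') {
                repl.update_applied_lsn(wal_end);
            }
        }
        ReplicationEvent::KeepAlive { .. } => {}
        ReplicationEvent::StoppedAt { .. } => break,
    }
}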
PostgreSQL logical replication may remain silent for extended periods when no WAL is generated. This is normal.
idle_wakeup_interval does not indicate failure. It bounds how long the client may block
waiting for server messages before waking up to send a standby status update and continue waiting.
While the system is idle, the effective feedback cadence is bounded by
idle_wakeup_interval, not status_interval.
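For illustration, a config fragment annotating how the two settings interact; base_config stands in for the remaining fields and the values mirror the full example below:

let cfg = ReplicationConfig {
    // Target cadence for proactive standby status updates while traffic flows.
    status_interval: std::time::Duration::from_secs(10),
    // Upper bound on how long the client blocks waiting for server messages;
    // during quiet periods this, not status_interval, bounds feedback cadence.
    idle_wakeup_interval: std::time::Duration::from_secs(10),
    ..base_config
};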
- stop() requests a graceful stop (sends CopyDone). The client will continue to yield any buffered events, and then recv() returns Ok(None).
- shutdown().await is a convenience method that calls stop(), drains remaining events, and awaits the worker task result.
- abort() cancels the worker task immediately (hard stop; does not send CopyDone).
- Dropping ReplicationClient requests a best-effort graceful stop. When dropped inside a Tokio runtime, the worker is detached and allowed to finish cleanly; when dropped outside a runtime, the worker may be aborted to avoid leaking a task.
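A sketch of the two cooperative paths described above; handle_event is a hypothetical consumer and exact return types are elided:

// Either: request a graceful stop, then drain whatever was already buffered.
client.stop();
while let Some(event) = client.recv().await? {
    handle_event(event); // hypothetical
}

// Or: let the convenience method stop, drain, and await the worker in one call.
let _ = client.shutdown().await;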
PostgreSQL logical replication delivers complete, committed transactions in commit order. This has important implications for LSN handling:
- (commit_lsn, event_lsn) provides a total ordering suitable for checkpointing and replay.

For CDC pipelines, progress tracking should typically be based on commit boundaries rather than individual event LSNs.
LSNs are formatted exactly as PostgreSQL displays them, using the X/Y format. Examples: 0/0, 0/16B6C50, 16/B374D848
Parsing accepts both padded and unpadded forms for compatibility.
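A small sketch; the zero-padded spelling below is an assumed example of a "padded" form, and Display output in the X/Y style is inferred from the wal_end formatting shown in the examples:

use pgwire_replication::Lsn;

// Unpadded, exactly as pg_current_wal_lsn() prints it.
let a = Lsn::parse("0/16B6C50")?;
// A zero-padded spelling of the same position is also accepted.
let b = Lsn::parse("0/016B6C50")?;

println!("{a} {b}"); // both display in PostgreSQL's X/Y form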
TLS is optional and uses rustls.
TLS configuration is provided explicitly via ReplicationConfig and does not rely on system OpenSSL.
Control-plane operations (publication and slot creation) are typically done with a regular Postgres client of your choice. This crate handles only the replication plane.
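For completeness, a sketch of that control-plane step using tokio-postgres (the client the bundled examples use); the connection string matches the example config below, and my_pub / my_slot mirror its publication and slot names:

use tokio_postgres::NoTls;

// Plain SQL connection for setup; replication itself does not go through this.
let (pg, conn) = tokio_postgres::connect(
    "host=127.0.0.1 user=postgres password=postgres dbname=postgres",
    NoTls,
)
.await?;
tokio::spawn(conn); // drive the connection in the background

// Publication defining what the slot will stream.
pg.simple_query("CREATE PUBLICATION my_pub FOR ALL TABLES").await?;

// Logical slot bound to the built-in pgoutput plugin.
pg.simple_query("SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput')")
    .await?;

With the publication and slot in place, the full replication-plane example below can connect and stream.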
use pgwire_replication::{
    client::ReplicationEvent, Lsn, ReplicationClient, ReplicationConfig, SslMode, TlsConfig,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Control plane (publication/slot creation) is typically done using a Postgres client.
    // This crate implements the replication plane only.

    // Use a real LSN:
    // - from your checkpoint store, or
    // - from SQL (pg_current_wal_lsn / slot confirmed_flush_lsn), or
    // - from a previous run.
    let start_lsn = Lsn::parse("0/16B6C50")?;

    let cfg = ReplicationConfig {
        host: "127.0.0.1".into(),
        port: 5432,
        user: "postgres".into(),
        password: "postgres".into(),
        database: "postgres".into(),
        tls: TlsConfig::disabled(),
        slot: "my_slot".into(),
        publication: "my_pub".into(),
        start_lsn,
        stop_at_lsn: None,
        status_interval: std::time::Duration::from_secs(10),
        idle_wakeup_interval: std::time::Duration::from_secs(10),
        buffer_events: 8192,
    };

    let mut client = ReplicationClient::connect(cfg).await?;

    loop {
        match client.recv().await {
            Ok(Some(ev)) => match ev {
                ReplicationEvent::XLogData { wal_end, data, .. } => {
                    println!("XLogData wal_end={wal_end} bytes={}", data.len());
                    client.update_applied_lsn(wal_end);
                }
                ReplicationEvent::KeepAlive {
                    wal_end,
                    reply_requested,
                    ..
                } => {
                    println!("KeepAlive wal_end={wal_end} reply_requested={reply_requested}");
                }
                ReplicationEvent::StoppedAt { reached } => {
                    println!("StoppedAt reached={reached}");
                    // break is optional; the stream should end shortly anyway
                    break;
                }
            },
            Ok(None) => {
                println!("Replication ended cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Replication failed: {e}");
                return Err(e.into());
            }
        }
    }

    Ok(())
}
Examples that use the control-plane SQL client (tokio-postgres) require the examples feature.
examples/basic.rs
START_LSN="0/16B6C50" cargo run --example basic

examples/checkpointed.rs
cargo run --example checkpointed

examples/bounded_replay.rs
cargo run --example bounded_replay

examples/with_tls.rs
PGHOST=db.example.com \
PGPORT=5432 \
PGUSER=repl_user \
PGPASSWORD=secret \
PGDATABASE=postgres \
PGSLOT=example_slot_tls \
PGPUBLICATION=example_pub_tls \
PGTLS_CA=/path/to/ca.pem \
PGTLS_SNI=db.example.com \
cargo run --example with_tls
examples/with_mtls.rs
If you need to, inject the fake DNS record:
sudo sh -c 'echo "127.0.0.1 db.example.com" >> /etc/hosts'
and then:
PGHOST=db.example.com \
PGPORT=5432 \
PGUSER=repl_user \
PGPASSWORD=secret \
PGDATABASE=postgres \
PGSLOT=example_slot_mtls \
PGPUBLICATION=example_pub_mtls \
PGTLS_CA=/etc/ssl/ca.pem \
PGTLS_CLIENT_CERT=/etc/ssl/client.crt.pem \
PGTLS_CLIENT_KEY=/etc/ssl/client.key.pem \
PGTLS_SNI=db.example.com \
cargo run --example with_mtls
- PGUSER/PGPASSWORD are used for control-plane setup (publication/slot).
- REPL_USER/REPL_PASSWORD are used for the replication stream.
- Set PGTLS_SNI to a DNS name on the certificate.
- VerifyCa can be used instead of VerifyFull if hostname validation is not possible.

Integration tests use Docker via testcontainers and are gated behind a feature flag:
cargo test --features integration-tests -- --nocapture
Licensed under either of:
at your option.