| Crates.io | postgres-notify |
| lib.rs | postgres-notify |
| version | 0.3.8 |
| created_at | 2025-05-21 10:12:17.107757+00 |
| updated_at | 2026-01-07 10:14:06.18037+00 |
| description | Library that makes it easy to subscribe to PostgreSQL notifications |
| homepage | |
| repository | https://github.com/macprog-guy/postgres-notify.git |
| max_upload_size | |
| id | 1683205 |
| size | 160,048 |
A resilient PostgreSQL client wrapper for tokio_postgres with automatic reconnection, query timeouts, and NOTIFY/LISTEN support.
Add to your Cargo.toml:
[dependencies]
postgres-notify = "0.3"
tokio = { version = "1", features = ["rt", "time"] }
tokio-postgres = "0.7"
| Feature | Default | Description |
|---|---|---|
chrono |
Yes | Use DateTime<Utc> for timestamps instead of SystemTime |
serde |
Yes | Enable serialization for message types |
tracing |
Yes | Structured logging via the tracing crate |
To disable default features:
[dependencies]
postgres-notify = { version = "0.3", default-features = false }
use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
.callback(|msg: PGMessage| println!("{}", msg));
let mut client = PGRobustClient::spawn(config).await?;
let rows = client.query("SELECT $1::TEXT", &[&"hello"], Some(Duration::from_secs(5))).await?;
println!("Result: {}", rows[0].get::<_, String>(0));
Ok(())
}
PostgreSQL supports asynchronous notifications via LISTEN/NOTIFY commands. This allows the database to push messages to clients without polling.
use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
.callback(|msg: PGMessage| {
if let PGMessage::Notify(notification) = msg {
println!("Channel: {}, Payload: {}", notification.channel, notification.payload);
}
});
let mut client = PGRobustClient::spawn(config).await?;
// Subscribe to channels
client.subscribe_notify(&["events", "updates"], Some(Duration::from_secs(5))).await?;
// Subscriptions are automatically restored after reconnection
// Unsubscribe when done
client.unsubscribe_notify(&["events"], None).await?;
Ok(())
}
Capture PostgreSQL RAISE messages during query execution. Useful for debugging stored procedures and functions.
use postgres_notify::{PGRobustClient, PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = PGRobustClientConfig::new("postgres://localhost/mydb", NoTls)
.callback(|_| {});
let mut client = PGRobustClient::spawn(config).await?;
// Capture logs from a query
let (result, logs) = client.with_captured_log(async |c| {
c.simple_query(
"DO $$ BEGIN RAISE NOTICE 'Processing started'; END; $$",
Some(Duration::from_secs(5))
).await?;
Ok(())
}).await?;
for log in logs {
println!("{}", log);
}
Ok(())
}
Log Levels: DEBUG, LOG, INFO, NOTICE, WARNING, ERROR, FATAL, PANIC
If the connection is lost, the client automatically reconnects with exponential backoff:
On reconnection:
let config = PGRobustClientConfig::new(url, NoTls)
.max_reconnect_attempts(5) // Limit retry attempts
.callback(|msg| {
match msg {
PGMessage::Reconnect { attempts, max_attempts, .. } => {
println!("Reconnecting: attempt {} of {}", attempts, max_attempts);
}
PGMessage::Connected { .. } => println!("Connected!"),
PGMessage::FailedToReconnect { .. } => println!("Gave up reconnecting"),
_ => {}
}
});
All query methods accept an optional timeout. If exceeded, the query is cancelled server-side.
use std::time::Duration;
// With explicit timeout
let rows = client.query("SELECT pg_sleep(10)", &[], Some(Duration::from_secs(1))).await;
// Returns Err(PGError::Timeout(..))
// Use default timeout (1 hour)
let rows = client.query("SELECT 1", &[], None).await?;
// Configure default timeout
let config = PGRobustClientConfig::new(url, NoTls)
.default_timeout(Duration::from_secs(30));
// Manual cancellation
client.cancel_query().await?;
use postgres_notify::{PGRobustClientConfig, PGMessage};
use tokio_postgres::NoTls;
use std::time::Duration;
let config = PGRobustClientConfig::new("postgres://user:pass@localhost/db", NoTls)
// Event callback (required for NOTIFY/RAISE)
.callback(|msg: PGMessage| println!("{}", msg))
// Reconnection settings
.max_reconnect_attempts(10)
// Default query timeout
.default_timeout(Duration::from_secs(300))
// SQL executed on each connection
.connect_script("SET timezone = 'UTC'; SET statement_timeout = '30s';")
// PostgreSQL application name
.application_name("my-service")
// Pre-configure LISTEN channels
.subscriptions(["events", "updates"]);
| Type | Description |
|---|---|
PGRobustClient<TLS> |
Main client with resilience features |
PGRobustClientConfig<TLS> |
Builder for client configuration |
PGMessage |
Event enum delivered to callbacks |
PGError |
Error type with timeout/reconnect variants |
PGResult<T> |
Alias for Result<T, PGError> |
All methods mirror tokio_postgres::Client with an additional timeout parameter:
| Method | Description |
|---|---|
query |
Execute query, return all rows |
query_one |
Execute query, return exactly one row |
query_opt |
Execute query, return zero or one row |
query_raw |
Execute query with streaming results |
execute_raw |
Execute statement, return affected row count |
prepare |
Prepare a statement |
transaction |
Begin a transaction |
simple_query |
Execute simple (non-parameterized) query |
batch_execute |
Execute multiple statements |
pub enum PGMessage {
Notify(PGNotifyMessage), // NOTIFY received
Raise(PGRaiseMessage), // RAISE message received
Reconnect { .. }, // Reconnection attempt
Connected { .. }, // Connection established
Timeout { .. }, // Query timeout occurred
Cancelled { .. }, // Query cancellation result
FailedToReconnect { .. }, // Max reconnect attempts reached
Disconnected { .. }, // Connection lost
}
use postgres_notify::{PGError, PGResult};
async fn example(client: &mut PGRobustClient<NoTls>) -> PGResult<()> {
match client.query("SELECT 1", &[], None).await {
Ok(rows) => println!("Got {} rows", rows.len()),
Err(PGError::Timeout(duration)) => {
println!("Query timed out after {:?}", duration);
}
Err(PGError::FailedToReconnect(attempts)) => {
println!("Failed to reconnect after {} attempts", attempts);
}
Err(PGError::Postgres(e)) => {
if e.code().map(|c| c.code().starts_with("23")).unwrap_or(false) {
println!("Constraint violation: {}", e);
}
}
Err(e) => return Err(e),
}
Ok(())
}
The callback runs in a background tokio task. If it panics:
Recommendation: Never panic in callbacks. Use std::panic::catch_unwind if calling untrusted code.
The connect_script and application_name values are interpolated directly into SQL. Do not pass untrusted user input to these methods.
query, query_one, and query_opt clone parameters internallyquery_raw or execute_rawFor production use, configure TLS:
// Without TLS (development only)
use tokio_postgres::NoTls;
let config = PGRobustClientConfig::new(url, NoTls);
// With rustls
// Add: tokio-postgres-rustls = "0.12"
use tokio_postgres_rustls::MakeRustlsConnect;
let tls = MakeRustlsConnect::new(rustls_config);
let config = PGRobustClientConfig::new(url, tls);
// With native-tls
// Add: postgres-native-tls = "0.5"
use postgres_native_tls::MakeTlsConnector;
let tls = MakeTlsConnector::new(native_tls_connector);
let config = PGRobustClientConfig::new(url, tls);
rt and time featuresSee CHANGELOG.md for a detailed history of changes.
MIT License - see LICENSE for details.