| Crates.io | drasi-bootstrap-postgres |
| lib.rs | drasi-bootstrap-postgres |
| version | 0.1.2 |
| created_at | 2026-01-14 23:33:11.365623+00 |
| updated_at | 2026-01-23 06:18:29.685173+00 |
| description | PostgreSQL bootstrap plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2044171 |
| size | 121,691 |
A bootstrap provider for Drasi that reads initial data snapshots from PostgreSQL databases. This component enables continuous queries to start with a complete view of existing data before processing incremental changes.
The PostgreSQL Bootstrap Provider creates consistent point-in-time snapshots of PostgreSQL tables and streams the data to Drasi queries. It provides the foundation for continuous queries by establishing the initial state before change tracking begins.
The PostgreSQL Bootstrap Provider can be configured using either the builder pattern (preferred) or a configuration struct.
The builder pattern provides a fluent API for constructing the provider:
use drasi_bootstrap_postgres::PostgresBootstrapProvider;
let provider = PostgresBootstrapProvider::builder()
.with_host("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
.with_tables(vec!["users".to_string(), "orders".to_string()])
.with_slot_name("drasi_slot")
.with_publication_name("drasi_pub")
.with_table_key("orders", vec!["order_id".to_string()])
.build();
Alternatively, use the PostgresSourceConfig struct directly:
use drasi_bootstrap_postgres::{PostgresBootstrapProvider, PostgresSourceConfig};
use drasi_lib::config::common::{SslMode, TableKeyConfig};
let config = PostgresSourceConfig {
host: "localhost".to_string(),
port: 5432,
database: "mydb".to_string(),
user: "postgres".to_string(),
password: "secret".to_string(),
tables: vec!["users".to_string(), "orders".to_string()],
slot_name: "drasi_slot".to_string(),
publication_name: "drasi_pub".to_string(),
ssl_mode: SslMode::Disable,
table_keys: vec![
TableKeyConfig {
table: "orders".to_string(),
key_columns: vec!["order_id".to_string()],
}
],
};
let provider = PostgresBootstrapProvider::new(config);
| Name | Description | Data Type | Valid Values | Default |
|---|---|---|---|---|
host |
PostgreSQL server hostname or IP address | String |
Any valid hostname or IP | "localhost" |
port |
PostgreSQL server port | u16 |
1-65535 | 5432 |
database |
Database name to connect to | String |
Any valid PostgreSQL database name | Required (no default) |
user |
Database user for authentication | String |
Valid PostgreSQL username | Required (no default) |
password |
Database password for authentication | String |
Any string | "" (empty) |
tables |
List of tables to bootstrap | Vec<String> |
Valid table names | [] (empty) |
slot_name |
Replication slot name | String |
Valid PostgreSQL identifier | "drasi_slot" |
publication_name |
Publication name for logical replication | String |
Valid PostgreSQL identifier | "drasi_pub" |
ssl_mode |
SSL/TLS connection mode | SslMode |
Disable, Prefer, Require |
Disable |
table_keys |
Custom primary key configurations | Vec<TableKeyConfig> |
List of table/column mappings | [] (empty) |
pub struct TableKeyConfig {
pub table: String, // Table name
pub key_columns: Vec<String>, // Column names that form the key
}
SslMode::Disable - No SSL encryptionSslMode::Prefer - Prefer SSL but allow unencrypted connectionsSslMode::Require - Require SSL encryption (connection fails without SSL)The bootstrap provider reads standard PostgreSQL tables from the public schema. Tables are automatically mapped from Cypher query labels using lowercase conversion.
Example Mapping:
User → PostgreSQL table userOrder → PostgreSQL table orderProduct → PostgreSQL table productThe provider automatically detects primary keys using PostgreSQL system catalogs:
SELECT a.attname as column_name
FROM pg_constraint con
JOIN pg_class c ON con.conrelid = c.oid
JOIN pg_attribute a ON a.attrelid = c.oid
WHERE con.contype = 'p' -- Primary key constraint
AND a.attnum = ANY(con.conkey)
Automatic Detection Example:
-- PostgreSQL table definition
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(100) NOT NULL,
email VARCHAR(255)
);
The provider automatically detects user_id as the primary key and uses it for element ID generation.
For tables without primary keys or when you need custom key columns:
.with_table_key("users", vec!["username".to_string(), "email".to_string()])
Multiple Column Keys:
.with_table_key("user_sessions", vec![
"user_id".to_string(),
"session_id".to_string()
])
Element IDs are generated as: table:value1_value2_...
Example: user_sessions:123_abc-def-456
If no primary key is found and no custom key is configured:
table:uuidWarning message:
No primary key found for table 'users'. Consider adding 'table_keys' configuration.
The provider supports common PostgreSQL data types with automatic conversion:
| PostgreSQL Type | Drasi ElementValue | Notes |
|---|---|---|
boolean |
Bool |
True/false values |
smallint, integer, bigint |
Integer |
Converted to i64 |
real, double precision |
Float |
Converted to f64 |
numeric, decimal |
Float |
Parsed via decimal conversion |
varchar, text, char |
String |
Text data |
timestamp, timestamptz |
String |
ISO 8601 format |
date |
String |
ISO 8601 format |
uuid |
String |
Hyphenated UUID format |
json, jsonb |
String |
JSON serialized |
| Other types | String |
Best-effort string conversion |
NULL values are represented as ElementValue::Null.
use drasi_bootstrap_postgres::PostgresBootstrapProvider;
use drasi_lib::bootstrap::BootstrapProvider;
use drasi_lib::channels::bootstrap_channel;
// Create provider
let provider = PostgresBootstrapProvider::builder()
.with_host("localhost")
.with_port(5432)
.with_database("production_db")
.with_user("drasi_user")
.with_password("secure_password")
.with_tables(vec!["customers".to_string()])
.build();
// Create channel for receiving bootstrap events
let (tx, mut rx) = bootstrap_channel(1000);
// Create bootstrap request (from query)
let request = BootstrapRequest {
query_id: "customer-query".to_string(),
node_labels: vec!["Customer".to_string()],
relation_labels: vec![],
};
// Create context
let context = BootstrapContext {
source_id: "postgres-source".to_string(),
sequence_counter: Arc::new(AtomicU64::new(0)),
};
// Execute bootstrap
let count = provider.bootstrap(request, &context, tx).await?;
println!("Bootstrapped {} records", count);
// Receive events
while let Some(event) = rx.recv().await {
println!("Received: {:?}", event);
}
use drasi_bootstrap_postgres::PostgresBootstrapProvider;
let provider = PostgresBootstrapProvider::builder()
.with_host("db.example.com")
.with_port(5432)
.with_database("ecommerce")
.with_user("app_user")
.with_password("app_password")
// Add multiple tables
.with_table("users")
.with_table("orders")
.with_table("order_items")
// Configure custom keys for tables without primary keys
.with_table_key("order_items", vec![
"order_id".to_string(),
"product_id".to_string()
])
.build();
let request = BootstrapRequest {
query_id: "order-analytics".to_string(),
node_labels: vec![
"User".to_string(),
"Order".to_string(),
"OrderItem".to_string(),
],
relation_labels: vec![],
};
// Bootstrap will process all three tables
let count = provider.bootstrap(request, &context, tx).await?;
use drasi_bootstrap_postgres::PostgresBootstrapProvider;
use drasi_lib::config::common::SslMode;
let provider = PostgresBootstrapProvider::builder()
.with_host("secure-db.example.com")
.with_port(5432)
.with_database("secure_db")
.with_user("secure_user")
.with_password("secure_password")
.with_ssl_mode(SslMode::Require)
.with_tables(vec!["sensitive_data".to_string()])
.build();
use drasi_lib::{Query, DrasiCore};
use drasi_bootstrap_postgres::PostgresBootstrapProvider;
// Create bootstrap provider
let bootstrap = PostgresBootstrapProvider::builder()
.with_host("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("password")
.build();
// Create query that will be bootstrapped
let query = Query::cypher("user-monitor")
.query("MATCH (u:User) WHERE u.active = true RETURN u")
.from_source("postgres-source")
.build();
// Bootstrap provider will populate initial state when query starts
Element IDs follow these rules:
With Primary Key: table:pk_value1_pk_value2_...
users:123user_sessions:456_abcWithout Primary Key: table:uuid
logs:550e8400-e29b-41d4-a716-446655440000The provider returns errors for:
Tables are discovered dynamically based on query labels. The provider does not require pre-registration of tables—it automatically maps labels to tables and verifies their existence.
Tables from schemas other than public are supported by including the schema in the table name:
.with_table("analytics.user_metrics")
.with_table_key("analytics.user_metrics", vec!["metric_id".to_string()])
Composite keys are fully supported and automatically detected:
CREATE TABLE user_permissions (
user_id INTEGER,
resource_id INTEGER,
permission VARCHAR(50),
PRIMARY KEY (user_id, resource_id)
);
Element IDs will be generated as: user_permissions:123_456
NULL values in primary key columns trigger UUID fallback:
The provider uses the log crate with standard levels:
Enable logging in your application:
env_logger::init();
// Set RUST_LOG=debug for detailed logging
Symptom: Warning message about missing table
Solution: Verify label-to-table mapping:
User maps to table user (lowercase)public schemaSymptom: Warning about missing primary keys, UUIDs used for element IDs
Solution: Configure custom key columns:
.with_table_key("my_table", vec!["id_column".to_string()])
Symptom: Warnings about type conversion failures
Solution:
ElementValue::NullSymptom: Error connecting to PostgreSQL
Solution:
pg_hba.conf for authentication settingsSymptom: High memory usage during bootstrap
Solution:
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.