| Crates.io | drasi-reaction-storedproc-postgres |
| lib.rs | drasi-reaction-storedproc-postgres |
| version | 0.2.1 |
| created_at | 2026-01-15 16:13:49.392989+00 |
| updated_at | 2026-01-23 06:27:53.83406+00 |
| description | PostgreSQL Stored Procedure reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2045933 |
| size | 211,550 |
A Drasi reaction plugin that invokes PostgreSQL stored procedures when continuous query results change.
The PostgreSQL Stored Procedure reaction enables you to:
- Reference fields from query results in templates using the @fieldName syntax

Add the dependency to your Cargo.toml:
[dependencies]
drasi-reaction-storedproc-postgres = { path = "path/to/drasi-core/components/reactions/storedproc-postgres" }
CREATE OR REPLACE PROCEDURE add_user(
p_id INTEGER,
p_name TEXT,
p_email TEXT
)
LANGUAGE plpgsql
AS $$
BEGIN
INSERT INTO users_sync (id, name, email)
VALUES (p_id, p_name, p_email);
END;
$$;
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let reaction = PostgresStoredProcReaction::builder("user-sync")
.with_connection(
"localhost",
5432,
"mydb",
"postgres",
"password"
)
.with_query("user-changes")
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
})
.build()
.await?;
let drasi = DrasiLib::builder()
.with_id("my-app")
.with_reaction(reaction)
.build()
.await?;
drasi.start().await?;
tokio::signal::ctrl_c().await?;
Ok(())
}
let reaction = PostgresStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
.with_ssl(true) // Enable SSL/TLS
.with_query("query1")
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
})
.with_command_timeout_ms(30000)
.with_retry_attempts(3)
.build()
.await?;
| Option | Description | Type | Default |
|---|---|---|---|
| hostname | Database hostname | String | "localhost" |
| port | Database port | u16 | 5432 |
| user | Database user | String | Required |
| password | Database password | String | Required |
| database | Database name | String | Required |
| ssl | Enable SSL/TLS | bool | false |
| default_template | Default templates for all queries | Option<QueryConfig> | None |
| routes | Query-specific template overrides | HashMap<String, QueryConfig> | Empty |
| command_timeout_ms | Command timeout | u64 | 30000 |
| retry_attempts | Number of retries | u32 | 3 |
Templates use the @ syntax to reference fields from query results. The reaction provides different data contexts based on the operation type:
- ADD: use @after.field to access the new data
- UPDATE: use @after.field for new data, @before.field for old data
- DELETE: use @before.field to access the deleted data

.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
})
Query result for ADD operation:
{
"id": 1,
"name": "Alice",
"email": "alice@example.com"
}
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
Access nested fields using dot notation:
TemplateSpec::new("CALL add_address(@after.user.id, @after.address.city)")
You can configure different templates for specific queries using the routes field or the builder's with_route method:
use std::collections::HashMap;
let mut routes = HashMap::new();
routes.insert("user-query".to_string(), QueryConfig {
added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
updated: None,
deleted: None,
});
let reaction = PostgresStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
.with_query("user-query")
.with_route("user-query", QueryConfig {
added: Some(TemplateSpec::new("CALL user_added(@after.id, @after.name)")),
..Default::default() // updated and deleted fall back to the default template, if one is configured
})
.build()
.await?;
This example shows how to override only specific operations for a query while falling back to defaults for others:
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
let reaction = PostgresStoredProcReaction::builder("multi-query-sensor-sync")
.with_hostname("localhost")
.with_port(5432)
.with_database("drasi_test")
.with_user("postgres")
.with_password("mysecret")
// Subscribe to multiple queries
.with_query("high-temp")
.with_query("low-temp")
.with_query("critical-temp")
// Default template - applies to "high-temp" and "low-temp"
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new(
"CALL log_sensor_added(@after.id, @after.temperature, @after.timestamp)"
)),
updated: Some(TemplateSpec::new(
"CALL log_sensor_updated(@after.id, @after.temperature)"
)),
deleted: Some(TemplateSpec::new(
"CALL log_sensor_deleted(@before.id)"
)),
})
// Custom route for critical temperature readings
// Only handles ADD operations, falls back to default for UPDATE/DELETE
.with_route("critical-temp", QueryConfig {
added: Some(TemplateSpec::new(
"CALL log_critical_alert(@after.id, @after.temperature, @after.timestamp)"
)),
..Default::default() // updated and deleted will use default template
})
.with_command_timeout_ms(5000)
.with_retry_attempts(3)
.build()
.await?;
How it works:
- "critical-temp" ADD → CALL log_critical_alert(...) (custom route)
- "critical-temp" UPDATE → CALL log_sensor_updated(...) (falls back to default)
- "critical-temp" DELETE → CALL log_sensor_deleted(...) (falls back to default)

This example shows a reaction handling multiple queries with different stored procedure requirements:
use drasi_reaction_storedproc_postgres::{PostgresStoredProcReaction, QueryConfig, TemplateSpec};
// Create a reaction that:
// 1. Subscribes to 3 different queries: "user-changes", "product-changes", "order-changes"
// 2. Has a default template for most operations
// 3. Overrides only the "product-changes" query with custom procedures
let reaction = PostgresStoredProcReaction::builder("multi-query-sync")
.with_hostname("localhost")
.with_port(5432)
.with_database("mydb")
.with_user("postgres")
.with_password("secret")
// Subscribe to multiple queries
.with_query("user-changes")
.with_query("product-changes")
.with_query("order-changes")
// Default template applies to "user-changes" and "order-changes"
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL log_entity_added(@after.id, @after.type)")),
updated: Some(TemplateSpec::new("CALL log_entity_updated(@after.id, @after.type)")),
deleted: Some(TemplateSpec::new("CALL log_entity_deleted(@before.id, @before.type)")),
})
// Override "product-changes" with specific procedures
.with_route("product-changes", QueryConfig {
added: Some(TemplateSpec::new(
"CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory)"
)),
updated: Some(TemplateSpec::new(
"CALL sync_product_updated(@after.product_id, @after.price, @after.inventory)"
)),
..Default::default() // deleted will fall back to default template
})
.with_command_timeout_ms(5000)
.with_retry_attempts(3)
.build()
.await?;
How it works:
"user-changes" query → Uses default template
- ADD: CALL log_entity_added(@after.id, @after.type)
- UPDATE: CALL log_entity_updated(@after.id, @after.type)
- DELETE: CALL log_entity_deleted(@before.id, @before.type)

"product-changes" query → Uses custom route (with fallback to default for delete)
- ADD: CALL sync_product_added(@after.product_id, @after.name, @after.price, @after.inventory)
- UPDATE: CALL sync_product_updated(@after.product_id, @after.price, @after.inventory)
- DELETE: CALL log_entity_deleted(@before.id, @before.type) (falls back to default)

"order-changes" query → Uses default template
- ADD: CALL log_entity_added(@after.id, @after.type)
- UPDATE: CALL log_entity_updated(@after.id, @after.type)
- DELETE: CALL log_entity_deleted(@before.id, @before.type)

Note: If a route specifies None for an operation (like deleted: None for product-changes), the reaction will check the default template. If the default template also has None for that operation, no procedure will be called.
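The reaction includes automatic retry logic with exponential backoff. The number of retries is set with retry_attempts (default: 3), and the command timeout is set with command_timeout_ms (default: 30000).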
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.