| Crates.io | drasi-reaction-storedproc-mysql |
| lib.rs | drasi-reaction-storedproc-mysql |
| version | 0.2.1 |
| created_at | 2026-01-15 16:13:40.478955+00 |
| updated_at | 2026-01-23 06:27:17.996404+00 |
| description | MySQL Stored Procedure reaction plugin for Drasi |
| homepage | |
| repository | https://github.com/drasi-project/drasi-core |
| max_upload_size | |
| id | 2045932 |
| size | 243,152 |
A Drasi reaction plugin that invokes MySQL stored procedures when continuous query results change.
The MySQL Stored Procedure reaction enables you to:
- Map query result fields to stored procedure parameters using the @after.fieldName and @before.fieldName syntax

Add the dependency to your Cargo.toml:
[dependencies]
drasi-reaction-storedproc-mysql = { path = "path/to/drasi-core/components/reactions/storedproc-mysql" }
DELIMITER //
-- Inserts one row into users_sync from the supplied id, name and email.
CREATE PROCEDURE add_user(
    IN p_id    INT,
    IN p_name  VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    INSERT INTO users_sync (id, name, email)
    VALUES (p_id, p_name, p_email);
END //
-- Overwrites name and email on the users_sync row whose id equals p_id.
CREATE PROCEDURE update_user(
    IN p_id    INT,
    IN p_name  VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    UPDATE users_sync
    SET name = p_name, email = p_email
    WHERE id = p_id;
END //
-- Removes the users_sync row whose id equals p_id.
CREATE PROCEDURE delete_user(
    IN p_id INT
)
BEGIN
    DELETE FROM users_sync WHERE id = p_id;
END //
DELIMITER ;
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;
/// Quick-start entry point: builds the `user-sync` reaction, registers it with a
/// `DrasiLib` instance, starts that instance, and then waits for Ctrl-C.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // One stored-procedure call per change type. `@after.*` reads fields of the
    // new result and `@before.*` reads fields of the old one.
    let user_templates = QueryConfig {
        added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
        updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
        deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
    };

    // Connection settings plus the query whose changes drive the procedures.
    let user_sync = MySqlStoredProcReaction::builder("user-sync")
        .with_hostname("localhost")
        .with_port(3306)
        .with_database("mydb")
        .with_user("root")
        .with_password("password")
        .with_query("user-changes")
        .with_default_template(user_templates)
        .build()
        .await?;

    let app = DrasiLib::builder()
        .with_id("my-app")
        .with_reaction(user_sync)
        .build()
        .await?;

    app.start().await?;

    // Block until the process receives Ctrl-C.
    tokio::signal::ctrl_c().await?;
    Ok(())
}
Use a default template that applies to all queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
// Build a reaction with a single default template that applies to all queries.
let reaction = MySqlStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_port(3306)
.with_database("mydb")
.with_user("root")
.with_password("secret")
.with_ssl(true) // Enable SSL/TLS
.with_query("query1")
// One stored-procedure call per change type; @after.* reads new values,
// @before.* reads old values.
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
})
// Command timeout in milliseconds (30000 is also the documented default).
.with_command_timeout_ms(30000)
// Number of retries (3 is also the documented default).
.with_retry_attempts(3)
.build()
.await?;
Configure different stored procedures for different queries:
use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
// Build a reaction that handles two queries: "user-changes" uses the default
// template, while "order-changes" gets its own route.
let reaction = MySqlStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_port(3306)
.with_database("mydb")
.with_user("root")
.with_password("secret")
.with_query("user-changes")
.with_query("order-changes")
// Default template for most queries
// (only `added` is set; no default template for updates or deletes)
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
updated: None,
deleted: None,
})
// Special route for critical queries
// (defines all three change types for "order-changes")
.with_route("order-changes", QueryConfig {
added: Some(TemplateSpec::new("CALL process_order(@after.order_id, @after.total)")),
updated: Some(TemplateSpec::new("CALL update_order(@after.order_id, @after.status)")),
deleted: Some(TemplateSpec::new("CALL cancel_order(@before.order_id)")),
})
.build()
.await?;
| Option | Description | Type | Default |
|---|---|---|---|
| `hostname` | Database hostname | `String` | `"localhost"` |
| `port` | Database port | `u16` | `3306` |
| `user` | Database user | `String` | Required |
| `password` | Database password | `String` | Required |
| `database` | Database name | `String` | Required |
| `ssl` | Enable SSL/TLS | `bool` | `false` |
| `default_template` | Default template for all queries | `Option<QueryConfig>` | `None` |
| `routes` | Per-query template configurations | `HashMap<String, QueryConfig>` | Empty |
| `command_timeout_ms` | Command timeout | `u64` | `30000` |
| `retry_attempts` | Number of retries | `u32` | `3` |
The reaction uses context-aware field mapping with @after and @before prefixes:
Use @after.fieldName to access the newly added data:
QueryConfig {
added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
updated: None,
deleted: None,
}
Query result for ADD:
{
"type": "add",
"data": {
"id": 1,
"name": "Alice",
"email": "alice@example.com"
}
}
Executes:
CALL add_user(1, 'Alice', 'alice@example.com')
Use @before.fieldName for old values and @after.fieldName for new values:
QueryConfig {
added: None,
updated: Some(TemplateSpec::new("CALL update_user(@after.id, @before.email, @after.email)")),
deleted: None,
}
Query result for UPDATE:
{
"type": "update",
"data": {
"before": {
"id": 1,
"email": "alice@oldmail.com"
},
"after": {
"id": 1,
"email": "alice@newmail.com"
}
}
}
Executes:
CALL update_user(1, 'alice@oldmail.com', 'alice@newmail.com')
Use @before.fieldName to access the deleted data:
QueryConfig {
added: None,
updated: None,
deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
}
Query result for DELETE:
{
"type": "delete",
"data": {
"id": 1,
"name": "Alice"
}
}
Executes:
CALL delete_user(1)
Access deeply nested fields using dot notation:
TemplateSpec::new("CALL add_address(@after.user.id, @after.location.city, @after.location.floor)")
Query result:
{
"user": {
"id": 123
},
"location": {
"city": "Seattle",
"floor": 5
}
}
Executes:
CALL add_address(123, 'Seattle', 5)
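When a query result arrives, the reaction determines which stored procedure to call using the following priority:
1. Query-specific route: the template configured with with_route for that query and operation, if one is set.
2. Default template: the template from with_default_template for that operation, used as a fallback.
3. No template: if neither provides a template for the operation, the change is skipped.

Example: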
// Demonstrates route-over-default priority: a per-query route is consulted
// first, and operations it leaves as None fall back to the default template.
let reaction = MySqlStoredProcReaction::builder("my-reaction")
.with_hostname("localhost")
.with_port(3306)
.with_database("mydb")
.with_user("root")
.with_password("secret")
// Default template - used by most queries
.with_default_template(QueryConfig {
added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
updated: Some(TemplateSpec::new("CALL default_update(@after.id)")),
deleted: None, // No default for deletes
})
// Special handling for critical-query
.with_route("critical-query", QueryConfig {
added: Some(TemplateSpec::new("CALL critical_add(@after.id, @after.priority)")),
updated: None, // Falls back to default_update
deleted: Some(TemplateSpec::new("CALL critical_delete(@before.id)")),
})
// normal-query has no route of its own, so it relies on the default template.
.with_query("normal-query")
.with_query("critical-query")
.build()
.await?;
In this example:
- normal-query ADD → Uses default_add
- normal-query UPDATE → Uses default_update
- normal-query DELETE → Skipped (no template)
- critical-query ADD → Uses critical_add (route override)
- critical-query UPDATE → Uses default_update (fallback)
- critical-query DELETE → Uses critical_delete (route)

The reaction includes automatic retry logic with exponential backoff; the number of retries is configured with retry_attempts (default: 3).
The MySQL reaction uses connection pooling for optimal performance. Connections are automatically managed and reused.
Copyright 2025 The Drasi Authors.
Licensed under the Apache License, Version 2.0.