drasi-reaction-storedproc-mysql

Crates.io: drasi-reaction-storedproc-mysql
lib.rs: drasi-reaction-storedproc-mysql
version: 0.2.1
created_at: 2026-01-15 16:13:40.478955+00
updated_at: 2026-01-23 06:27:17.996404+00
description: MySQL Stored Procedure reaction plugin for Drasi
repository: https://github.com/drasi-project/drasi-core
id: 2045932
size: 243,152
owner: maintainers-core (github:drasi-project:maintainers-core)

MySQL Stored Procedure Reaction

A Drasi reaction plugin that invokes MySQL stored procedures when continuous query results change.

Overview

The MySQL Stored Procedure reaction enables you to:

  • Execute different stored procedures for ADD, UPDATE, and DELETE operations
  • Map query result fields to stored procedure parameters using @after.fieldName and @before.fieldName syntax
  • Configure default templates for all queries or per-query routes for specific queries
  • Handle multiple queries with different stored procedure configurations
  • Automatically retry failed procedure calls with exponential backoff
  • Configure connection pooling and timeouts

Installation

Add the dependency to your Cargo.toml:

[dependencies]
drasi-reaction-storedproc-mysql = { path = "path/to/drasi-core/components/reactions/storedproc-mysql" }

Quick Start

1. Create Stored Procedures in MySQL

DELIMITER //

CREATE PROCEDURE add_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    INSERT INTO users_sync (id, name, email)
    VALUES (p_id, p_name, p_email);
END //

CREATE PROCEDURE update_user(
    IN p_id INT,
    IN p_name VARCHAR(255),
    IN p_email VARCHAR(255)
)
BEGIN
    UPDATE users_sync
    SET name = p_name, email = p_email
    WHERE id = p_id;
END //

CREATE PROCEDURE delete_user(
    IN p_id INT
)
BEGIN
    DELETE FROM users_sync WHERE id = p_id;
END //

DELIMITER ;

2. Create the Reaction

use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};
use drasi_lib::DrasiLib;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let reaction = MySqlStoredProcReaction::builder("user-sync")
        .with_hostname("localhost")
        .with_port(3306)
        .with_database("mydb")
        .with_user("root")
        .with_password("password")
        .with_query("user-changes")
        .with_default_template(QueryConfig {
            added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
            updated: Some(TemplateSpec::new("CALL update_user(@after.id, @after.name, @after.email)")),
            deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
        })
        .build()
        .await?;

    let drasi = DrasiLib::builder()
        .with_id("my-app")
        .with_reaction(reaction)
        .build()
        .await?;

    drasi.start().await?;
    tokio::signal::ctrl_c().await?;

    Ok(())
}

Configuration

Builder API with Default Template

Use a default template that applies to all queries:

use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};

let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_ssl(true)  // Enable SSL/TLS
    .with_query("query1")
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL add_record(@after.id, @after.name)")),
        updated: Some(TemplateSpec::new("CALL update_record(@after.id, @after.name)")),
        deleted: Some(TemplateSpec::new("CALL delete_record(@before.id)")),
    })
    .with_command_timeout_ms(30000)
    .with_retry_attempts(3)
    .build()
    .await?;

Builder API with Query-Specific Routes

Configure different stored procedures for different queries:

use drasi_reaction_storedproc_mysql::{MySqlStoredProcReaction, QueryConfig, TemplateSpec};

let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    .with_query("user-changes")
    .with_query("order-changes")
    // Default template for most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: None,
        deleted: None,
    })
    // Special route for critical queries
    .with_route("order-changes", QueryConfig {
        added: Some(TemplateSpec::new("CALL process_order(@after.order_id, @after.total)")),
        updated: Some(TemplateSpec::new("CALL update_order(@after.order_id, @after.status)")),
        deleted: Some(TemplateSpec::new("CALL cancel_order(@before.order_id)")),
    })
    .build()
    .await?;

Configuration Options

Option              Description                          Type                           Default
hostname            Database hostname                    String                         "localhost"
port                Database port                        u16                            3306
user                Database user                        String                         Required
password            Database password                    String                         Required
database            Database name                        String                         Required
ssl                 Enable SSL/TLS                       bool                           false
default_template    Default template for all queries     Option<QueryConfig>            None
routes              Per-query template configurations    HashMap<String, QueryConfig>   Empty
command_timeout_ms  Command timeout (milliseconds)       u64                            30000
retry_attempts      Number of retries                    u32                            3
Parameter Mapping

Context-Aware Field Access

The reaction uses context-aware field mapping with @after and @before prefixes:

ADD Operations

Use @after.fieldName to access the newly added data:

QueryConfig {
    added: Some(TemplateSpec::new("CALL add_user(@after.id, @after.name, @after.email)")),
    updated: None,
    deleted: None,
}

Query result for ADD:

{
  "type": "add",
  "data": {
    "id": 1,
    "name": "Alice",
    "email": "alice@example.com"
  }
}

Executes:

CALL add_user(1, 'Alice', 'alice@example.com')

UPDATE Operations

Use @before.fieldName for old values and @after.fieldName for new values:

QueryConfig {
    added: None,
    updated: Some(TemplateSpec::new("CALL update_user(@after.id, @before.email, @after.email)")),
    deleted: None,
}

Query result for UPDATE:

{
  "type": "update",
  "data": {
    "before": {
      "id": 1,
      "email": "alice@oldmail.com"
    },
    "after": {
      "id": 1,
      "email": "alice@newmail.com"
    }
  }
}

Executes:

CALL update_user(1, 'alice@oldmail.com', 'alice@newmail.com')

DELETE Operations

Use @before.fieldName to access the deleted data:

QueryConfig {
    added: None,
    updated: None,
    deleted: Some(TemplateSpec::new("CALL delete_user(@before.id)")),
}

Query result for DELETE:

{
  "type": "delete",
  "data": {
    "id": 1,
    "name": "Alice"
  }
}

Executes:

CALL delete_user(1)
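
The three operation sections above share one placeholder syntax. As a rough illustration of how such templates could be scanned, the following sketch lists the @before and @after references in a template string. The placeholders function and the Context and Placeholder types are hypothetical helpers written for this example. The crate's actual parser is not shown in this README and may tokenize differently.

```rust
/// Which side of a change a placeholder reads from.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Context {
    Before,
    After,
}

/// One `@before.path` or `@after.path` reference found in a template.
#[derive(Debug, Clone, PartialEq)]
struct Placeholder {
    context: Context,
    path: String,
}

/// Scans a template and returns its placeholders in order of appearance.
/// Hypothetical helper: a token runs from '@' to the first character that is
/// not an ASCII letter, digit, underscore, or dot.
fn placeholders(template: &str) -> Vec<Placeholder> {
    let mut found = Vec::new();
    let mut rest = template;
    while let Some(at) = rest.find('@') {
        let after_at = &rest[at + 1..];
        let end = after_at
            .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_' || c == '.'))
            .unwrap_or(after_at.len());
        // Drop trailing dots so "@after.id." still yields the path "id".
        let token = after_at[..end].trim_end_matches('.');
        let parsed = token
            .strip_prefix("after.")
            .map(|path| (Context::After, path))
            .or_else(|| token.strip_prefix("before.").map(|path| (Context::Before, path)));
        if let Some((context, path)) = parsed {
            if !path.is_empty() {
                found.push(Placeholder { context, path: path.to_string() });
            }
        }
        // Continue scanning after the token just consumed.
        rest = &after_at[end..];
    }
    found
}

fn main() {
    let template = "CALL update_user(@after.id, @before.email, @after.email)";
    for p in placeholders(template) {
        println!("{:?} -> {}", p.context, p.path);
    }
}
```

Tokens that start with @ but use neither prefix are ignored by this sketch. Whether the crate rejects them or passes them through is not stated here.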

Nested Field Access

Access deeply nested fields using dot notation:

TemplateSpec::new("CALL add_address(@after.user.id, @after.location.city, @after.location.floor)")

Query result:

{
  "user": {
    "id": 123
  },
  "location": {
    "city": "Seattle",
    "floor": 5
  }
}

Executes:

CALL add_address(123, 'Seattle', 5)
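
Dot notation amounts to walking one object level per path segment. The sketch below shows that walk on a small JSON-like value. The Value enum and the lookup and object helpers are assumptions made to keep the example dependency-free. The crate presumably works on its own result representation, and how it binds the resolved values into the CALL statement is not covered here.

```rust
use std::collections::BTreeMap;

/// Minimal JSON-like value, defined here only to keep the sketch dependency-free.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Text(String),
    Object(BTreeMap<String, Value>),
}

/// Walks a dot-separated path such as "location.city" through nested objects.
/// Returns None if a segment is missing or a non-object is reached too early.
fn lookup<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    path.split('.').try_fold(root, |current, key| match current {
        Value::Object(fields) => fields.get(key),
        _ => None,
    })
}

/// Convenience constructor for object values (hypothetical helper).
fn object(fields: Vec<(&str, Value)>) -> Value {
    Value::Object(
        fields
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect(),
    )
}

fn main() {
    // Same shape as the query result shown above.
    let after = object(vec![
        ("user", object(vec![("id", Value::Int(123))])),
        (
            "location",
            object(vec![
                ("city", Value::Text("Seattle".to_string())),
                ("floor", Value::Int(5)),
            ]),
        ),
    ]);

    for path in ["user.id", "location.city", "location.floor", "location.zip"] {
        println!("{path}: {:?}", lookup(&after, path));
    }
}
```

A missing segment yields None in this sketch. What the reaction does with a placeholder that cannot be resolved (for example, passing NULL or failing the call) is not documented in this README.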

Template Routing Priority

When a query result arrives, the reaction determines which stored procedure to call using the following priority:

  1. Query-specific route: If a route exists for the query ID and defines a template for the operation type, use that template
  2. Default template: Otherwise, if the default template defines a template for the operation type, use it
  3. Skip: If neither defines a template for the operation type, skip processing

Example:

let reaction = MySqlStoredProcReaction::builder("my-reaction")
    .with_hostname("localhost")
    .with_port(3306)
    .with_database("mydb")
    .with_user("root")
    .with_password("secret")
    // Default template - used by most queries
    .with_default_template(QueryConfig {
        added: Some(TemplateSpec::new("CALL default_add(@after.id)")),
        updated: Some(TemplateSpec::new("CALL default_update(@after.id)")),
        deleted: None,  // No default for deletes
    })
    // Special handling for critical-query
    .with_route("critical-query", QueryConfig {
        added: Some(TemplateSpec::new("CALL critical_add(@after.id, @after.priority)")),
        updated: None,  // Falls back to default_update
        deleted: Some(TemplateSpec::new("CALL critical_delete(@before.id)")),
    })
    .with_query("normal-query")
    .with_query("critical-query")
    .build()
    .await?;

In this example:

  • normal-query ADD → Uses default_add
  • normal-query UPDATE → Uses default_update
  • normal-query DELETE → Skipped (no template)
  • critical-query ADD → Uses critical_add (route override)
  • critical-query UPDATE → Uses default_update (fallback)
  • critical-query DELETE → Uses critical_delete (route)
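
The six outcomes listed above follow from resolving each operation type separately: first the route, then the default. Here is a minimal sketch of that lookup, assuming templates are plain strings. QueryConfigSketch, Operation, and resolve are hypothetical names for this illustration and are not the crate's internals.

```rust
use std::collections::HashMap;

/// The three change types a query result can carry.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operation {
    Add,
    Update,
    Delete,
}

/// Hypothetical stand-in for the crate's `QueryConfig`, with string templates.
#[derive(Debug, Clone, Default)]
struct QueryConfigSketch {
    added: Option<String>,
    updated: Option<String>,
    deleted: Option<String>,
}

impl QueryConfigSketch {
    /// Returns the template configured for one operation type, if any.
    fn template_for(&self, op: Operation) -> Option<&str> {
        match op {
            Operation::Add => self.added.as_deref(),
            Operation::Update => self.updated.as_deref(),
            Operation::Delete => self.deleted.as_deref(),
        }
    }
}

/// Route first, then default, otherwise None (the change is skipped).
fn resolve<'a>(
    routes: &'a HashMap<String, QueryConfigSketch>,
    default: Option<&'a QueryConfigSketch>,
    query_id: &str,
    op: Operation,
) -> Option<&'a str> {
    routes
        .get(query_id)
        .and_then(|route| route.template_for(op))
        .or_else(|| default.and_then(|config| config.template_for(op)))
}

fn main() {
    let default = QueryConfigSketch {
        added: Some("CALL default_add(@after.id)".to_string()),
        updated: Some("CALL default_update(@after.id)".to_string()),
        deleted: None,
    };
    let mut routes = HashMap::new();
    routes.insert(
        "critical-query".to_string(),
        QueryConfigSketch {
            added: Some("CALL critical_add(@after.id, @after.priority)".to_string()),
            updated: None,
            deleted: Some("CALL critical_delete(@before.id)".to_string()),
        },
    );

    for query in ["normal-query", "critical-query"] {
        for op in [Operation::Add, Operation::Update, Operation::Delete] {
            let outcome = resolve(&routes, Some(&default), query, op);
            println!("{query} {op:?} -> {outcome:?}");
        }
    }
}
```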

Error Handling

The reaction includes automatic retry logic with exponential backoff:

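  • Initial retry: 100ms delay
  • Subsequent retries: The delay doubles each time (200ms, 400ms, 800ms, and so on)
  • Max retries: Configurable through retry_attempts (default: 3)
  • Timeout: Configurable per command through command_timeout_ms (default: 30000 ms, i.e. 30s)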
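
The delay schedule above is a doubling sequence that starts at 100ms. A minimal sketch of that arithmetic follows. backoff_delay and retry_schedule are hypothetical helper names, and the sketch assumes no jitter and no upper cap on the delay, since this README mentions neither.

```rust
use std::time::Duration;

/// Base delay before the first retry, as listed above.
const BASE_DELAY_MS: u64 = 100;

/// Delay before retry number `retry_index` (0 = first retry): 100ms * 2^retry_index.
/// Saturating arithmetic avoids overflow for very large indices.
fn backoff_delay(retry_index: u32) -> Duration {
    let factor = 2u64.saturating_pow(retry_index);
    Duration::from_millis(BASE_DELAY_MS.saturating_mul(factor))
}

/// Full list of delays for a given number of retries.
fn retry_schedule(retry_attempts: u32) -> Vec<Duration> {
    (0..retry_attempts).map(backoff_delay).collect()
}

fn main() {
    // With the default of 3 retries the waits are 100ms, 200ms and 400ms.
    for (index, delay) in retry_schedule(3).iter().enumerate() {
        println!("retry {} after {:?}", index + 1, delay);
    }
}
```

With the default of 3 retries, the worst case adds 700ms of waiting on top of the time spent in the calls themselves, each of which is bounded by the command timeout.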

Connection Pooling

The MySQL reaction draws its connections from a pool, so stored procedure calls reuse open connections instead of establishing a new one each time. Connections are managed automatically.

License

Copyright 2025 The Drasi Authors.

Licensed under the Apache License, Version 2.0.
