| Crates.io | evento-sql |
| lib.rs | evento-sql |
| version | 2.0.0-alpha.12 |
| created_at | 2025-12-25 14:16:12.165981+00 |
| updated_at | 2026-01-21 23:46:22.763318+00 |
| description | SQL database implementations for evento event sourcing library. |
| homepage | |
| repository | https://github.com/timayz/evento |
| max_upload_size | |
| id | 2004592 |
| size | 125,911 |
SQL database implementations for the Evento event sourcing library.
This crate provides SQL-based persistence for events and subscriber state, implementing the Executor trait from evento-core. It supports SQLite, MySQL, and PostgreSQL through feature flags.
Add to your Cargo.toml:
[dependencies]
evento-sql = "2"
By default, all database backends are enabled. To use only specific databases:
[dependencies]
evento-sql = { version = "2", default-features = false, features = ["postgres"] }
sqlite - SQLite database supportmysql - MySQL database supportpostgres - PostgreSQL database supportuse evento_sql::Sql;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx_migrator::{Migrate, Plan};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create a connection pool
let pool = SqlitePoolOptions::new()
.connect(":memory:")
.await?;
// Run migrations (requires evento-sql-migrator)
let mut conn = pool.acquire().await?;
let migrator = evento_sql_migrator::new::<sqlx::Sqlite>()?;
migrator.run(&mut *conn, &Plan::apply_all()).await?;
drop(conn);
// Create the executor
let executor: Sql<sqlx::Sqlite> = pool.into();
// Use with Evento for event sourcing operations
Ok(())
}
Convenience type aliases are provided for each database:
use evento_sql::{Sqlite, MySql, Postgres};
// These are equivalent:
let executor: Sqlite = pool.into();
let executor: Sql<sqlx::Sqlite> = pool.into();
For CQRS patterns with separate read and write connections:
use evento_sql::{RwSqlite, RwMySql, RwPostgres};
Sql<DB>The main executor type that wraps a SQLx connection pool. Implements the Executor trait with:
read - Query events with filtering and cursor-based paginationwrite - Persist events with optimistic concurrency controlget_subscriber_cursor - Get current cursor position for a subscriberis_subscriber_running - Check if a subscriber is activeupsert_subscriber - Create or update a subscriber recordacknowledge - Update subscriber cursor after processingReaderQuery builder for cursor-based pagination:
use evento_sql::{Reader, Event};
use sea_query::Query;
let statement = Query::select()
.columns([Event::Id, Event::Name, Event::Data])
.from(Event::Table)
.to_owned();
let result = Reader::new(statement)
.forward(10, None) // First 10 events
.execute::<_, MyEvent, _>(&pool)
.await?;
// Navigate pages
if result.page_info.has_next_page {
let next = Reader::new(statement)
.forward(10, result.page_info.end_cursor)
.execute::<_, MyEvent, _>(&pool)
.await?;
}
Sea-query column identifiers for type-safe query building:
Event - Event table columnsSubscriber - Subscriber table columnsSnapshot - Snapshot table columns (deprecated, dropped in M0003)Event data is serialized using bitcode for compact binary representation. The sql_types::Bitcode<T> wrapper provides SQLx integration:
use evento_sql::sql_types::Bitcode;
// Wrap data for storage
let data = Bitcode(my_event_data);
// Encode to bytes
let bytes = data.encode_to();
// Decode from bytes
let decoded = Bitcode::<MyData>::decode_from_bytes(&bytes)?;
This crate expects the database schema created by evento-sql-migrator. See that crate for the full schema definition.
| Column | Type | Description |
|---|---|---|
id |
VARCHAR(26) | Event ID (ULID) |
name |
VARCHAR(50) | Event type name |
aggregator_type |
VARCHAR(50) | Aggregate root type |
aggregator_id |
VARCHAR(26) | Aggregate root instance ID |
version |
INTEGER | Event sequence number |
data |
BLOB | Serialized event data (bitcode) |
metadata |
BLOB | Serialized metadata (bitcode) |
routing_key |
VARCHAR(50) | Optional routing key |
timestamp |
BIGINT | Timestamp (seconds) |
timestamp_subsec |
BIGINT | Sub-second precision |
| Column | Type | Description |
|---|---|---|
key |
VARCHAR(50) | Subscriber identifier (primary key) |
worker_id |
VARCHAR(26) | Current worker ID |
cursor |
TEXT | Current stream position |
lag |
INTEGER | Events behind |
enabled |
BOOLEAN | Active status |
See the LICENSE file in the repository root.