Crates.io | es-entity |
lib.rs | es-entity |
version | 0.9.2 |
created_at | 2024-12-02 20:07:38.990663+00 |
updated_at | 2025-09-22 15:59:40.652436+00 |
description | Event Sourcing Entity Framework |
homepage | |
repository | https://github.com/GaloyMoney/es-entity |
max_upload_size | |
id | 1469161 |
size | 434,772 |
An Event Sourcing Entity Framework for Rust that simplifies building event-sourced applications with PostgreSQL.
The framework enables writing Entities that are:
Persisted to postgres with:
Book | API Docs | GitHub repository | Cargo package
Free of any unsafe code: this crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in 100% safe Rust.
First you need your entity:
// Define your entity ID (can be any type fulfilling the traits).
// This example declares `UserId` through the `entity_id!` macro.
es_entity::entity_id! { UserId }
// Define your events
// `UserEvent` lists every change that can be recorded for a user.
// The serde attribute tags each serialized variant with a "type" field and
// renames the variants to snake_case; `es_event` names the entity ID type.
#[derive(EsEvent, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[es_event(id = "UserId")]
pub enum UserEvent {
// Carries the user's id and initial name.
Initialized { id: UserId, name: String },
// Carries the new name; pushed by `User::update_name`.
NameUpdated { name: String },
}
// Define your entity
// derive_builder::Builder is optional but useful for hydrating
// The builder uses the "owned" pattern (setters take and return the builder
// by value) and reports build failures as `EsEntityError`.
#[derive(EsEntity, Builder)]
#[builder(pattern = "owned", build_fn(error = "EsEntityError"))]
pub struct User {
// Identifier of this user
pub id: UserId,
// Current name; overwritten by `update_name`
pub name: String,
// Container for your events
events: EntityEvents<UserEvent>,
}
impl User {
    // Mutations append events
    //
    // `update_name` sets a new name on the entity and records the change as a
    // `UserEvent::NameUpdated` event. It returns `Idempotent::Executed(())`
    // when the event was pushed; the guard below returns early (ignored)
    // when the most recent `NameUpdated` event already carries this name.
    pub fn update_name(&mut self, new_name: impl Into<String>) -> Idempotent<()> {
        let name = new_name.into();
        // Check whether the event was already recorded.
        // Events are walked newest-first through `iter_all()`, the same
        // accessor `try_from_events` uses to read the entity's events.
        idempotency_guard!(
            self.events.iter_all().rev(),
            // Return Idempotent::Ignored if this pattern hits
            UserEvent::NameUpdated { name: existing_name } if existing_name == &name,
            // Stop searching here
            => UserEvent::NameUpdated { .. }
        );
        // Update the in-memory state first, then record the event that caused it.
        self.name = name.clone();
        self.events.push(UserEvent::NameUpdated { name });
        Idempotent::Executed(())
    }
}
// TryFromEvents hydrates the user entity from persisted events.
impl TryFromEvents<UserEvent> for User {
    fn try_from_events(events: EntityEvents<UserEvent>) -> Result<Self, EsEntityError> {
        // Fold every event into the derive_builder-generated `UserBuilder`,
        // so the builder ends up holding the projected current state.
        let projected = events
            .iter_all()
            .fold(UserBuilder::default(), |state, event| match event {
                UserEvent::Initialized { id, name } => state.id(*id).name(name.clone()),
                UserEvent::NameUpdated { name } => state.name(name.clone()),
            });
        // Hand the events over to the entity and finish the build.
        projected.events(events).build()
    }
}
Set up your database - each entity needs two tables.
-- Index table for queries
-- Rows are keyed by the entity `id`; extra columns exist only for lookups.
CREATE TABLE users (
id UUID PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
name VARCHAR UNIQUE -- Add columns you want to query by
);
-- Event storage table
-- Each row is one event belonging to the `users` row referenced by `id`.
-- UNIQUE(id, sequence) permits only one event per sequence number per entity.
CREATE TABLE user_events (
id UUID NOT NULL REFERENCES users(id),
sequence INT NOT NULL,
event_type VARCHAR NOT NULL,
event JSONB NOT NULL,
context JSONB DEFAULT NULL,
recorded_at TIMESTAMPTZ NOT NULL,
UNIQUE(id, sequence)
);
Repository methods are generated:
// Define your repository - all CRUD operations are generated!
// `entity` names the entity type; `columns` declares the additional `name`
// column and its Rust type.
#[derive(EsRepo)]
#[es_repo(entity = "User", columns(name(ty = "String")))]
pub struct Users {
// Postgres connection pool the repository works with
pool: PgPool,
}
// // Generated Repository fns:
// impl Users {
// // Create operations
// async fn create(&self, new: NewUser) -> Result<User, EsRepoError>;
// async fn create_all(&self, new: Vec<NewUser>) -> Result<Vec<User>, EsRepoError>;
//
// // Query operations
// async fn find_by_id(&self, id: UserId) -> Result<User, EsRepoError>;
// async fn find_by_name(&self, name: &str) -> Result<User, EsRepoError>;
//
// // Update operations
// async fn update(&self, entity: &mut User) -> Result<(), EsRepoError>;
//
// // Paginated listing
// async fn list_by_id(&self, args: PaginatedQueryArgs, direction: ListDirection) -> PaginatedQueryRet;
//
// // etc
// }
// End-to-end walkthrough: connect, create a user, look it up, rename it.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = PgPool::connect("postgres://localhost/myapp").await?;
    let users = Users { pool };

    // Create a new user
    let new_user = NewUser {
        id: UserId::new(),
        name: "Alice".to_string(),
    };
    let created = users.create(new_user).await?;

    // Query by indexed columns
    let alice = users.find_by_name("Alice").await?;

    // Update with automatic idempotency: only persist when the mutation
    // actually recorded something.
    let mut reloaded = users.find_by_id(created.id).await?;
    let rename = reloaded.update_name("Alice Cooper");
    if rename.did_execute() {
        users.update(&mut reloaded).await?;
    }

    Ok(())
}
Add to your `Cargo.toml`:
[dependencies]
es-entity = "0.9"
sqlx = "0.8.3" # Needs to be in scope for the `entity_id!` macro
serde = { version = "1.0.219", features = ["derive"] } # For the `Serialize`/`Deserialize` derives on your events
derive_builder = "0.20.1" # For hydrating and building the entity state (optional)
All Repository functions exist in 2 flavours.
The `_in_op` postfix receives an additional argument for the DB connection.
This enables atomic operations across multiple entities.
// Begin a transaction on the pool
let mut tx = pool.begin().await?;
// Both repositories receive the same transaction handle
users.create_in_op(&mut tx, new_user).await?;
accounts.create_in_op(&mut tx, new_account).await?;
// Commit once both creates have succeeded (`?` returns early on any error)
tx.commit().await?;
Support for aggregates and child entities:
// Parent entity holding nested `OrderItem` children.
#[derive(EsEntity)]
pub struct Order {
// Identifier of this order
pub id: OrderId,
// Child entity - auto implements Parent<OrderItem> for Order
#[es_entity(nested)]
items: Nested<OrderItem>,
// Container for the order's events
events: EntityEvents<OrderEvent>,
}
// Repository for the child `OrderItem` entity.
#[derive(EsRepo, Debug)]
#[es_repo(
entity = "OrderItem",
// Child repo marks the parent foreign key
// NOTE(review): `update(persist = false)` presumably keeps `order_id` from
// being rewritten on update - confirm against the es-entity docs.
columns(order_id(ty = "OrderId", update(persist = false), parent))
)]
struct OrderItems {
// Postgres connection pool the repository works with
pool: PgPool,
}
// Repository for the parent `Order` entity.
#[derive(EsRepo)]
#[es_repo(
entity = "Order",
)]
pub struct Orders {
// Postgres connection pool the repository works with
pool: PgPool,
// Parent repo owns the child repo
#[es_repo(nested)]
items: OrderItems,
}
The entity style is easily testable. Hydrate from events, mutate, assert.
#[cfg(test)]
mod tests {
    use super::*;

    // Hydrates a `User` named "Alice" from a single `Initialized` event,
    // without touching the database.
    fn test_user(id: UserId) -> User {
        let events = EntityEvents::init(
            id,
            [UserEvent::Initialized {
                id,
                name: "Alice".to_string(),
            }],
        );
        // Tail expression (no trailing semicolon): the hydrated entity is the
        // function's return value.
        User::try_from_events(events).unwrap()
    }

    #[test]
    fn test_user_update() {
        let mut user = test_user(UserId::new());
        // The first rename records a `NameUpdated` event ...
        assert!(user.update_name("Bob").did_execute());
        // ... repeating the same rename is caught by the idempotency guard.
        assert!(!user.update_name("Bob").did_execute());
    }
}
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.