| Crates.io | evidentsource-client |
| lib.rs | evidentsource-client |
| version | 1.0.0-rc1 |
| created_at | 2026-01-15 21:43:17.300719+00 |
| updated_at | 2026-01-15 21:43:17.300719+00 |
| description | Rust client for the EvidentSource event sourcing platform |
| homepage | |
| repository | https://github.com/evidentsystems/evident-stack |
| max_upload_size | |
| id | 2047152 |
| size | 290,441 |
A Rust client library for connecting to EvidentSource servers. This library provides a high-level API for event sourcing with bi-temporal query support.
evidentsource-coreThis crate provides two API layers:
The client module provides type-safe access using domain types:
use evidentsource_client::client::{EvidentSource, Connection, DatabaseName};
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
let conn = es.connect(&DatabaseName::new("my-db")?).await?;
The client module re-exports all commonly used domain types, so you typically
only need one import path.
The grpc module provides direct protocol buffer access:
use evidentsource_client::grpc::{EvidentSourceClient, proto};
let mut client = EvidentSourceClient::new("http://localhost:50051").await?;
let db = client.fetch_latest_database("my-db".into()).await?;
Use this for custom tooling, debugging, or direct gRPC integration.
Add to your Cargo.toml:
[dependencies]
evidentsource-client = { path = "../client" }
evidentsource-core = { path = "../core" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"
use evidentsource_client::client::{EvidentSource, DatabaseName};
use evidentsource_core::DatabaseProvider;
use futures::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to the server
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
// Connect to a database
let db_name = DatabaseName::new("my-database")?;
let conn = es.connect(&db_name).await?;
// Get the latest revision
let db = conn.latest_database().await?;
println!("Database: {}", db.name());
println!("Revision: {}", db.revision());
println!("Revision timestamp: {}", db.revision_timestamp());
// Graceful shutdown
conn.close().await?;
Ok(())
}
The main entry point for connecting to a server. Use it to:
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
A long-lived connection to a specific database that maintains a background subscription for live updates. It provides methods to access database state at different revisions.
let conn = es.connect(&DatabaseName::new("my-db")?).await?;
A point-in-time view of the database at a specific revision. Use it to query events and fetch state views.
let db = conn.latest_database().await?;
let events = db.query_events(&selector);
let state = db.view_state(&view_name, version).await?;
EvidentSource supports two time dimensions:
This allows queries like "what was the account balance on March 1st, as known on March 15th?"
use evidentsource_client::client::EvidentSource;
// HTTP connection
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
// HTTPS connection (TLS automatically configured)
let es = EvidentSource::connect_to_server("https://api.example.com:443").await?;
use evidentsource_core::DatabaseCatalog;
use futures::StreamExt;
// List all databases
let mut databases = es.list_databases();
while let Some(name) = databases.next().await {
println!("Database: {}", name);
}
// Create a database
let db_identity = es.create_database(DatabaseName::new("new-db")?).await?;
println!("Created at: {}", db_identity.created_at());
// Delete a database
es.delete_database(DatabaseName::new("old-db")?).await?;
let conn = es.connect(&DatabaseName::new("my-db")?).await?;
// The connection maintains a live subscription for updates
println!("Connected to: {}", conn.name());
use evidentsource_core::DatabaseProvider;
// Get the latest revision (network call)
let db = conn.latest_database().await?;
// Get the local cached revision (no network call, uses subscription)
let db = conn.local_database();
// Get a specific revision (waits if not yet available)
let db = conn.database_at_revision(42).await?;
// Get the revision that was current at a specific timestamp
use chrono::{DateTime, Utc};
let timestamp: DateTime<Utc> = "2024-03-15T10:00:00Z".parse()?;
let db = conn.database_at_timestamp(timestamp).await?;
use evidentsource_client::client::EventSelector;
use evidentsource_core::DatabaseAtRevision;
use futures::StreamExt;
let db = conn.latest_database().await?;
// Query by event type
let selector = EventSelector::event_type_equals("order.created")?;
let events: Vec<_> = db.query_events(&selector).collect().await;
// Query by subject
let selector = EventSelector::subject_equals("order-123")?;
let events: Vec<_> = db.query_events(&selector).collect().await;
// Combine selectors with method chaining
let selector = EventSelector::event_type_equals("order.created")?
.and(EventSelector::subject_equals("order-123")?);
let events: Vec<_> = db.query_events(&selector).collect().await;
// OR combinations
let selector = EventSelector::event_type_equals("order.created")?
.or(EventSelector::event_type_equals("order.updated")?);
use evidentsource_client::client::{StateViewName, StateView};
use evidentsource_core::DatabaseAtRevision;
let db = conn.latest_database().await?;
// Fetch a state view
let view_name = StateViewName::new("account-summary")?;
let state_view: StateView = db.view_state(&view_name, 1).await?;
// Access the state view content
if let Some(content) = state_view.content {
let account: AccountSummary = serde_json::from_slice(&content)?;
println!("Balance: {}", account.balance);
}
use chrono::{DateTime, Utc};
use evidentsource_core::DatabaseAtRevision;
let db = conn.latest_database().await?;
// Scope to an effective timestamp
let effective_time: DateTime<Utc> = "2024-03-01T00:00:00Z".parse()?;
let view = db.at_effective_timestamp(effective_time);
// State views will be computed as-of the effective timestamp
let state = view.view_state(&view_name, 1).await?;
Preview what the database would look like with uncommitted events:
use cloudevents::EventBuilderV10;
use evidentsource_core::DatabaseAtRevision;
use nonempty::NonEmpty;
use uuid::Uuid;
let db = conn.latest_database().await?;
// Create speculative events using official CloudEvents SDK
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = EventBuilderV10::new()
.id(Uuid::new_v4().to_string())
.source(stream)
.ty("com.example.order.shipped")
.subject("order-123")
.data("application/json", serde_json::to_string(&OrderShipped {
tracking_number: "1Z999".into()
})?)
.build()?;
// Create a speculative view
let speculative = db.speculate_with_transaction(NonEmpty::new(event));
// Query events (includes speculated events)
let selector = EventSelector::subject_equals("order-123")?;
let all_events: Vec<_> = speculative.query_events(&selector).collect().await;
// Chain multiple speculative transactions
let another_event = EventBuilderV10::new()
.id(Uuid::new_v4().to_string())
.source(stream)
.ty("com.example.order.delivered")
.subject("order-123")
.data("application/json", serde_json::to_string(&OrderDelivered {})?)
.build()?;
let speculative2 = speculative.speculate_with_transaction(NonEmpty::new(another_event));
Use the official cloudevents-sdk crate for type-safe event construction:
use cloudevents::EventBuilderV10;
use uuid::Uuid;
// Build a CloudEvent with the fluent builder API
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = EventBuilderV10::new()
.id(Uuid::new_v4().to_string())
.source(stream)
.ty("com.example.order.created")
.subject("order-123")
.data("application/json", serde_json::to_string(&OrderCreated {
order_id: "order-123".to_string(),
amount: 99.99,
})?)
.build()?;
use evidentsource_client::client::{EventSelector, AppendCondition};
use evidentsource_core::DatabaseConnection;
use nonempty::NonEmpty;
// Create events to transact
let events = NonEmpty::new(event);
// Optional: add constraints for optimistic concurrency
let constraints = vec![
// Ensure no existing order with this ID (max revision 0 = doesn't exist)
AppendCondition::max(
EventSelector::subject_equals("order-123")?
.and(EventSelector::event_type_equals("com.example.order.created")?),
0,
),
];
// Commit the transaction
let new_db = conn.transact(events, constraints).await?;
println!("New revision: {}", new_db.revision());
use evidentsource_client::client::{StateChangeName, CommandRequest};
use evidentsource_core::DatabaseConnection;
// Using the builder pattern
let state_change = StateChangeName::new("deposit")?;
let request = CommandRequest::json(&DepositCommand { amount: 100.0 })?
.header("account_id", account_id.to_string());
let new_db = conn.execute_state_change(&state_change, 1, request).await?;
// Or using the state change builder for a fluent API
let new_db = conn
.state_change("deposit", 1)
.json(&DepositCommand { amount: 100.0 })?
.execute()
.await?;
// With content schema for validation
let new_db = conn
.state_change("create-account", 1)
.json(&CreateAccountRequest { name: "Alice" })?
.content_schema("https://example.com/schemas/create-account.json")
.execute()
.await?;
use evidentsource_core::DatabaseConnection;
use futures::StreamExt;
// Get batch summaries
let mut summaries = conn.log();
while let Some(summary) = summaries.next().await {
println!("Revision {}: {} events", summary.revision, summary.event_count);
}
// Get full batch details including events
let mut batches = conn.log_detail();
while let Some(batch) = batches.next().await {
for event in batch.events {
println!(" Event: {}", event.id());
}
}
The SDK provides MockDatabase for unit testing state changes without a server:
use evidentsource_functions::testing::{MockDatabase, StateViewResult, TestCloudEventBuilder};
use evidentsource_client::client::EventSelector;
use nonempty::NonEmpty;
#[test]
fn test_deposit_logic() {
// Create a mock database
let mut db = MockDatabase::new("test-db")
.with_revision(100);
// Insert state view data
let account = AccountSummary {
status: AccountStatus::Open,
balance: 500.0,
};
db.insert_state_view_with_params(
"account-summary",
1,
&[("account_id", "acct-123")],
&account,
50,
).unwrap();
// Query the state view
let result: Option<StateViewResult<AccountSummary>> =
db.view_state("account-summary", 1, &[("account_id", "acct-123")]);
assert!(result.is_some());
let result = result.unwrap();
assert_eq!(result.data.balance, 500.0);
assert_eq!(result.revision(), 50);
}
#[test]
fn test_event_queries() {
// Create events
let event1 = TestCloudEventBuilder::new("order.created")
.stream("orders")
.subject("order-1")
.data(&OrderCreated { amount: 100.0 })
.build_core_event();
let event2 = TestCloudEventBuilder::new("order.shipped")
.stream("orders")
.subject("order-1")
.data(&OrderShipped {})
.build_core_event();
// Create database with events
let db = MockDatabase::new("test-db")
.with_events(vec![event1, event2]);
// Query events
let selector = EventSelector::subject_equals("order-1").unwrap();
let events = db.query_events(&selector);
assert_eq!(events.len(), 2);
}
#[test]
fn test_speculative_operations() {
let db = MockDatabase::new("test-db").with_revision(100);
// Create a speculative event
let event = TestCloudEventBuilder::new("order.created")
.stream("orders")
.subject("order-1")
.data(&OrderCreated { amount: 100.0 })
.build_core_event();
// Create speculative view
let spec_db = db.speculate_with_transaction(NonEmpty::new(event));
// Speculative revision includes the new event
assert_eq!(spec_db.revision(), 101);
assert_eq!(spec_db.speculated_event_count(), 1);
// Queries include speculated events
let selector = EventSelector::event_type_equals("order.created").unwrap();
let events = spec_db.query_events(&selector);
assert_eq!(events.len(), 1);
}
use evidentsource_client::Error;
match EvidentSource::connect_to_server("http://invalid:99999").await {
Ok(es) => { /* connected */ }
Err(Error::InvalidUri(e)) => eprintln!("Bad URI: {}", e),
Err(Error::Transport(e)) => eprintln!("Connection failed: {}", e),
Err(Error::GrpcStatus(status)) => eprintln!("gRPC error: {}", status),
}
use evidentsource_client::client::DatabaseError;
match es.connect(&db_name).await {
Ok(conn) => { /* connected */ }
Err(DatabaseError::NotFound(name)) => eprintln!("Database not found: {}", name),
Err(DatabaseError::AlreadyExists(name)) => eprintln!("Already exists: {}", name),
Err(DatabaseError::ServerError(msg)) => eprintln!("Server error: {}", msg),
Err(DatabaseError::Timeout) => eprintln!("Operation timed out"),
}
use evidentsource_client::client::StateChangeError;
match conn.execute_state_change(&name, version, request).await {
Ok(db) => println!("Success! New revision: {}", db.revision()),
Err(StateChangeError::Validation(msg)) => eprintln!("Validation failed: {}", msg),
Err(StateChangeError::NotFound(msg)) => eprintln!("Not found: {}", msg),
Err(StateChangeError::ExecutionError(msg)) => eprintln!("Execution failed: {}", msg),
Err(StateChangeError::Database(db_err)) => eprintln!("Database error: {:?}", db_err),
Err(StateChangeError::Internal(msg)) => eprintln!("Internal error: {}", msg),
}
When you call es.connect(&db_name), the connection:
conn.local_database() always returns recent dataAlways close connections when done to clean up background tasks:
// Explicit close with timeout
conn.close().await?;
If not explicitly closed, the background task will be signaled to stop when the Connection is dropped.
Connection is cheaply cloneable. All clones share the same underlying subscription:
let conn2 = conn.clone();
// Both conn and conn2 see the same updates
For advanced use cases requiring direct protocol buffer access, use the grpc module:
use evidentsource_client::grpc::{EvidentSourceClient, proto, GrpcError};
// Proto types for manual construction
use evidentsource_client::grpc::proto::evidentsource::*;
use evidentsource_client::grpc::proto::cloudevents::*;
let mut client = EvidentSourceClient::new("http://localhost:50051").await?;
// Direct proto methods
let db = client.fetch_latest_database("my-db".into()).await?;
Use this for:
Most applications should use the high-level client module instead.