evidentsource-client

Crates.ioevidentsource-client
lib.rsevidentsource-client
version1.0.0-rc1
created_at2026-01-15 21:43:17.300719+00
updated_at2026-01-15 21:43:17.300719+00
descriptionRust client for the EvidentSource event sourcing platform
homepage
repositoryhttps://github.com/evidentsystems/evident-stack
max_upload_size
id2047152
size290,441
Bobby Calderwood (bobby)

documentation

https://docs.evidentstack.com/evidentsource

README

EvidentSource Client

A Rust client library for connecting to EvidentSource servers. This library provides a high-level API for event sourcing with bi-temporal query support.

Features

  • Async/await API built on Tokio
  • Automatic TLS support for HTTPS connections
  • Live subscriptions with automatic reconnection
  • Bi-temporal queries (revision time + effective time)
  • Speculative transactions for previewing uncommitted changes
  • Type-safe domain types from evidentsource-core

API Layers

This crate provides two API layers:

High-Level API (Recommended)

The client module provides type-safe access using domain types:

use evidentsource_client::client::{EvidentSource, Connection, DatabaseName};

let es = EvidentSource::connect_to_server("http://localhost:50051").await?;
let conn = es.connect(&DatabaseName::new("my-db")?).await?;

The client module re-exports all commonly used domain types, so you typically only need one import path.

Low-Level gRPC API (Advanced)

The grpc module provides direct protocol buffer access:

use evidentsource_client::grpc::{EvidentSourceClient, proto};

let mut client = EvidentSourceClient::new("http://localhost:50051").await?;
let db = client.fetch_latest_database("my-db".into()).await?;

Use this for custom tooling, debugging, or direct gRPC integration.

Installation

Add to your Cargo.toml:

[dependencies]
evidentsource-client = { path = "../client" }
evidentsource-core = { path = "../core" }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures = "0.3"

Quick Start

use evidentsource_client::client::{EvidentSource, DatabaseName};
use evidentsource_core::DatabaseProvider;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the server
    let es = EvidentSource::connect_to_server("http://localhost:50051").await?;

    // Connect to a database
    let db_name = DatabaseName::new("my-database")?;
    let conn = es.connect(&db_name).await?;

    // Get the latest revision
    let db = conn.latest_database().await?;
    println!("Database: {}", db.name());
    println!("Revision: {}", db.revision());
    println!("Revision timestamp: {}", db.revision_timestamp());

    // Graceful shutdown
    conn.close().await?;

    Ok(())
}

Core Concepts

EvidentSource

The main entry point for connecting to a server. Use it to:

  • List available databases
  • Create and delete databases
  • Connect to a specific database
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;

Connection

A long-lived connection to a specific database that maintains a background subscription for live updates. It provides methods to access database state at different revisions.

let conn = es.connect(&DatabaseName::new("my-db")?).await?;

DatabaseAtRevision

A point-in-time view of the database at a specific revision. Use it to query events and fetch state views.

let db = conn.latest_database().await?;
let events = db.query_events(&selector);
let state = db.view_state(&view_name, version).await?;

Bi-temporal Model

EvidentSource supports two time dimensions:

  • Revision time: When events were recorded in the database (immutable)
  • Effective time: When events are considered to have occurred in the business domain

This allows queries like "what was the account balance on March 1st, as known on March 15th?"

API Guide

Connecting to a Server

use evidentsource_client::client::EvidentSource;

// HTTP connection
let es = EvidentSource::connect_to_server("http://localhost:50051").await?;

// HTTPS connection (TLS automatically configured)
let es = EvidentSource::connect_to_server("https://api.example.com:443").await?;

Database Catalog Operations

use evidentsource_core::DatabaseCatalog;
use futures::StreamExt;

// List all databases
let mut databases = es.list_databases();
while let Some(name) = databases.next().await {
    println!("Database: {}", name);
}

// Create a database
let db_identity = es.create_database(DatabaseName::new("new-db")?).await?;
println!("Created at: {}", db_identity.created_at());

// Delete a database
es.delete_database(DatabaseName::new("old-db")?).await?;

Connecting to a Database

let conn = es.connect(&DatabaseName::new("my-db")?).await?;

// The connection maintains a live subscription for updates
println!("Connected to: {}", conn.name());

Working with Revisions

use evidentsource_core::DatabaseProvider;

// Get the latest revision (network call)
let db = conn.latest_database().await?;

// Get the local cached revision (no network call, uses subscription)
let db = conn.local_database();

// Get a specific revision (waits if not yet available)
let db = conn.database_at_revision(42).await?;

// Get the revision that was current at a specific timestamp
use chrono::{DateTime, Utc};
let timestamp: DateTime<Utc> = "2024-03-15T10:00:00Z".parse()?;
let db = conn.database_at_timestamp(timestamp).await?;

Querying Events

use evidentsource_client::client::EventSelector;
use evidentsource_core::DatabaseAtRevision;
use futures::StreamExt;

let db = conn.latest_database().await?;

// Query by event type
let selector = EventSelector::event_type_equals("order.created")?;
let events: Vec<_> = db.query_events(&selector).collect().await;

// Query by subject
let selector = EventSelector::subject_equals("order-123")?;
let events: Vec<_> = db.query_events(&selector).collect().await;

// Combine selectors with method chaining
let selector = EventSelector::event_type_equals("order.created")?
    .and(EventSelector::subject_equals("order-123")?);
let events: Vec<_> = db.query_events(&selector).collect().await;

// OR combinations
let selector = EventSelector::event_type_equals("order.created")?
    .or(EventSelector::event_type_equals("order.updated")?);

Fetching State Views

use evidentsource_client::client::{StateViewName, StateView};
use evidentsource_core::DatabaseAtRevision;

let db = conn.latest_database().await?;

// Fetch a state view
let view_name = StateViewName::new("account-summary")?;
let state_view: StateView = db.view_state(&view_name, 1).await?;

// Access the state view content
if let Some(content) = state_view.content {
    let account: AccountSummary = serde_json::from_slice(&content)?;
    println!("Balance: {}", account.balance);
}

Bi-temporal Queries (Effective Timestamp)

use chrono::{DateTime, Utc};
use evidentsource_core::DatabaseAtRevision;

let db = conn.latest_database().await?;

// Scope to an effective timestamp
let effective_time: DateTime<Utc> = "2024-03-01T00:00:00Z".parse()?;
let view = db.at_effective_timestamp(effective_time);

// State views will be computed as-of the effective timestamp
let state = view.view_state(&view_name, 1).await?;

Speculative Operations

Preview what the database would look like with uncommitted events:

use cloudevents::EventBuilderV10;
use evidentsource_core::DatabaseAtRevision;
use nonempty::NonEmpty;
use uuid::Uuid;

let db = conn.latest_database().await?;

// Create speculative events using official CloudEvents SDK
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = EventBuilderV10::new()
    .id(Uuid::new_v4().to_string())
    .source(stream)
    .ty("com.example.order.shipped")
    .subject("order-123")
    .data("application/json", serde_json::to_string(&OrderShipped {
        tracking_number: "1Z999".into()
    })?)
    .build()?;

// Create a speculative view
let speculative = db.speculate_with_transaction(NonEmpty::new(event));

// Query events (includes speculated events)
let selector = EventSelector::subject_equals("order-123")?;
let all_events: Vec<_> = speculative.query_events(&selector).collect().await;

// Chain multiple speculative transactions
let another_event = EventBuilderV10::new()
    .id(Uuid::new_v4().to_string())
    .source(stream)
    .ty("com.example.order.delivered")
    .subject("order-123")
    .data("application/json", serde_json::to_string(&OrderDelivered {})?)
    .build()?;
let speculative2 = speculative.speculate_with_transaction(NonEmpty::new(another_event));

Creating CloudEvents

Use the official cloudevents-sdk crate for type-safe event construction:

use cloudevents::EventBuilderV10;
use uuid::Uuid;

// Build a CloudEvent with the fluent builder API
// Stream is set via .source() - it's an alias for source in EvidentSource
let stream = "orders/2024-03-15";
let event = EventBuilderV10::new()
    .id(Uuid::new_v4().to_string())
    .source(stream)
    .ty("com.example.order.created")
    .subject("order-123")
    .data("application/json", serde_json::to_string(&OrderCreated {
        order_id: "order-123".to_string(),
        amount: 99.99,
    })?)
    .build()?;

Transacting Events

use evidentsource_client::client::{EventSelector, AppendCondition};
use evidentsource_core::DatabaseConnection;
use nonempty::NonEmpty;

// Create events to transact
let events = NonEmpty::new(event);

// Optional: add constraints for optimistic concurrency
let constraints = vec![
    // Ensure no existing order with this ID (max revision 0 = doesn't exist)
    AppendCondition::max(
        EventSelector::subject_equals("order-123")?
            .and(EventSelector::event_type_equals("com.example.order.created")?),
        0,
    ),
];

// Commit the transaction
let new_db = conn.transact(events, constraints).await?;
println!("New revision: {}", new_db.revision());

Executing State Changes

use evidentsource_client::client::{StateChangeName, CommandRequest};
use evidentsource_core::DatabaseConnection;

// Using the builder pattern
let state_change = StateChangeName::new("deposit")?;
let request = CommandRequest::json(&DepositCommand { amount: 100.0 })?
    .header("account_id", account_id.to_string());

let new_db = conn.execute_state_change(&state_change, 1, request).await?;

// Or using the state change builder for a fluent API
let new_db = conn
    .state_change("deposit", 1)
    .json(&DepositCommand { amount: 100.0 })?
    .execute()
    .await?;

// With content schema for validation
let new_db = conn
    .state_change("create-account", 1)
    .json(&CreateAccountRequest { name: "Alice" })?
    .content_schema("https://example.com/schemas/create-account.json")
    .execute()
    .await?;

Scanning the Database Log

use evidentsource_core::DatabaseConnection;
use futures::StreamExt;

// Get batch summaries
let mut summaries = conn.log();
while let Some(summary) = summaries.next().await {
    println!("Revision {}: {} events", summary.revision, summary.event_count);
}

// Get full batch details including events
let mut batches = conn.log_detail();
while let Some(batch) = batches.next().await {
    for event in batch.events {
        println!("  Event: {}", event.id());
    }
}

Testing with MockDatabase

The SDK provides MockDatabase for unit testing state changes without a server:

use evidentsource_functions::testing::{MockDatabase, StateViewResult, TestCloudEventBuilder};
use evidentsource_client::client::EventSelector;
use nonempty::NonEmpty;

#[test]
fn test_deposit_logic() {
    // Create a mock database
    let mut db = MockDatabase::new("test-db")
        .with_revision(100);

    // Insert state view data
    let account = AccountSummary {
        status: AccountStatus::Open,
        balance: 500.0,
    };
    db.insert_state_view_with_params(
        "account-summary",
        1,
        &[("account_id", "acct-123")],
        &account,
        50,
    ).unwrap();

    // Query the state view
    let result: Option<StateViewResult<AccountSummary>> =
        db.view_state("account-summary", 1, &[("account_id", "acct-123")]);

    assert!(result.is_some());
    let result = result.unwrap();
    assert_eq!(result.data.balance, 500.0);
    assert_eq!(result.revision(), 50);
}

#[test]
fn test_event_queries() {
    // Create events
    let event1 = TestCloudEventBuilder::new("order.created")
        .stream("orders")
        .subject("order-1")
        .data(&OrderCreated { amount: 100.0 })
        .build_core_event();

    let event2 = TestCloudEventBuilder::new("order.shipped")
        .stream("orders")
        .subject("order-1")
        .data(&OrderShipped {})
        .build_core_event();

    // Create database with events
    let db = MockDatabase::new("test-db")
        .with_events(vec![event1, event2]);

    // Query events
    let selector = EventSelector::subject_equals("order-1").unwrap();
    let events = db.query_events(&selector);
    assert_eq!(events.len(), 2);
}

#[test]
fn test_speculative_operations() {
    let db = MockDatabase::new("test-db").with_revision(100);

    // Create a speculative event
    let event = TestCloudEventBuilder::new("order.created")
        .stream("orders")
        .subject("order-1")
        .data(&OrderCreated { amount: 100.0 })
        .build_core_event();

    // Create speculative view
    let spec_db = db.speculate_with_transaction(NonEmpty::new(event));

    // Speculative revision includes the new event
    assert_eq!(spec_db.revision(), 101);
    assert_eq!(spec_db.speculated_event_count(), 1);

    // Queries include speculated events
    let selector = EventSelector::event_type_equals("order.created").unwrap();
    let events = spec_db.query_events(&selector);
    assert_eq!(events.len(), 1);
}

Error Handling

Client Errors

use evidentsource_client::Error;

match EvidentSource::connect_to_server("http://invalid:99999").await {
    Ok(es) => { /* connected */ }
    Err(Error::InvalidUri(e)) => eprintln!("Bad URI: {}", e),
    Err(Error::Transport(e)) => eprintln!("Connection failed: {}", e),
    Err(Error::GrpcStatus(status)) => eprintln!("gRPC error: {}", status),
}

Database Errors

use evidentsource_client::client::DatabaseError;

match es.connect(&db_name).await {
    Ok(conn) => { /* connected */ }
    Err(DatabaseError::NotFound(name)) => eprintln!("Database not found: {}", name),
    Err(DatabaseError::AlreadyExists(name)) => eprintln!("Already exists: {}", name),
    Err(DatabaseError::ServerError(msg)) => eprintln!("Server error: {}", msg),
    Err(DatabaseError::Timeout) => eprintln!("Operation timed out"),
}

State Change Errors

use evidentsource_client::client::StateChangeError;

match conn.execute_state_change(&name, version, request).await {
    Ok(db) => println!("Success! New revision: {}", db.revision()),
    Err(StateChangeError::Validation(msg)) => eprintln!("Validation failed: {}", msg),
    Err(StateChangeError::NotFound(msg)) => eprintln!("Not found: {}", msg),
    Err(StateChangeError::ExecutionError(msg)) => eprintln!("Execution failed: {}", msg),
    Err(StateChangeError::Database(db_err)) => eprintln!("Database error: {:?}", db_err),
    Err(StateChangeError::Internal(msg)) => eprintln!("Internal error: {}", msg),
}

Connection Lifecycle

Live Subscriptions

When you call es.connect(&db_name), the connection:

  1. Fetches the initial database state
  2. Starts a background task that subscribes to database updates
  3. Automatically reconnects with exponential backoff if the connection drops
  4. Updates internal state so conn.local_database() always returns recent data

Graceful Shutdown

Always close connections when done to clean up background tasks:

// Explicit close with timeout
conn.close().await?;

If not explicitly closed, the background task will be signaled to stop when the Connection is dropped.

Cloning Connections

Connection is cheaply cloneable. All clones share the same underlying subscription:

let conn2 = conn.clone();
// Both conn and conn2 see the same updates

Low-Level gRPC Access

For advanced use cases requiring direct protocol buffer access, use the grpc module:

use evidentsource_client::grpc::{EvidentSourceClient, proto, GrpcError};

// Proto types for manual construction
use evidentsource_client::grpc::proto::evidentsource::*;
use evidentsource_client::grpc::proto::cloudevents::*;

let mut client = EvidentSourceClient::new("http://localhost:50051").await?;

// Direct proto methods
let db = client.fetch_latest_database("my-db".into()).await?;

Use this for:

  • Custom tooling and debugging
  • Direct gRPC integration with other systems
  • Low-level protocol buffer manipulation

Most applications should use the high-level client module instead.

Further Reading

Commit count: 0

cargo fmt