# Event sourcing.rs It is an opinionated library used to achieve cqrs/es in Rust. A set of example snippets could be found in the `example` folder. ## Install Event Sourcing RS uses under the hood [`sqlx`]. [`sqlx`]: https://github.com/launchbadge/sqlx ```toml # Cargo.toml [dependencies] # postgres database esrs = { version = "0.17", features = ["postgres"] } sqlx = { version = "0.7", features = ["postgres", "runtime-tokio-native-tls", "uuid", "json", "chrono"] } ``` ## Tracing A tracing span is produced every time a projector is used or a policy is applied to a given event. ## Run examples, tests and linting Start the docker-compose stack ```shell docker compose run --service-ports web bash ``` Run tests. ```shell cargo make test ``` Run linters. ```shell cargo make clippy ``` ## Usage This section presents the integration of the library into your application, enabling you to kickstart a CQRS/Event sourcing implementation. ### Event sourcing > Event sourcing is a software architectural pattern used to capture and store the changes or events that occur in an application's state over time. Instead of persisting the current state of an object or entity, event sourcing focuses on recording a sequence of events that have led to that state. #### Aggregate To start implementing CQRS/Event sourcing in your codebase, first you need to create an `Aggregate`. In the context of event sourcing, an aggregate is a fundamental concept that represents a cluster of domain objects and their state changes. It is one of the key building blocks used to model and manage data in an event-sourced system. An aggregate, therefore, is responsible for processing incoming commands and generating corresponding events that reflect the changes to its state. It encapsulates domain logic and rules related to specific operations on the data it manages. Aggregates provide a clear boundary for consistency and concurrency control within the event-sourced system. Implement your own aggregate: ```rust pub struct Book; impl Aggregate for Book { ... } ``` Each implementation of the `Aggregate` trait is required to define specific types and functions that fulfill its interface: - `const NAME`: constant value. It refers to the unique identifier or key that is used to identify a specific aggregate instance within the system. - `type State`: refers to the current state of a specific aggregate instance at a given point in time. It represents the collective data and attributes that define the entity represented by the aggregate. - `type Command`: a request or a message that represents an intention to perform a specific action or operation on an aggregate instance. - `type Event`: is a representation of a significant state change or an occurrence that has happened to a specific aggregate instance. Aggregate events are at the core of event sourcing as they capture all the changes made to an aggregate over time. - `type Error`: is a representation of every validation failure that can occur while trying to handle a command. - `fn handle_command`: is responsible for processing and validating incoming commands and emitting corresponding events. - `fn apply_event`: this function is responsible for processing individual events, or replaying batch of events, and applying their effects on the `Aggregate`'s state. Here some example implementations: `NAME`: ```rust const NAME: &'static str = "book"; ``` `State` ```rust pub struct BookState { pub leftover: i32, } impl Default for BookState { fn default() -> Self { Self { leftover: 10 } } } ``` `Command` ```rust pub enum BookCommand { Buy { num_of_copies: i32, }, Return { num_of_copies: i32, }, } ``` `Event` ```rust use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone)] pub enum BookEvent { Bought { num_of_copies: i32, }, Returned { num_of_copies: i32, } } ``` `Error` ```rust use thiserror::Error; #[derive(Debug, Error)] pub enum BookError { NotEnoughCopies, } ``` And now let's put all together in the `Aggregate`, implementing `handle_command` and `apply_events` functions too. ```rust ... impl Aggregate for Book { const NAME: &'static str = "book"; type State = BookState; type Command = BookCommand; type Event = BookEvent; type Error = BookError; fn handle_command(state: &Self::State, command: Self::Command) -> Result, Self::Error> { match command { BookCommand::Buy { num_of_copies } if state.leftover < num_of_copies => Err(BookError::NotEnoughCopies), BookCommand::Buy { num_of_copies } => Ok(vec![BookEvent::Bought { num_of_copies }]), BookCommand::Return { num_of_copies } => Ok(vec![BookEvent::Returned { num_of_copies }]), } } fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { match payload { BookEvent::Bought { num_of_copies } => BookState { leftover: state.leftover - num_of_copies }, BookEvent::Returned { num_of_copies } => BookState { leftover: state.leftover + num_of_copies }, } } } ``` #### Adding a persistence layer Currently, the only existing persistence layer is the `PgStore`. Its role is to write all the events in a dedicated table in `postgres`. The table name is built using `Aggregate::NAME` constant value concatenated with `_events`. !Important: each `PgStore` is designed to be globally unique per Aggregate. If you require constructing it dynamically at runtime, remember to invoke `without_running_migrations` while building the store to avoid executing migrations. Note: In order to use a `PgStore` a fully implemented `Aggregate` is needed. ```rust use sqlx::{Pool, Postgres}; let pool: Pool = unimplemented!(); // Building a `PgStore` let store: PgStore = PgStoreBuilder::new(pool) .try_build() .await .expect("Failed to create PgStore"); let mut state: BookState = BookState::default(); // Using the store let events: Vec = Book::handle_command(&state, BookCommand::Buy { num_of_copies: 1 }); store.persist(&mut state, events).await?; ``` To alleviate the burden of writing all of this code, you can leverage the `AggregateManager` helper. An `AggregateManager` could be considered as a synchronous `CommandBus`. ##### Decoupling `Aggregate::Event` from the database using `Schema` To avoid strong coupling between the domain events represented by `Aggregate::Event` and the persistence layer. It is possible to introduce a `Schema` type on the `PgStore`. This type must implement `Schema` and `Persistable`. The mechanism enables the domain events to evolve more freely. For example, it is possible to deprecate an event variant making use of the schema (see [deprecating events example](examples/schema/deprecating_events.rs)). Additionally, this mechanism can be used as an alternative for upcasting (see [upcasting example](examples/schema/upcasting.rs)). For an example of how to introduce a schema to an existing application see [introducing schema example](examples/schema/adding_schema.rs). ```rust let manager: AggregateManager<_> = AggregateManager::new(store); manager.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 }).await ``` ### CQRS > CQRS stands for Command Query Responsibility Segregation, which is a software architectural pattern that segregates the responsibility for handling read and write operations in an application. The core idea behind CQRS is to split the application's data model into two distinct models: one optimized for read operations (the Query side) and another for write operations (the Command side). #### The sync way In the purist concept of CQRS, this approach might provoke debates. Nevertheless, the underlying intent is to provide timely feedback to users upon completion of operations, considering various scenarios. For instance, it aims to eliminate the need for a hypothetical frontend to continually poll the backend for updates, by ensuring the read side is promptly updated. Additionally, it caters to scenarios where a sequence of operations forms a chain, and completing the entire task is crucial, as in the case of a SAGA pattern. Overall, the primary goal is to enhance user experience and system efficiency by employing well-defined and specialized models for read and write operations, tailoring the architecture to specific use cases. The two main pillars in the context of CQRS/Event sourcing are "Event Handlers" and "Transactional Event Handlers." ##### Event handlers An `EventHandler`, by definition, operates on an eventually consistent basis and primarily serves to update the read side of the application. It is also commonly utilized to execute commands on other aggregates, including the aggregate to which it belongs. `EventHandler`s are infallible. An infallible function is one that is guaranteed never to fail or produce errors under any circumstances. In other words, it is a function that will always produce a valid result without any possibility of raising exceptions, throwing errors, or returning incorrect data. The duty of error handling is up to library user. ```rust pub struct BookEventHandler; #[async_trait::async_trait] impl EventHandler for BookEventHandler { async fn handle(&self, event: &StoreEvent) { // Implementation here } } // Where the store is built.. PgStoreBuilder::new(pool) // ..add your event handler .add_event_handler(BookEventHandler) .try_build() .await .expect("Failed to create PgStore"); ``` ##### Transactional event handlers `TransactionalEventHandlers` are a specialized form of Event Handlers that are designed to maintain transactional consistency between the write and read models. If a failure occurs in a `TransactionalEventHandlers`, it triggers a complete rollback of the transaction and subsequently returns an error to the caller. !Important: using a `TransactionalEventHandler` to execute commands on another aggregate is strongly discouraged. ```rust pub struct BookTransactionalEventHandler; #[async_trait::async_trait] impl TransactionalEventHandler for BookTransactionalEventHandler { async fn handle(&self, event: &StoreEvent, transaction: &mut PgConnection) -> Result<(), PgStoreError> { // Implementation here Ok(()) } } // Where the store is built.. PgStoreBuilder::new(pool) // ..add your transactional event handler .add_transactional_event_handler(BookTransactionalEventHandler) .try_build() .await .expect("Failed to create PgStore"); ``` #### The async way This approach adheres to the more traditional principles of CQRS. When events are emitted from the write side of the application, the corresponding event handlers process them asynchronously. This approach enhances responsiveness and scalability by allowing the application to handle events concurrently. ##### Event bus While right now there are two different implementations of event bus, one using rabbit and the other using kafka, it's possible to have a custom implementation of an event bus. To do so just implement the `EventBus` trait. For the sake of this example we create a strongly coupled event bus while a more generic implementation is preferable. ```rust pub struct BookEventBus; #[async_trait::async_trait] impl EventBus for BookEventBus { async fn publish(&self, store_event: &StoreEvent) { // Implementation herepub struct BookEventBus; #[async_trait::async_trait] impl EventBus for BookEventBus { async fn publish(&self, store_event: &StoreEvent) { // Implementation herre } } } } ``` Subsequently, integrate the `EventBus` into the `PgStore` at build time. ```rust // Where the store is built.. PgStoreBuilder::new(pool) // ..add your event handler .add_event_bus(BookEventBus) .try_build() .await .expect("Failed to create PgStore"); ``` ##### Consuming the bus and the event handler ... again! The subsequent phase to finalize the asynchronous approach is to construct a read side. To accomplish this, we continue to rely on the `EventHandler`. However, the library lacks a built-in consumer. Implementing the consumer is left to the discretion of the library user. The concept behind this "consumer" is to consume messages from the bus and then apply them to each `EventHandler` responsible for carrying out specific tasks. As the implementation closely resembles the "sync way" approach, we avoid to write it again. ### Follow up You can find a comprehensive example, incorporating all the presented contents from the [Usage](#usage) section, in the [readme example](./examples/readme/main.rs).