use serde::{Deserialize, Serialize}; use tracing_subscriber::EnvFilter; use tracing_subscriber::filter::LevelFilter; use spectroscopy::errors::{DeserializeError, SerializeError}; use spectroscopy::mapping::{Mapper, ResolveMapping}; use spectroscopy::{Event, Projection}; pub struct Counter { pub state: u32, } impl ResolveMapping for Counter { fn mapping(mapper: &mut Mapper) { mapper.register::(); } } #[async_trait::async_trait] impl Publisher for Counter { type Event = CounterEvent; type Rejection = (); async fn publish(&self, command: CounterCommand, _: &mut Context) -> Result { match command { CounterCommand::Increase => Ok(CounterEvent::Increased), CounterCommand::Decrease => Ok(CounterEvent::Decreased) } } } #[async_trait::async_trait] impl Applicator for Counter { async fn apply(&mut self, event: CounterEvent, _: &mut Context) { match event { CounterEvent::Increased => self.state += 1, CounterEvent::Decreased => self.state -= 1, } tracing::debug!("[Counter] current state: {}", self.state); } } #[async_trait::async_trait] impl Projection for Counter { type Rejection = (); async fn first(event: CounterEvent) -> Result { let mut state = 0; match event { CounterEvent::Increased => state += 1, CounterEvent::Decreased => state -= 1, } Ok(Self { state }) } async fn apply(&mut self, event: CounterEvent) -> Result<(), Self::Rejection> { match event { CounterEvent::Increased => self.state += 1, CounterEvent::Decreased => self.state -= 1, } tracing::debug!("[Counter] current state: {}", self.state); Ok(()) } } #[derive(Debug)] pub enum CounterCommand { Increase, Decrease } impl Command for CounterCommand {} #[derive(Debug, Deserialize, Serialize)] pub enum CounterEvent { Increased, Decreased, } impl Event for CounterEvent { const REGISTRY_KEY: &'static str = "counter-event"; fn as_bytes(&self) -> Result, SerializeError> { Ok(serde_json::to_vec(self)?) } fn from_bytes(bytes: &[u8]) -> Result { Ok(serde_json::from_slice(bytes)?) } } use spectroscopy::agent::{AgentExecutor, Applicator, Command, Context, Executor, Publisher}; #[tokio::test] async fn spawn_agent() { std::env::set_var("RUST_LOG", "spectroscopy=trace,test_agent=trace"); tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) .with_max_level(LevelFilter::TRACE) .init(); let executor = Executor::default(); let id = "counter-1".to_string(); let counter = Counter { state: 0, }; let agent = executor.spawn(id, counter, 0).await.unwrap(); let cmd1 = CounterCommand::Increase; let cmd2 = CounterCommand::Decrease; let ev1 = agent.publish(cmd1).await.unwrap().unwrap(); let ev2 = agent.publish(cmd2).await.unwrap().unwrap(); agent.apply(ev1).await.unwrap(); agent.apply(ev2).await.unwrap(); }