//! Example ABCI application, an in-memory key-value store. use std::{ collections::HashMap, future::Future, pin::Pin, task::{Context, Poll}, }; use bytes::Bytes; use futures::future::FutureExt; use structopt::StructOpt; use tendermint::abci::{Event, EventAttributeIndexExt}; use tendermint::v0_34::abci::{response, Request, Response}; use tower::{Service, ServiceBuilder}; use tower_abci::{ v034::{split, Server}, BoxError, }; /// In-memory, hashmap-backed key-value store ABCI application. #[derive(Clone, Debug, Default)] pub struct KVStore { store: HashMap, height: u32, app_hash: [u8; 8], } impl Service for KVStore { type Response = Response; type Error = BoxError; type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: Request) -> Self::Future { tracing::info!(?req); let rsp = match req { // handled messages Request::Info(_) => Response::Info(self.info()), Request::Query(query) => Response::Query(self.query(query.data)), Request::DeliverTx(deliver_tx) => Response::DeliverTx(self.deliver_tx(deliver_tx.tx)), Request::Commit => Response::Commit(self.commit()), // unhandled messages Request::Flush => Response::Flush, Request::Echo(_) => Response::Echo(Default::default()), Request::InitChain(_) => Response::InitChain(Default::default()), Request::BeginBlock(_) => Response::BeginBlock(Default::default()), Request::CheckTx(_) => Response::CheckTx(Default::default()), Request::EndBlock(_) => Response::EndBlock(Default::default()), Request::ListSnapshots => Response::ListSnapshots(Default::default()), Request::OfferSnapshot(_) => Response::OfferSnapshot(Default::default()), Request::LoadSnapshotChunk(_) => Response::LoadSnapshotChunk(Default::default()), Request::ApplySnapshotChunk(_) => Response::ApplySnapshotChunk(Default::default()), // See: https://github.com/informalsystems/tendermint-rs/blob/c2b5c9e01eab1c740598aa14375a7453f3bfa436/tendermint/src/abci/doc/request-setoption.md#L1 Request::SetOption(_) => Response::SetOption(response::SetOption { code: tendermint::abci::Code::Ok, log: "undocumented".to_string(), info: "removed from tendermint in 0.35".to_string(), }), }; tracing::info!(?rsp); async move { Ok(rsp) }.boxed() } } impl KVStore { fn info(&self) -> response::Info { response::Info { data: "tower-abci-kvstore-example".to_string(), version: "0.1.0".to_string(), app_version: 1, last_block_height: self.height.into(), last_block_app_hash: self.app_hash.to_vec().try_into().unwrap(), } } fn query(&self, query: Bytes) -> response::Query { let key = String::from_utf8(query.to_vec()).unwrap(); let (value, log) = match self.store.get(&key) { Some(value) => (value.clone(), "exists".to_string()), None => ("".to_string(), "does not exist".to_string()), }; response::Query { log, key: key.into_bytes().into(), value: value.into_bytes().into(), ..Default::default() } } fn deliver_tx(&mut self, tx: Bytes) -> response::DeliverTx { let tx = String::from_utf8(tx.to_vec()).unwrap(); let tx_parts = tx.split('=').collect::>(); let (key, value) = match (tx_parts.first(), tx_parts.get(1)) { (Some(key), Some(value)) => (*key, *value), _ => (tx.as_ref(), tx.as_ref()), }; self.store.insert(key.to_string(), value.to_string()); response::DeliverTx { events: vec![Event::new( "app", vec![ ("key", key).index(), ("index_key", "index is working").index(), ("noindex_key", "index is working").no_index(), ], )], ..Default::default() } } fn commit(&mut self) -> response::Commit { let retain_height = self.height.into(); // As in the other kvstore examples, just use store.len() as the "hash" self.app_hash = (self.store.len() as u64).to_be_bytes(); self.height += 1; response::Commit { data: self.app_hash.to_vec().into(), retain_height, } } } #[derive(Debug, StructOpt)] struct Opt { /// Bind the TCP server to this host. #[structopt(short, long, default_value = "127.0.0.1")] host: String, /// Bind the TCP server to this port. #[structopt(short, long, default_value = "26658")] port: u16, /// Bind the UDS server to this path #[structopt(long)] uds: Option, } #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let opt = Opt::from_args(); // Construct our ABCI application. let service = KVStore::default(); // Split it into components. let (consensus, mempool, snapshot, info) = split::service(service, 1); // Hand those components to the ABCI server, but customize request behavior // for each category -- for instance, apply load-shedding only to mempool // and info requests, but not to consensus requests. // Note that this example use synchronous execution in `Service::call`, wrapping the end result in a ready future. // If `Service::call` did the actual request handling inside the `async` block as well, then the `consensus` service // below should be wrapped with a `ServiceBuilder::concurrency_limit` to avoid any unintended reordering of message effects. let server_builder = Server::builder() .consensus(consensus) .snapshot(snapshot) .mempool( ServiceBuilder::new() .load_shed() .buffer(10) .service(mempool), ) .info( ServiceBuilder::new() .load_shed() .buffer(100) .rate_limit(50, std::time::Duration::from_secs(1)) .service(info), ); let server = server_builder.finish().unwrap(); if let Some(uds_path) = opt.uds { server.listen_unix(uds_path).await.unwrap(); } else { server .listen_tcp(format!("{}:{}", opt.host, opt.port)) .await .unwrap(); } }