#![allow(dead_code)] use anyhow::anyhow; use anyhow::Result; use async_trait::async_trait; use groupcache::Groupcache; use moka::future::CacheBuilder; use pretty_assertions::assert_eq; use std::collections::HashMap; use std::future::{pending, Future}; use std::sync::{Arc, RwLock}; use std::time::Duration; use tokio::net::TcpListener; use tokio::sync::oneshot::{Receiver, Sender}; use tokio_stream::wrappers::TcpListenerStream; use tonic::codegen::tokio_stream; use tonic::transport::Server; pub static OS_ALLOCATED_PORT_ADDR: &str = "127.0.0.1:0"; pub static HOT_CACHE_TTL: Duration = Duration::from_millis(100); pub fn key_owned_by_instance(instance: TestGroupcache) -> String { format!("{}_0", instance.addr()) } pub fn error_key_on_instance(instance: TestGroupcache) -> String { format!("{}_13", instance.addr()) } pub async fn two_instances() -> Result<(TestGroupcache, TestGroupcache)> { let instance_one = spawn_groupcache("1").await?; let instance_two = spawn_groupcache("2").await?; Ok((instance_one, instance_two)) } pub async fn spawn_instances(n: usize) -> Result> { let mut instances = Vec::new(); for i in 0..n { let instance = spawn_groupcache(&i.to_string()).await?; instances.push(instance); } let first_instance = &instances[0]; for instance in instances.iter().skip(1) { first_instance.add_peer(instance.addr().into()).await?; } Ok(instances) } pub async fn single_instance() -> Result { spawn_groupcache("1").await } pub async fn two_connected_instances() -> Result<(TestGroupcache, TestGroupcache)> { let (instance_one, instance_two) = two_instances().await?; instance_one.add_peer(instance_two.addr().into()).await?; instance_two.add_peer(instance_one.addr().into()).await?; Ok((instance_one, instance_two)) } pub async fn reconnect(instance: TestGroupcache) { let listener = TcpListener::bind(instance.addr()).await.unwrap(); tokio::spawn(async move { Server::builder() .add_service(instance.grpc_service()) .serve_with_incoming(TcpListenerStream::new(listener)) .await .unwrap(); }); } pub async fn two_instances_with_one_disconnected() -> Result<(TestGroupcache, TestGroupcache)> { let (shutdown_signal, shutdown_recv) = tokio::sync::oneshot::channel::<()>(); let (shutdown_done_s, shutdown_done_r) = tokio::sync::oneshot::channel::<()>(); pub async fn shutdown_proxy(shutdown_signal: Receiver<()>, shutdown_done: Sender<()>) { shutdown_signal.await.unwrap(); shutdown_done.send(()).unwrap(); } let instance_one = spawn_groupcache("1").await?; let instance_two = spawn_groupcache_instance( "2", OS_ALLOCATED_PORT_ADDR, shutdown_proxy(shutdown_recv, shutdown_done_s), ) .await?; instance_one.add_peer(instance_two.addr().into()).await?; instance_two.add_peer(instance_one.addr().into()).await?; shutdown_signal.send(()).unwrap(); shutdown_done_r.await.unwrap(); Ok((instance_one, instance_two)) } pub async fn spawn_groupcache(instance_id: &str) -> Result { spawn_groupcache_instance(instance_id, OS_ALLOCATED_PORT_ADDR, pending()).await } pub async fn spawn_groupcache_instance( instance_id: &str, addr: &str, shutdown_signal: impl Future + Send + 'static, ) -> Result { let listener = TcpListener::bind(addr).await.unwrap(); let addr = listener.local_addr()?; let groupcache = Groupcache::builder(addr.into(), TestCacheLoader::new(instance_id)) .hot_cache(CacheBuilder::default().time_to_live(HOT_CACHE_TTL).build()) .build(); let server = groupcache.grpc_service(); tokio::spawn(async move { Server::builder() .add_service(server) .serve_with_incoming_shutdown(TcpListenerStream::new(listener), shutdown_signal) .await .unwrap(); }); Ok(groupcache) } pub async fn success_or_transport_err(key: &str, groupcache: TestGroupcache) { let result = groupcache.get(key).await; match result { Ok(v) => { assert_eq!(v.contains(key), true); } Err(e) => { let error_string = e.to_string(); assert_eq!( error_string.contains("Transport"), true, "expected transport error, got: '{}'", error_string ); } } } #[derive(Default)] pub struct GetAssertions { pub expected_instance_id: Option, pub unexpected_instance_id: Option, pub expected_load_count: Option, } pub async fn successful_get_not_from_instance( key: &str, unexpected_instance_id: &str, groupcache: TestGroupcache, ) { let opts = GetAssertions { unexpected_instance_id: Some(unexpected_instance_id.to_string()), ..GetAssertions::default() }; successful_get_opts(key, groupcache, opts).await; } pub async fn successful_get( key: &str, expected_instance_id: Option<&str>, groupcache: TestGroupcache, ) { let opts = GetAssertions { expected_instance_id: expected_instance_id.map(|s| s.to_string()), ..GetAssertions::default() }; successful_get_opts(key, groupcache, opts).await; } pub async fn successful_get_opts(key: &str, groupcache: TestGroupcache, opts: GetAssertions) { let v = groupcache.get(key).await.expect("get should be successful"); assert_eq!( v.contains(key), true, "expected value to be '{}', got: '{}'", key, v ); if let Some(instance) = opts.expected_instance_id { assert_eq!( v.contains(&format!("INSTANCE_{}", instance)), true, "expected instance id to be '{}', got: '{}'", instance, v ); } if let Some(load) = opts.expected_load_count { assert_eq!( v.contains(&format!("LOAD_{}", load)), true, "expected load count to be '{}', got: '{}'", load, v, ); } } pub type TestGroupcache = Groupcache; pub type CachedValue = String; pub struct TestCacheLoader { instance_id: String, load_counter: Arc>>, } impl TestCacheLoader { pub fn new(instance_id: &str) -> Self { Self { instance_id: instance_id.to_string(), load_counter: Arc::new(RwLock::new(HashMap::new())), } } pub fn count_loads(&self, key: &str) -> Result { let mut lock = self.load_counter.write().unwrap(); let counter = lock.entry(key.to_string()).or_insert(0); *counter += 1; Ok(*counter) } } #[async_trait] impl groupcache::ValueLoader for TestCacheLoader { type Value = CachedValue; async fn load( &self, key: &str, ) -> std::result::Result> { let load_counter = self.count_loads(key)?; return if !key.contains("error") && !key.contains("_13") { Ok(format!( "VAL_INSTANCE_{}_KEY_{}_LOAD_{}", self.instance_id, key, load_counter )) } else { Err(anyhow!("Something bad happened during loading :/").into()) }; } }