use std::time::Duration; use rand::Rng; use tokio::time::timeout; use etcd_rs::*; #[macro_use] mod support; use crate::support::{Context, KVOp}; async fn put_and_get(cli: &Client, retry: usize) { for _ in 0..=retry { tokio::time::sleep(Duration::from_secs(3)).await; let k = format!("key-{}", rand::thread_rng().gen::()); let v = rand::thread_rng().gen::().to_string(); let r = cli.put(PutRequest::new(k.clone(), v.clone())).await; if let Some(e) = r.err() { eprintln!("failed to put kv (will retry): {:?}", e); continue; } let r = cli.get(k).await; match r { Ok(resp) => { assert_eq!(1, resp.count); assert_eq!(&v, resp.kvs[0].value_str()); return; } Err(e) => { eprintln!("failed to range kv (will retry): {:?}", e); } } } unreachable!(); } async fn expect_timeout(cli: &Client) { for _ in 0..3 { tokio::time::sleep(Duration::from_millis(100)).await; let res = cli.put(PutRequest::new("foo", "bar")).await; // FIXME check specified error assert!(res.is_err(), "resp = {:?}", res); } } #[tokio::test] async fn test_kv_when_node_stopped() { let ctx = Context::new(false); let cli = ctx.connect_to_cluster().await; put_and_get(&cli, 0).await; ctx.etcd_cluster.stop_node(1); put_and_get(&cli, 5).await; ctx.etcd_cluster.stop_node(2); ctx.etcd_cluster.print_status(); expect_timeout(&cli).await; ctx.etcd_cluster.start_node(1); put_and_get(&cli, 5).await; ctx.etcd_cluster.stop_node(3); expect_timeout(&cli).await; ctx.etcd_cluster.start_node(2); put_and_get(&cli, 5).await; } #[tokio::test] async fn test_kv_when_cluster_down() { let ctx = Context::new(false); let cli = ctx.connect_to_cluster().await; put_and_get(&cli, 0).await; ctx.etcd_cluster.stop_node(1); ctx.etcd_cluster.stop_node(2); ctx.etcd_cluster.stop_node(3); ctx.etcd_cluster.print_status(); expect_timeout(&cli).await; ctx.etcd_cluster.start_node(1); ctx.etcd_cluster.start_node(2); ctx.etcd_cluster.start_node(3); put_and_get(&cli, 5).await; put_and_get(&cli, 0).await; put_and_get(&cli, 0).await; } #[tokio::test] async fn test_watch_when_cluster_down() { let ctx = Context::new(false); let cli = ctx.connect_to_cluster().await; const PREFIX: &str = "prefix-"; let (mut stream, _cancel) = cli .watch(KeyRange::prefix(PREFIX)) .await .expect("watch created"); ctx.etcd_cluster.stop_node(1); ctx.etcd_cluster.stop_node(2); ctx.etcd_cluster.stop_node(3); { let mut interrupted = false; for _ in 0..10 { let x = timeout(Duration::from_secs(1), stream.inbound()).await; match x { Ok(etcd_rs::WatchInbound::Interrupted(_)) => { interrupted = true; break; } Ok(etcd_rs::WatchInbound::Closed) => { panic!("should not close watch stream"); } Err(e) => { println!("timeout: {:?}", e); } Ok(v) => { panic!("should not reach here: {:?}", v) } } } assert!(interrupted); } expect_timeout(&cli).await; ctx.etcd_cluster.start_node(1); ctx.etcd_cluster.start_node(2); ctx.etcd_cluster.start_node(3); tokio::time::sleep(Duration::from_secs(2)).await; put_and_get(&cli, 5).await; // re-connect to cluster put_and_get(&cli, 0).await; put_and_get(&cli, 0).await; let (mut stream, cancel) = cli .watch(KeyRange::prefix(PREFIX)) .await .expect("watch created"); let ops: Vec<_> = vec![ KVOp::Put("foo1".to_owned(), "bar1".to_owned()), KVOp::Put("foo2".to_owned(), "bar2".to_owned()), KVOp::Put("foo3".to_owned(), "bar3".to_owned()), KVOp::Delete("foo1".to_owned()), KVOp::Delete("foo2".to_owned()), ] .into_iter() .map(|op| match op { KVOp::Put(k, v) => KVOp::Put(format!("{}-{}", PREFIX, k), v), KVOp::Delete(k) => KVOp::Delete(format!("{}-{}", PREFIX, k)), }) .collect(); apply_kv_ops!(cli, ops); cancel.cancel().await.expect("watch canceled"); assert_ops_events!(ops, stream); }