#![allow(dead_code)] use futures; use net2; use rand; use redis; use std::env; use std::fs; use std::io; use std::process; use std::thread::sleep; use std::time::Duration; use std::path::PathBuf; use self::futures::Future; use redis::Value; pub fn current_thread_runtime() -> tokio::runtime::Runtime { let mut builder = tokio::runtime::Builder::new(); #[cfg(feature = "aio")] builder.enable_io(); builder.basic_scheduler().build().unwrap() } pub fn block_on_all(f: F) -> F::Output where F: Future, { current_thread_runtime().block_on(f) } #[cfg(feature = "async-std-comp")] pub fn block_on_all_using_async_std(f: F) -> F::Output where F: Future, { async_std::task::block_on(f) } #[cfg(feature = "cluster")] mod cluster; #[cfg(feature = "cluster")] pub use self::cluster::*; #[derive(PartialEq)] enum ServerType { Tcp, Unix, } pub struct RedisServer { pub process: process::Child, addr: redis::ConnectionAddr, } impl ServerType { fn get_intended() -> ServerType { match env::var("REDISRS_SERVER_TYPE") .ok() .as_ref() .map(|x| &x[..]) { Some("tcp") => ServerType::Tcp, Some("unix") => ServerType::Unix, val => { panic!("Unknown server type {:?}", val); } } } } impl RedisServer { pub fn new() -> RedisServer { let server_type = ServerType::get_intended(); let addr = match server_type { ServerType::Tcp => { // this is technically a race but we can't do better with // the tools that redis gives us :( let listener = net2::TcpBuilder::new_v4() .unwrap() .reuse_address(true) .unwrap() .bind("127.0.0.1:0") .unwrap() .listen(1) .unwrap(); let server_port = listener.local_addr().unwrap().port(); redis::ConnectionAddr::Tcp("127.0.0.1".to_string(), server_port) } ServerType::Unix => { let (a, b) = rand::random::<(u64, u64)>(); let path = format!("/tmp/redis-rs-test-{}-{}.sock", a, b); redis::ConnectionAddr::Unix(PathBuf::from(&path)) } }; RedisServer::new_with_addr(addr, |cmd| cmd.spawn().unwrap()) } pub fn new_with_addr process::Child>( addr: redis::ConnectionAddr, spawner: F, ) -> RedisServer { let mut cmd = process::Command::new("redis-server"); cmd.stdout(process::Stdio::null()) .stderr(process::Stdio::null()); match addr { redis::ConnectionAddr::Tcp(ref bind, server_port) => { cmd.arg("--port") .arg(server_port.to_string()) .arg("--bind") .arg(bind); } redis::ConnectionAddr::Unix(ref path) => { cmd.arg("--port").arg("0").arg("--unixsocket").arg(&path); } }; RedisServer { process: spawner(&mut cmd), addr, } } pub fn wait(&mut self) { self.process.wait().unwrap(); } pub fn get_client_addr(&self) -> &redis::ConnectionAddr { &self.addr } pub fn stop(&mut self) { let _ = self.process.kill(); let _ = self.process.wait(); if let redis::ConnectionAddr::Unix(ref path) = *self.get_client_addr() { fs::remove_file(&path).ok(); } } } impl Drop for RedisServer { fn drop(&mut self) { self.stop() } } pub struct TestContext { pub server: RedisServer, pub client: redis::Client, } impl TestContext { pub fn new() -> TestContext { let server = RedisServer::new(); let client = redis::Client::open(redis::ConnectionInfo { addr: Box::new(server.get_client_addr().clone()), db: 0, passwd: None, }) .unwrap(); let mut con; let millisecond = Duration::from_millis(1); loop { match client.get_connection() { Err(err) => { if err.is_connection_refusal() { sleep(millisecond); } else { panic!("Could not connect: {}", err); } } Ok(x) => { con = x; break; } } } redis::cmd("FLUSHDB").execute(&mut con); TestContext { server, client } } pub fn connection(&self) -> redis::Connection { self.client.get_connection().unwrap() } #[cfg(feature = "aio")] pub async fn async_connection(&self) -> redis::RedisResult { self.client.get_async_connection().await } #[cfg(feature = "async-std-comp")] pub async fn async_connection_async_std(&self) -> redis::RedisResult { self.client.get_async_std_connection().await } pub fn stop_server(&mut self) { self.server.stop(); } #[cfg(feature = "tokio-rt-core")] pub fn multiplexed_async_connection( &self, ) -> impl Future> { self.multiplexed_async_connection_tokio() } #[cfg(feature = "tokio-rt-core")] pub fn multiplexed_async_connection_tokio( &self, ) -> impl Future> { let client = self.client.clone(); async move { client.get_multiplexed_tokio_connection().await } } #[cfg(feature = "async-std-comp")] pub fn multiplexed_async_connection_async_std( &self, ) -> impl Future> { let client = self.client.clone(); async move { client.get_multiplexed_async_std_connection().await } } } pub fn encode_value(value: &Value, writer: &mut W) -> io::Result<()> where W: io::Write, { #![allow(clippy::write_with_newline)] match *value { Value::Nil => write!(writer, "$-1\r\n"), Value::Int(val) => write!(writer, ":{}\r\n", val), Value::Data(ref val) => { write!(writer, "${}\r\n", val.len())?; writer.write_all(val)?; writer.write_all(b"\r\n") } Value::Bulk(ref values) => { write!(writer, "*{}\r\n", values.len())?; for val in values.iter() { encode_value(val, writer)?; } Ok(()) } Value::Okay => write!(writer, "+OK\r\n"), Value::Status(ref s) => write!(writer, "+{}\r\n", s), } }