use futures::{future, prelude::*, StreamExt}; use redis::{aio::MultiplexedConnection, cmd, AsyncCommands, ErrorKind, RedisResult}; use crate::support::*; mod support; #[test] fn test_args() { let ctx = TestContext::new(); let connect = ctx.async_connection(); block_on_all(connect.and_then(|mut con| async move { redis::cmd("SET") .arg("key1") .arg(b"foo") .query_async(&mut con) .await?; redis::cmd("SET") .arg(&["key2", "bar"]) .query_async(&mut con) .await?; let result = redis::cmd("MGET") .arg(&["key1", "key2"]) .query_async(&mut con) .await; assert_eq!(result, Ok(("foo".to_string(), b"bar".to_vec()))); result })) .unwrap(); } #[test] fn dont_panic_on_closed_multiplexed_connection() { let ctx = TestContext::new(); let connect = ctx.multiplexed_async_connection(); drop(ctx); block_on_all(async move { connect .and_then(|con| async move { let cmd = move || { let mut con = con.clone(); async move { redis::cmd("SET") .arg("key1") .arg(b"foo") .query_async(&mut con) .await } }; let result: RedisResult<()> = cmd().await; assert_eq!( result.as_ref().unwrap_err().kind(), redis::ErrorKind::IoError, "{}", result.as_ref().unwrap_err() ); cmd().await }) .map(|result| { assert_eq!( result.as_ref().unwrap_err().kind(), redis::ErrorKind::IoError, "{}", result.as_ref().unwrap_err() ); }) .await }); } #[test] fn test_pipeline_transaction() { let ctx = TestContext::new(); block_on_all(async move { let mut con = ctx.async_connection().await?; let mut pipe = redis::pipe(); pipe.atomic() .cmd("SET") .arg("key_1") .arg(42) .ignore() .cmd("SET") .arg("key_2") .arg(43) .ignore() .cmd("MGET") .arg(&["key_1", "key_2"]); pipe.query_async(&mut con) .map_ok(|((k1, k2),): ((i32, i32),)| { assert_eq!(k1, 42); assert_eq!(k2, 43); }) .await }) .unwrap(); } #[test] fn test_pipeline_transaction_with_errors() { use redis::RedisError; let ctx = TestContext::new(); block_on_all(async move { let mut con = ctx.async_connection().await?; con.set::<_, _, ()>("x", 42).await.unwrap(); // Make Redis a replica of a nonexistent master, thereby making it read-only. redis::cmd("slaveof") .arg("1.1.1.1") .arg("1") .query_async::<_, ()>(&mut con) .await .unwrap(); // Ensure that a write command fails with a READONLY error let err: RedisResult<()> = redis::pipe() .atomic() .set("x", 142) .ignore() .get("x") .query_async(&mut con) .await; assert_eq!(err.unwrap_err().kind(), ErrorKind::ReadOnly); let x: i32 = con.get("x").await.unwrap(); assert_eq!(x, 42); Ok::<_, RedisError>(()) }) .unwrap(); } fn test_cmd(con: &MultiplexedConnection, i: i32) -> impl Future> + Send { let mut con = con.clone(); async move { let key = format!("key{i}"); let key_2 = key.clone(); let key2 = format!("key{i}_2"); let key2_2 = key2.clone(); let foo_val = format!("foo{i}"); redis::cmd("SET") .arg(&key[..]) .arg(foo_val.as_bytes()) .query_async(&mut con) .await?; redis::cmd("SET") .arg(&[&key2, "bar"]) .query_async(&mut con) .await?; redis::cmd("MGET") .arg(&[&key_2, &key2_2]) .query_async(&mut con) .map(|result| { assert_eq!(Ok((foo_val, b"bar".to_vec())), result); Ok(()) }) .await } } fn test_error(con: &MultiplexedConnection) -> impl Future> { let mut con = con.clone(); async move { redis::cmd("SET") .query_async(&mut con) .map(|result| match result { Ok(()) => panic!("Expected redis to return an error"), Err(_) => Ok(()), }) .await } } #[test] fn test_args_multiplexed_connection() { let ctx = TestContext::new(); block_on_all(async move { ctx.multiplexed_async_connection() .and_then(|con| { let cmds = (0..100).map(move |i| test_cmd(&con, i)); future::try_join_all(cmds).map_ok(|results| { assert_eq!(results.len(), 100); }) }) .map_err(|err| panic!("{}", err)) .await }) .unwrap(); } #[test] fn test_args_with_errors_multiplexed_connection() { let ctx = TestContext::new(); block_on_all(async move { ctx.multiplexed_async_connection() .and_then(|con| { let cmds = (0..100).map(move |i| { let con = con.clone(); async move { if i % 2 == 0 { test_cmd(&con, i).await } else { test_error(&con).await } } }); future::try_join_all(cmds).map_ok(|results| { assert_eq!(results.len(), 100); }) }) .map_err(|err| panic!("{}", err)) .await }) .unwrap(); } #[test] fn test_transaction_multiplexed_connection() { let ctx = TestContext::new(); block_on_all(async move { ctx.multiplexed_async_connection() .and_then(|con| { let cmds = (0..100).map(move |i| { let mut con = con.clone(); async move { let foo_val = i; let bar_val = format!("bar{i}"); let mut pipe = redis::pipe(); pipe.atomic() .cmd("SET") .arg("key") .arg(foo_val) .ignore() .cmd("SET") .arg(&["key2", &bar_val[..]]) .ignore() .cmd("MGET") .arg(&["key", "key2"]); pipe.query_async(&mut con) .map(move |result| { assert_eq!(Ok(((foo_val, bar_val.into_bytes()),)), result); result }) .await } }); future::try_join_all(cmds) }) .map_ok(|results| { assert_eq!(results.len(), 100); }) .map_err(|err| panic!("{}", err)) .await }) .unwrap(); } fn test_async_scanning(batch_size: usize) { let ctx = TestContext::new(); block_on_all(async move { ctx.multiplexed_async_connection() .and_then(|mut con| { async move { let mut unseen = std::collections::HashSet::new(); for x in 0..batch_size { redis::cmd("SADD") .arg("foo") .arg(x) .query_async(&mut con) .await?; unseen.insert(x); } let mut iter = redis::cmd("SSCAN") .arg("foo") .cursor_arg(0) .clone() .iter_async(&mut con) .await .unwrap(); while let Some(x) = iter.next_item().await { // type inference limitations let x: usize = x; // if this assertion fails, too many items were returned by the iterator. assert!(unseen.remove(&x)); } assert_eq!(unseen.len(), 0); Ok(()) } }) .map_err(|err| panic!("{}", err)) .await }) .unwrap(); } #[test] fn test_async_scanning_big_batch() { test_async_scanning(1000) } #[test] fn test_async_scanning_small_batch() { test_async_scanning(2) } #[test] #[cfg(feature = "script")] fn test_script() { use redis::RedisError; // Note this test runs both scripts twice to test when they have already been loaded // into Redis and when they need to be loaded in let script1 = redis::Script::new("return redis.call('SET', KEYS[1], ARGV[1])"); let script2 = redis::Script::new("return redis.call('GET', KEYS[1])"); let script3 = redis::Script::new("return redis.call('KEYS', '*')"); let ctx = TestContext::new(); block_on_all(async move { let mut con = ctx.multiplexed_async_connection().await?; script1 .key("key1") .arg("foo") .invoke_async(&mut con) .await?; let val: String = script2.key("key1").invoke_async(&mut con).await?; assert_eq!(val, "foo"); let keys: Vec = script3.invoke_async(&mut con).await?; assert_eq!(keys, ["key1"]); script1 .key("key1") .arg("bar") .invoke_async(&mut con) .await?; let val: String = script2.key("key1").invoke_async(&mut con).await?; assert_eq!(val, "bar"); let keys: Vec = script3.invoke_async(&mut con).await?; assert_eq!(keys, ["key1"]); Ok::<_, RedisError>(()) }) .unwrap(); } #[test] #[cfg(feature = "script")] fn test_script_load() { let ctx = TestContext::new(); let script = redis::Script::new("return 'Hello World'"); block_on_all(async move { let mut con = ctx.multiplexed_async_connection().await.unwrap(); let hash = script.prepare_invoke().load_async(&mut con).await.unwrap(); assert_eq!(hash, script.get_hash().to_string()); }); } #[test] #[cfg(feature = "script")] fn test_script_returning_complex_type() { let ctx = TestContext::new(); block_on_all(async { let mut con = ctx.multiplexed_async_connection().await?; redis::Script::new("return {1, ARGV[1], true}") .arg("hello") .invoke_async(&mut con) .map_ok(|(i, s, b): (i32, String, bool)| { assert_eq!(i, 1); assert_eq!(s, "hello"); assert!(b); }) .await }) .unwrap(); } // Allowing `nth(0)` for similarity with the following `nth(1)`. // Allowing `let ()` as `query_async` requries the type it converts the result to. #[allow(clippy::let_unit_value, clippy::iter_nth_zero)] #[tokio::test] async fn io_error_on_kill_issue_320() { let ctx = TestContext::new(); let mut conn_to_kill = ctx.async_connection().await.unwrap(); cmd("CLIENT") .arg("SETNAME") .arg("to-kill") .query_async::<_, ()>(&mut conn_to_kill) .await .unwrap(); let client_list: String = cmd("CLIENT") .arg("LIST") .query_async(&mut conn_to_kill) .await .unwrap(); eprintln!("{client_list}"); let client_to_kill = client_list .split('\n') .find(|line| line.contains("to-kill")) .expect("line") .split(' ') .nth(0) .expect("id") .split('=') .nth(1) .expect("id value"); let mut killer_conn = ctx.async_connection().await.unwrap(); let () = cmd("CLIENT") .arg("KILL") .arg("ID") .arg(client_to_kill) .query_async(&mut killer_conn) .await .unwrap(); let mut killed_client = conn_to_kill; let err = loop { match killed_client.get::<_, Option>("a").await { // We are racing against the server being shutdown so try until we a get an io error Ok(_) => tokio::time::sleep(std::time::Duration::from_millis(50)).await, Err(err) => break err, } }; assert_eq!(err.kind(), ErrorKind::IoError); // Shouldn't this be IoError? } #[tokio::test] async fn invalid_password_issue_343() { let ctx = TestContext::new(); let coninfo = redis::ConnectionInfo { addr: ctx.server.client_addr().clone(), redis: redis::RedisConnectionInfo { db: 0, username: None, password: Some("asdcasc".to_string()), }, }; let client = redis::Client::open(coninfo).unwrap(); let err = client .get_multiplexed_tokio_connection() .await .err() .unwrap(); assert_eq!( err.kind(), ErrorKind::AuthenticationFailed, "Unexpected error: {err}", ); } // Test issue of Stream trait blocking if we try to iterate more than 10 items // https://github.com/mitsuhiko/redis-rs/issues/537 and https://github.com/mitsuhiko/redis-rs/issues/583 #[tokio::test] async fn test_issue_stream_blocks() { let ctx = TestContext::new(); let mut con = ctx.multiplexed_async_connection().await.unwrap(); for i in 0..20usize { let _: () = con.append(format!("test/{i}"), i).await.unwrap(); } let values = con.scan_match::<&str, String>("test/*").await.unwrap(); tokio::time::timeout(std::time::Duration::from_millis(100), async move { let values: Vec<_> = values.collect().await; assert_eq!(values.len(), 20); }) .await .unwrap(); } // Test issue of AsyncCommands::scan returning the wrong number of keys // https://github.com/redis-rs/redis-rs/issues/759 #[tokio::test] async fn test_issue_async_commands_scan_broken() { let ctx = TestContext::new(); let mut con = ctx.async_connection().await.unwrap(); let mut keys: Vec = (0..100).map(|k| format!("async-key{k}")).collect(); keys.sort(); for key in &keys { let _: () = con.set(key, b"foo").await.unwrap(); } let iter: redis::AsyncIter = con.scan().await.unwrap(); let mut keys_from_redis: Vec<_> = iter.collect().await; keys_from_redis.sort(); assert_eq!(keys, keys_from_redis); assert_eq!(keys.len(), 100); } mod pub_sub { use std::collections::HashMap; use std::time::Duration; use super::*; #[test] fn pub_sub_subscription() { use redis::RedisError; let ctx = TestContext::new(); block_on_all(async move { let mut pubsub_conn = ctx.async_connection().await?.into_pubsub(); pubsub_conn.subscribe("phonewave").await?; let mut pubsub_stream = pubsub_conn.on_message(); let mut publish_conn = ctx.async_connection().await?; publish_conn.publish("phonewave", "banana").await?; let msg_payload: String = pubsub_stream.next().await.unwrap().get_payload()?; assert_eq!("banana".to_string(), msg_payload); Ok::<_, RedisError>(()) }) .unwrap(); } #[test] fn pub_sub_unsubscription() { use redis::RedisError; const SUBSCRIPTION_KEY: &str = "phonewave-pub-sub-unsubscription"; let ctx = TestContext::new(); block_on_all(async move { let mut pubsub_conn = ctx.async_connection().await?.into_pubsub(); pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?; pubsub_conn.unsubscribe(SUBSCRIPTION_KEY).await?; let mut conn = ctx.async_connection().await?; let subscriptions_counts: HashMap = redis::cmd("PUBSUB") .arg("NUMSUB") .arg(SUBSCRIPTION_KEY) .query_async(&mut conn) .await?; let subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap(); assert_eq!(subscription_count, 0); Ok::<_, RedisError>(()) }) .unwrap(); } #[test] fn automatic_unsubscription() { use redis::RedisError; const SUBSCRIPTION_KEY: &str = "phonewave-automatic-unsubscription"; let ctx = TestContext::new(); block_on_all(async move { let mut pubsub_conn = ctx.async_connection().await?.into_pubsub(); pubsub_conn.subscribe(SUBSCRIPTION_KEY).await?; drop(pubsub_conn); let mut conn = ctx.async_connection().await?; let mut subscription_count = 1; // Allow for the unsubscription to occur within 5 seconds for _ in 0..100 { let subscriptions_counts: HashMap = redis::cmd("PUBSUB") .arg("NUMSUB") .arg(SUBSCRIPTION_KEY) .query_async(&mut conn) .await?; subscription_count = *subscriptions_counts.get(SUBSCRIPTION_KEY).unwrap(); if subscription_count == 0 { break; } std::thread::sleep(Duration::from_millis(50)); } assert_eq!(subscription_count, 0); Ok::<_, RedisError>(()) }) .unwrap(); } #[test] fn pub_sub_conn_reuse() { use redis::RedisError; let ctx = TestContext::new(); block_on_all(async move { let mut pubsub_conn = ctx.async_connection().await?.into_pubsub(); pubsub_conn.subscribe("phonewave").await?; pubsub_conn.psubscribe("*").await?; let mut conn = pubsub_conn.into_connection().await; redis::cmd("SET") .arg("foo") .arg("bar") .query_async(&mut conn) .await?; let res: String = redis::cmd("GET").arg("foo").query_async(&mut conn).await?; assert_eq!(&res, "bar"); Ok::<_, RedisError>(()) }) .unwrap(); } #[test] fn pipe_errors_do_not_affect_subsequent_commands() { use redis::RedisError; let ctx = TestContext::new(); block_on_all(async move { let mut conn = ctx.multiplexed_async_connection().await?; conn.lpush::<&str, &str, ()>("key", "value").await?; let res: Result<(String, usize), redis::RedisError> = redis::pipe() .get("key") // WRONGTYPE .llen("key") .query_async(&mut conn) .await; assert!(res.is_err()); let list: Vec = conn.lrange("key", 0, -1).await?; assert_eq!(list, vec!["value".to_owned()]); Ok::<_, RedisError>(()) }) .unwrap(); } }