use valkey_module::{ valkey_module, BlockedClient, CallOptionResp, CallOptionsBuilder, CallReply, CallResult, Context, FutureCallReply, PromiseCallReply, ThreadSafeContext, ValkeyError, ValkeyResult, ValkeyString, ValkeyValue, }; use std::thread; use valkey_module::alloc::ValkeyAlloc; fn call_test(ctx: &Context, _: Vec) -> ValkeyResult { let res: String = ctx.call("ECHO", &["TEST"])?.try_into()?; if "TEST" != &res { return Err(ValkeyError::Str("Failed calling 'ECHO TEST'")); } let res: String = ctx.call("ECHO", vec!["TEST"].as_slice())?.try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' dynamic str vec", )); } let res: String = ctx.call("ECHO", &[b"TEST"])?.try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' with static [u8]", )); } let res: String = ctx.call("ECHO", vec![b"TEST"].as_slice())?.try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' dynamic &[u8] vec", )); } let res: String = ctx.call("ECHO", &[&"TEST".to_string()])?.try_into()?; if "TEST" != &res { return Err(ValkeyError::Str("Failed calling 'ECHO TEST' with String")); } let res: String = ctx .call("ECHO", vec![&"TEST".to_string()].as_slice())? .try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' dynamic &[u8] vec", )); } let res: String = ctx .call("ECHO", &[&ctx.create_string("TEST")])? .try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' with ValkeyString", )); } let res: String = ctx .call("ECHO", vec![&ctx.create_string("TEST")].as_slice())? .try_into()?; if "TEST" != &res { return Err(ValkeyError::Str( "Failed calling 'ECHO TEST' with dynamic array of ValkeyString", )); } let call_options = CallOptionsBuilder::new().script_mode().errors_as_replies(); let res: CallResult = ctx.call_ext::<&[&str; 0], _>("SHUTDOWN", &call_options.build(), &[]); if let Err(err) = res { let error_msg = err.to_utf8_string().unwrap(); if !error_msg.contains("not allow") { return Err(ValkeyError::String(format!( "Failed to verify error messages, expected error message to contain 'not allow', error message: '{error_msg}'", ))); } } else { return Err(ValkeyError::Str("Failed to set script mode on call_ext")); } // test resp3 on call_ext let call_options = CallOptionsBuilder::new() .script_mode() .resp(CallOptionResp::Resp3) .errors_as_replies() .build(); ctx.call_ext::<_, CallResult>("HSET", &call_options, &["x", "foo", "bar"]) .map_err(|e| -> ValkeyError { e.into() })?; let res: CallReply = ctx .call_ext::<_, CallResult>("HGETALL", &call_options, &["x"]) .map_err(|e| -> ValkeyError { e.into() })?; if let CallReply::Map(map) = res { let res = map.iter().fold(Vec::new(), |mut vec, (key, val)| { if let CallReply::String(key) = key.unwrap() { vec.push(key.to_string().unwrap()); } if let CallReply::String(val) = val.unwrap() { vec.push(val.to_string().unwrap()); } vec }); if res != vec!["foo".to_string(), "bar".to_string()] { return Err(ValkeyError::String( "Reply of hgetall does not match expected value".into(), )); } } else { return Err(ValkeyError::String( "Did not get a set type on hgetall".into(), )); } Ok("pass".into()) } fn call_blocking_internal(ctx: &Context) -> PromiseCallReply { let call_options = CallOptionsBuilder::new().build_blocking(); ctx.call_blocking("blpop", &call_options, &["list", "1"]) } fn call_blocking_handle_future(ctx: &Context, f: FutureCallReply, blocked_client: BlockedClient) { let future_handler = f.set_unblock_handler(move |_ctx, reply| { let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client); thread_ctx.reply(reply.map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))); }); future_handler.dispose(ctx); } fn call_blocking(ctx: &Context, _: Vec) -> ValkeyResult { let res = call_blocking_internal(ctx); match res { PromiseCallReply::Resolved(r) => r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into())), PromiseCallReply::Future(f) => { let blocked_client = ctx.block_client(); call_blocking_handle_future(ctx, f, blocked_client); Ok(ValkeyValue::NoReply) } } } fn call_blocking_from_detach_ctx(ctx: &Context, _: Vec) -> ValkeyResult { let blocked_client = ctx.block_client(); thread::spawn(move || { let ctx_guard = valkey_module::MODULE_CONTEXT.lock(); let res = call_blocking_internal(&ctx_guard); match res { PromiseCallReply::Resolved(r) => { let thread_ctx = ThreadSafeContext::with_blocked_client(blocked_client); thread_ctx.reply(r.map_or_else(|e| Err(e.into()), |v| Ok((&v).into()))); } PromiseCallReply::Future(f) => { call_blocking_handle_future(&ctx_guard, f, blocked_client); } } }); Ok(ValkeyValue::NoReply) } ////////////////////////////////////////////////////// valkey_module! { name: "call", version: 1, allocator: (ValkeyAlloc, ValkeyAlloc), data_types: [], commands: [ ["call.test", call_test, "", 0, 0, 0], ["call.blocking", call_blocking, "", 0, 0, 0], ["call.blocking_from_detached_ctx", call_blocking_from_detach_ctx, "", 0, 0, 0], ], }