use std::thread::{spawn, JoinHandle}; use tmq::{reply, Result}; use utils::generate_tcp_address; use zmq::Context; mod utils; #[tokio::test] async fn single_message() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let recv_sock = reply(&ctx).bind(&address)?; let part2 = "single_message"; let echo = sync_requester(address, 1, part2); let (multipart, send_sock) = recv_sock.recv().await?; assert_eq!(multipart.len(), 2); assert_eq!(multipart[1].as_str().unwrap(), part2); send_sock.send(multipart).await?; echo.join().unwrap(); Ok(()) } #[tokio::test] async fn hammer_reply() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let mut recv_sock = reply(&ctx).bind(&address)?; let count = 1_000; let part2 = "single_message"; let echo = sync_requester(address, count, part2); for _ in 0..count { let (multipart, send_sock) = recv_sock.recv().await?; assert_eq!(multipart.len(), 2); assert_eq!(multipart[1].as_str().unwrap(), part2); recv_sock = send_sock.send(multipart).await?; } echo.join().unwrap(); Ok(()) } pub fn sync_requester(address: String, count: u64, part2: &'static str) -> JoinHandle<()> { spawn(move || { let socket = Context::new().socket(zmq::SocketType::REQ).unwrap(); socket.connect(&address).unwrap(); for i in 0..count { let msg = format!("Req#{}", i); socket .send_multipart(&[msg.as_bytes(), part2.as_bytes()], 0) .unwrap(); let received = socket.recv_multipart(0).unwrap(); assert_eq!(&received[0], &msg.as_bytes()); } }) }