use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt}; use zmq::{Context, SocketType}; use std::thread::spawn; use tmq::{dealer, Multipart, Result}; use utils::{ generate_tcp_address, hammer_receive, msg, send_multipart_repeated, send_multiparts, sync_echo, sync_receive_multipart_repeated, sync_receive_multiparts, }; mod utils; #[tokio::test] async fn send_single_message() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let sock = dealer(&ctx).connect(&address)?; let data = vec![vec!["hello", "world"]]; let thread = sync_receive_multiparts(address, SocketType::DEALER, data.clone()); send_multiparts(sock, data).await?; thread.join().unwrap(); Ok(()) } #[tokio::test] async fn send_multiple_messages() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let sock = dealer(&ctx).connect(&address)?; let data = vec![ vec!["hello", "world"], vec!["second", "message"], vec!["third", "message"], ]; let thread = sync_receive_multiparts(address, SocketType::DEALER, data.clone()); send_multiparts(sock, data).await?; thread.join().unwrap(); Ok(()) } #[tokio::test] async fn send_empty_message() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let sock = dealer(&ctx).connect(&address)?; let data = vec!["hello", "world"]; let thread = sync_receive_multiparts(address, SocketType::DEALER, vec![data.clone()]); send_multiparts(sock, vec![vec![], vec![], vec![], data]).await?; thread.join().unwrap(); Ok(()) } #[tokio::test] async fn send_hammer() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let sock = dealer(&ctx).connect(&address)?; let count = 1_000; let data = vec!["hello", "world"]; let thread = sync_receive_multipart_repeated(address, SocketType::DEALER, data.clone(), count); send_multipart_repeated(sock, data, count).await?; thread.join().unwrap(); Ok(()) } #[tokio::test] async fn receive_hammer() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let sock = dealer(&ctx).bind(&address)?; hammer_receive(sock, address, SocketType::DEALER).await } #[tokio::test] async fn proxy_sequence() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let mut sock = dealer(&ctx).connect(&address)?; let count = 1_000; let make_msg = |index| -> Multipart { let m1 = format!("Msg #{}", index); let m2 = format!("Msg #{} (contd.)", index); vec![msg(m1.as_bytes()), msg(m2.as_bytes())].into() }; for i in 0..count { sock.send(make_msg(i)).await?; } let echo = sync_echo(address, SocketType::DEALER, count); for i in 0..count { if let Some(multipart) = sock.next().await { let multipart = multipart?; assert_eq!(make_msg(i), multipart); } else { panic!("Stream ended too soon."); } } echo.join().unwrap(); Ok(()) } #[tokio::test] async fn proxy_interleaved() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let mut sock = dealer(&ctx).connect(&address)?; let count = 1_000; let echo = sync_echo(address, SocketType::DEALER, count); for i in 0..count { let m1 = format!("Msg #{}", i); let m2 = format!("Msg #{} (contd.)", i); sock.send(vec![msg(m1.as_bytes()), msg(m2.as_bytes())]) .await?; if let Some(multipart) = sock.next().await { let multipart = multipart?; let expected: Multipart = vec![msg(m1.as_bytes()), msg(m2.as_bytes())].into(); assert_eq!(expected, multipart); } else { panic!("Iteam in stream is missing."); } } echo.join().unwrap(); Ok(()) } #[tokio::test] async fn split_echo() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let (tx, rx) = dealer(&ctx).bind(&address)?.split(); let count = 10; let thread = spawn(move || { let ctx = Context::new(); let sender = ctx.socket(SocketType::DEALER).unwrap(); sender.connect(&address).unwrap(); for _ in 0..count { sender .send_multipart(vec!["hello", "world"].into_iter(), 0) .unwrap(); assert_eq!( sender.recv_multipart(0).unwrap(), vec!(b"hello".to_vec(), b"world".to_vec()) ); } }); rx.take(count) .forward(tx) .map_err(|e| panic!("{}", e)) .map(|_| ()) .await; thread.join().unwrap(); Ok(()) } #[tokio::test] async fn split_send_all() -> Result<()> { let address = generate_tcp_address(); let ctx = Context::new(); let (mut tx, _) = dealer(&ctx).connect(&address)?.split(); let count = 10_000; let thread = spawn(move || { let ctx = Context::new(); let receiver = ctx.socket(SocketType::DEALER).unwrap(); receiver.bind(&address).unwrap(); for i in 0..count { let received = receiver.recv_multipart(0).unwrap(); assert_eq!( received, vec!( i.to_string().as_bytes().to_vec(), (i + 1).to_string().as_bytes().to_vec(), ) ); } }); let mut count = futures::stream::iter((0..count).map(|i| { Ok(Multipart::from(vec![ zmq::Message::from(&i.to_string()), zmq::Message::from(&(i + 1).to_string()), ])) })); tx.send_all(&mut count).await?; thread.join().unwrap(); Ok(()) }