//! Request/Reply IPC sample. This component will write requests to a channel, //! than wait for the matching replies on a separate channel. The requests are 3 u64 values: //! a request id, and 2 values which the repliers is suppose to sum them up. //! In order to start the requester type cargo run --example req use kekbit::api::EncoderHandler; use kekbit::api::Reader; use kekbit::api::Writer; use kekbit::core::TickUnit::Secs; use kekbit::core::*; use kekbit::retry::*; use std::collections::HashSet; #[inline] fn read_u64(data: &[u8], offset: usize) -> u64 { assert!(offset + 8 <= data.len()); u64::from_le_bytes([ data[offset], data[offset + 1], data[offset + 2], data[offset + 3], data[offset + 4], data[offset + 5], data[offset + 6], data[offset + 7], ]) } fn main() { let args: Vec = std::env::args().skip(1).map(|id| id.parse().unwrap()).collect(); assert!(args.len() == 2); let req_id = 0xCDEF; let req_channel_id = args[0]; let reply_channel_id = args[1]; let timeout_secs = 10; //channel times out in 10 secs let tmp_dir = std::env::temp_dir().join("kekbit").join("req_rep"); let max_msg_size = 1024; let metadata = Metadata::new(req_id, req_channel_id, max_msg_size * 1000, max_msg_size, timeout_secs, Secs); //creates the channel where the requests will be sent together with the associated writer let mut writer = shm_writer(&tmp_dir, &metadata, EncoderHandler::default()).unwrap(); //tries to connect to the channel from where the replies will be read let reader_rep = try_shm_reader(&tmp_dir, reply_channel_id, 15000, 45); if reader_rep.is_err() { println!("Could not connect to replier. Giving up.."); std::process::exit(1); } let mut reader = reader_rep.unwrap(); let mut waiting_for: HashSet = HashSet::new(); let requests: Vec<(u64, u64)> = vec![(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]; for (i, el) in requests.iter().enumerate() { //send a request let idx = i as u64; let mut msg: [u8; 24] = [0; 24]; msg[0..8].clone_from_slice(&idx.to_le_bytes()); msg[8..16].clone_from_slice(&(&el.0).to_le_bytes()); msg[16..24].clone_from_slice(&(&el.1).to_le_bytes()); writer.write(&msg).unwrap(); println!("Sent request {} ", i); waiting_for.insert(idx); //check for a reply, it may or may not have come yet reader.try_read().expect("Can't access replies queue").map(|bytes_msg| { let id = read_u64(&bytes_msg, 0); let res = read_u64(&bytes_msg, 8); waiting_for.remove(&id); println!("Reply for request {} is {}.", id, res); }); } //check for all replies which are missing let mut msg_iter: RetryIter = reader.try_iter().into(); for read_res in &mut msg_iter { match read_res { ReadResult::Record(msg) => { let id = read_u64(&msg, 0); let res = read_u64(&msg, 8); waiting_for.remove(&id); println!("Reply for request {} is {}.", id, res); if waiting_for.is_empty() { break; } } ReadResult::Nothing => { //just hold your breath std::thread::sleep(std::time::Duration::from_millis(20)); } ReadResult::Failed(err) => { println!("Requests channel read error {:?}", err); break; } } } }