#![cfg(unix)] use std::{ io::{self, Cursor, Read, Write}, time::{Duration, Instant}, }; #[cfg(not(feature = "async"))] use std::io::sink; #[cfg(not(feature = "async"))] use expectrl::{interact::actions::lookup::Lookup, spawn, stream::stdin::Stdin, NBytes}; #[cfg(not(feature = "async"))] use expectrl::WaitStatus; #[cfg(unix)] #[cfg(not(feature = "async"))] #[ignore = "It requires manual interaction; Or it's necessary to redirect an stdin of current process"] #[test] fn interact_callback() { use expectrl::interact::InteractOptions; let mut input_handle = Lookup::new(); let mut output_handle = Lookup::new(); let mut session = spawn("cat").unwrap(); let mut stdin = Stdin::open().unwrap(); let opts = InteractOptions::default() .on_output(|ctx| { if let Some(m) = output_handle.on(ctx.buf, ctx.eof, b'\n')? { let line = m.before(); println!("Line in output {:?}", String::from_utf8_lossy(line)); } Ok(false) }) .on_input(|ctx| { if input_handle.on(ctx.buf, ctx.eof, "213")?.is_some() { ctx.session.send_line("Hello World")?; } Ok(false) }); session.interact(&mut stdin, sink()).spawn(opts).unwrap(); stdin.close().unwrap(); } #[cfg(unix)] #[cfg(not(feature = "async"))] #[test] fn interact_output_callback() { use expectrl::interact::{InteractOptions, InteractSession}; let mut session = expectrl::spawn("sleep 1 && echo 'Hello World'").unwrap(); let mut stdin = Stdin::open().unwrap(); let stdout = std::io::sink(); let mut state = 0; let mut lookup = Lookup::new(); let interact_opts = InteractOptions::new(&mut state).on_output(|ctx| { if lookup.on(ctx.buf, ctx.eof, "World")?.is_some() { **ctx.state += 1; } Ok(false) }); let mut interact = InteractSession::new(&mut session, &mut stdin, stdout); interact.spawn(interact_opts).unwrap(); stdin.close().unwrap(); // fixme: sometimes it's 0 // I guess because the process gets down to fast. assert!(matches!(state, 1 | 0), "{state:?}"); } #[cfg(unix)] #[cfg(not(feature = "async"))] #[test] fn interact_callbacks_called_after_exit() { use expectrl::interact::InteractOptions; let mut session = expectrl::spawn("echo 'Hello World'").unwrap(); assert_eq!( session.get_process().wait().unwrap(), WaitStatus::Exited(session.get_process().pid(), 0) ); let mut stdin = Stdin::open().unwrap(); let stdout = std::io::sink(); let mut state = 0; let mut lookup = Lookup::new(); session .interact(&mut stdin, stdout) .spawn(&mut InteractOptions::new(&mut state).on_output(|ctx| { if lookup.on(ctx.buf, ctx.eof, "World")?.is_some() { **ctx.state += 1; } Ok(false) })) .unwrap(); stdin.close().unwrap(); assert_eq!(state, 0); } #[cfg(unix)] #[cfg(not(any(feature = "async", feature = "polling")))] #[test] fn interact_callbacks_with_stream_redirection() { use expectrl::interact::InteractOptions; let output_lines = vec![ "NO_MATCHED\n".to_string(), "QWE\n".to_string(), "QW123\n".to_string(), "NO_MATCHED_2\n".to_string(), ]; let reader = ListReaderWithDelayedEof::new(output_lines, Duration::from_secs(2)); let mut writer = io::Cursor::new(vec![0; 2048]); let mut session = spawn("cat").unwrap(); let mut input_handle = Lookup::new(); session .interact(reader, &mut writer) .spawn(InteractOptions::default().on_input(|ctx| { if input_handle.on(ctx.buf, ctx.eof, "QWE")?.is_some() { ctx.session.send_line("Hello World")?; }; Ok(false) })) .unwrap(); let buffer = String::from_utf8_lossy(writer.get_ref()); assert!(buffer.contains("Hello World"), "{buffer:?}"); } #[cfg(unix)] #[cfg(not(any(feature = "async", feature = "polling")))] #[test] fn interact_filters() { use expectrl::interact::InteractOptions; let reader = ReaderWithDelayEof::new("1009\nNO\n", Duration::from_secs(4)); let mut writer = io::Cursor::new(vec![0; 2048]); let mut session = spawn("cat").unwrap(); session .interact(reader, &mut writer) .spawn( InteractOptions::default() .input_filter(|buf| { // ignore 0 chars let v = buf.iter().filter(|&&b| b != b'0').copied().collect(); Ok(v) }) .output_filter(|buf| { // Make NO -> YES let v = buf .chunks(2) .flat_map(|s| match s { &[b'N', b'O'] => &[b'Y', b'E', b'S'], other => other, }) .copied() .collect(); Ok(v) }), ) .unwrap(); let buffer = String::from_utf8_lossy(writer.get_ref()); let buffer = buffer.trim_end_matches(char::from(0)); // fixme: somehow the output is duplicated which is wrong. assert_eq!(buffer, "19\r\nYES\r\n19\r\nYES\r\n"); } #[cfg(all(unix, not(any(feature = "async", feature = "polling"))))] #[test] fn interact_context() { use expectrl::interact::InteractOptions; let mut session = spawn("cat").unwrap(); let reader = ListReaderWithDelayedEof::new( vec![ "QWE\n".into(), "QWE\n".into(), "QWE\n".into(), "QWE\n".into(), ], Duration::from_secs(2), ); let mut writer = io::Cursor::new(vec![0; 2048]); let mut input_data = Lookup::new(); let mut output_data = Lookup::new(); let mut opts = InteractOptions::new((0, 0)) .on_input(|ctx| { if input_data.on(ctx.buf, ctx.eof, "QWE\n")?.is_some() { ctx.state.0 += 1; ctx.session.send_line("123")?; } Ok(false) }) .on_output(|ctx| { if output_data.on(ctx.buf, ctx.eof, NBytes(1))?.is_some() { ctx.state.1 += 1; output_data.clear(); } Ok(false) }); let is_alive = session .interact(reader, &mut writer) .spawn(&mut opts) .unwrap(); let state = opts.into_inner(); assert!(is_alive); assert_eq!(state.0, 4); assert!(state.1 > 0, "{:?}", state.1); let buffer = String::from_utf8_lossy(writer.get_ref()); assert!(buffer.contains("123"), "{buffer:?}"); } #[cfg(all(unix, not(any(feature = "async", feature = "polling"))))] #[test] fn interact_on_output_not_matched() { // Stops interact mode after 123 being read. // Which may cause it to stay buffered in session. // Verify this buffer was cleaned and 123 won't be accessed then. use expectrl::interact::InteractOptions; let reader = ListReaderWithDelayedEof::new( vec![ "QWE\n".to_string(), "123\n".to_string(), String::from_utf8_lossy(&[29]).to_string(), "WWW\n".to_string(), ], Duration::from_secs(2), ); let mut writer = io::Cursor::new(vec![0; 2048]); let mut input = Lookup::new(); let mut session = spawn("cat").unwrap(); let mut opts = InteractOptions::new((0, 0)) .on_input(|ctx| { if input.on(ctx.buf, ctx.eof, "QWE\n")?.is_some() { ctx.state.0 += 1; } if input.on(ctx.buf, ctx.eof, "WWW\n")?.is_some() { ctx.state.1 += 1; } Ok(false) }) .on_output(|_ctx| Ok(false)) .on_idle(|_ctx| { std::thread::sleep(Duration::from_millis(500)); Ok(false) }); let is_alive = session .interact(reader, &mut writer) .spawn(&mut opts) .unwrap(); let state = opts.into_inner(); assert!(is_alive); assert_eq!(state.0, 2); assert_eq!(state.1, 0); let buffer = String::from_utf8_lossy(writer.get_ref()); let buffer = buffer.trim_end_matches(char::from(0)); assert_eq!(buffer, "QWE\r\nQWE\r\n123\r\n123\r\n"); session.send_line("WWW").unwrap(); let m = session.expect("WWW\r\n").unwrap(); assert_ne!(m.before(), b"123\r\n"); assert_eq!(m.before(), b""); } // #[cfg(unix)] // #[cfg(not(feature = "polling"))] // #[cfg(not(feature = "async"))] // #[test] // fn interact_stream_redirection() { // let commands = "Hello World\nIt works :)\n"; // let mut reader = ReaderWithDelayEof::new(commands, Duration::from_secs(4)); // let mut writer = io::Cursor::new(vec![0; 1024]); // let mut session = expectrl::spawn("cat").unwrap(); // let mut opts = expectrl::interact::InteractOptions::default(); // opts.interact(&mut session, &mut reader, &mut writer) // .unwrap(); // drop(opts); // let buffer = String::from_utf8_lossy(writer.get_ref()); // let buffer = buffer.trim_end_matches(char::from(0)); // assert_eq!(buffer, "Hello World\r\nIt works :)\r\n"); // } #[cfg(unix)] #[cfg(feature = "async")] #[test] fn interact_stream_redirection() { use expectrl::interact::InteractOptions; futures_lite::future::block_on(async { let commands = "Hello World\nIt works :)\n"; let reader = ReaderWithDelayEof::new(commands, Duration::from_secs(4)); let mut writer = io::Cursor::new(vec![0; 1024]); let mut session = expectrl::spawn("cat").unwrap(); session .interact(reader, &mut writer) .spawn(InteractOptions::default()) .await .unwrap(); let buffer = String::from_utf8_lossy(writer.get_ref()); let buffer = buffer.trim_end_matches(char::from(0)); assert_eq!( buffer, "Hello World\r\nIt works :)\r\nHello World\r\nIt works :)\r\n" ); }); } #[cfg(feature = "async")] #[test] fn interact_output_callback() { use expectrl::{ interact::{actions::lookup::Lookup, InteractOptions, InteractSession}, stream::stdin::Stdin, }; let mut session = expectrl::spawn("sleep 1 && echo 'Hello World'").unwrap(); let mut stdin = Stdin::open().unwrap(); let stdout = std::io::sink(); let mut otps = InteractOptions::new((0, Lookup::new())).on_output(|ctx| { if ctx.state.1.on(ctx.buf, ctx.eof, "World")?.is_some() { ctx.state.0 += 1; } Ok(false) }); let mut interact = InteractSession::new(&mut session, &mut stdin, stdout); futures_lite::future::block_on(interact.spawn(&mut otps)).unwrap(); let (state, _) = otps.into_inner(); stdin.close().unwrap(); // fixme: sometimes it's 0 // I guess because the process gets down to fast. assert!(matches!(state, 1 | 0), "{state:?}"); } struct ListReaderWithDelayedEof { lines: Vec, eof_timeout: Duration, now: Option, } impl ListReaderWithDelayedEof { #[cfg(not(feature = "async"))] fn new(lines: Vec, eof_timeout: Duration) -> Self { Self { lines, eof_timeout, now: None, } } } impl Read for ListReaderWithDelayedEof { fn read(&mut self, mut buf: &mut [u8]) -> io::Result { if self.now.is_none() { self.now = Some(Instant::now()); } if !self.lines.is_empty() { let line = self.lines.remove(0); buf.write_all(line.as_bytes())?; Ok(line.as_bytes().len()) } else if self.now.unwrap().elapsed() < self.eof_timeout { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } else { Ok(0) } } } #[cfg(unix)] impl std::os::unix::io::AsRawFd for ListReaderWithDelayedEof { fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd { 0 } } struct ReaderWithDelayEof { inner: Cursor, fire_timeout: Duration, now: Instant, } impl ReaderWithDelayEof where T: AsRef<[u8]>, { fn new(buf: T, timeout: Duration) -> Self { Self { inner: Cursor::new(buf), now: Instant::now(), fire_timeout: timeout, } } } impl Read for ReaderWithDelayEof where T: AsRef<[u8]>, { fn read(&mut self, buf: &mut [u8]) -> io::Result { let n = self.inner.read(buf)?; if n == 0 && self.now.elapsed() < self.fire_timeout { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } else { Ok(n) } } } #[cfg(feature = "async")] impl futures_lite::AsyncRead for ReaderWithDelayEof where T: AsRef<[u8]> + Unpin, { fn poll_read( self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &mut [u8], ) -> std::task::Poll> { let result = self.get_mut().read(buf); std::task::Poll::Ready(result) } }