use anyhow::{anyhow, Context, Error as AnyError}; use nix::sys::signal::{self, Signal}; use nix::unistd::Pid; use std::collections::HashSet; use std::fmt::Debug; use std::io::{BufRead, BufReader, Read}; use std::path::PathBuf; use std::process::{Child, ChildStderr, ChildStdout, Command, Stdio}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; const DEFAULT_OUTPUT_WAIT_MS: u64 = 3000; pub fn waketimed_command() -> Command { let mut cmd_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); cmd_path.push("../target/debug/waketimed"); let mut cmd = Command::new(&cmd_path); cmd.current_dir(env!("CARGO_MANIFEST_DIR")) .env("WAKETIMED_BUS_ADDRESS", env!("WAKETIMED_BUS_ADDRESS")) .env("WAKETIMED_TEST_MODE", "true") .stdin(Stdio::null()) .stderr(Stdio::piped()) .stdout(Stdio::piped()); cmd } pub struct Supervisor { pid: u32, stdout: Arc>>, stderr: Arc>>, child: Option, } impl Supervisor { pub fn new(mut wtd_proc: Child) -> Self { Self { pid: wtd_proc.id(), stdout: Arc::new(Mutex::new(BufReader::new( wtd_proc .stdout .take() .expect("Cannot get stdout of waketimed."), ))), stderr: Arc::new(Mutex::new(BufReader::new( wtd_proc .stderr .take() .expect("Cannot get stderr of waketimed."), ))), child: Some(wtd_proc), } } pub fn terminate(&mut self) -> Result<(), AnyError> { signal::kill(Pid::from_raw(self.pid as i32), Signal::SIGTERM) .context("Failed to terminate waketimed process.") } pub fn assert_success(&mut self) -> Result<(), AnyError> { let mut child = self .child .take() .expect("Called assert_success but child process has already been consumed."); let rc_result = self .wait_upto_ms_or_kill(3000, move || child.wait()) .context("Failed waiting for waketimed to terminate.")?; match rc_result { Ok(rc) => { if rc.success() { self.dump_stdout_and_stderr(); Ok(()) } else { self.dump_stdout_and_stderr(); Err(anyhow!("Waketimed failed with return code {0}.", rc)) } } Err(e) => Err(e).context("Could not query waketimed process return status."), } } pub fn wait_for_stderr + Debug>(&mut self, substr: S) -> Result<(), AnyError> { self.wait_for_stderr_ms(DEFAULT_OUTPUT_WAIT_MS, substr) } pub fn wait_for_stderr_unordered + Debug>( &mut self, substrs: &[S], ) -> Result<(), AnyError> { self.wait_for_stderr_unordered_ms(DEFAULT_OUTPUT_WAIT_MS, substrs) } pub fn wait_for_stderr_ms + Debug>( &mut self, timeout: u64, substr: S, ) -> Result<(), AnyError> { let substrs = vec![substr]; self.wait_for_stderr_unordered_ms(timeout, &substrs) } pub fn wait_for_stderr_unordered_ms + Debug>( &mut self, timeout: u64, substrs: &[S], ) -> Result<(), AnyError> { let stderr = Arc::clone(&self.stderr); let mut substrs_set: HashSet = substrs.iter().map(|s| s.as_ref().to_string()).collect(); self.wait_upto_ms_or_kill(timeout, move || { let mut line_buf = String::new(); loop { stderr .lock() .expect("Failed to lock stderr in wait_for_stderr_ms") .read_line(&mut line_buf) .expect("Failed to read line from stderr."); print!("{}", &line_buf); let mut found = None; for substr in substrs_set.iter() { if line_buf.contains(substr.as_str()) { found = Some(substr.clone()); break; } } if let Some(substr) = found { substrs_set.remove(&substr); } if substrs_set.is_empty() { break; } line_buf.clear(); } }) .with_context(|| format!("Failed waiting for stderr substrings {:?}", substrs)) } pub fn wait_upto_ms_or_kill( &mut self, timeout: u64, func: F, ) -> Result where R: Send, F: FnOnce() -> R + Send, { wait_upto_ms(timeout, func).or_else(|e| { signal::kill(Pid::from_raw(self.pid as i32), Signal::SIGKILL) .expect("Failed to kill waketimed process."); self.dump_stdout_and_stderr(); Err(e).context("Waketimed expectation timed out and waketimed was killed.") }) } fn dump_stdout_and_stderr(&mut self) { let mut out = String::new(); self.stdout .lock() .expect("Failed to lock stdout.") .read_to_string(&mut out) .expect("Failed reading stdout to string."); println!("Remaining stdout: {}", &out); self.stderr .lock() .expect("Failed to lock stderr.") .read_to_string(&mut out) .expect("Failed reading stderr to string."); println!("Remaining stderr: {}", &out); } } pub fn wait_upto_ms(timeout: u64, func: F) -> Result where R: Send, F: FnOnce() -> R + Send, { #[allow(clippy::mutex_atomic)] let finished_setter = Arc::new((Mutex::new(false), Condvar::new())); let finished_waiter = finished_setter.clone(); let join_handle = thread::spawn(move || { let res: R = func(); let (s_lock, s_cvar) = &*finished_setter; *s_lock.lock().unwrap() = true; s_cvar.notify_one(); res }); let (w_lock, w_cvar) = &*finished_waiter; let finished_wait = w_lock.lock().unwrap(); let wait_result = w_cvar .wait_timeout(finished_wait, Duration::from_millis(timeout)) .unwrap(); let finished = wait_result.0; if *finished { Ok(join_handle.join().expect("Failed to join timeout thread.")) } else { Err(anyhow!("Timed out.")) } }