//! This is the testsuite for `deploy`. It runs the tests in `test/src/bin/` first directly, and then deployed to a local `fabric`. //! //! At the top of each test is some JSON, denoted with the special comment syntax `//=`. //! `output` is a hashmap of file descriptor to a regex of expected output. As it is a regex ensure that any literal `\.+*?()|[]{}^$#&-~` are escaped. #![feature(backtrace)] #![warn( // missing_copy_implementations, missing_debug_implementations, // missing_docs, trivial_numeric_casts, unused_extern_crates, unused_import_braces, unused_qualifications, unused_results, clippy::pedantic, )] // from https://github.com/rust-unofficial/patterns/blob/master/anti_patterns/deny-warnings.md #![allow( clippy::if_not_else, clippy::type_complexity, clippy::cast_possible_truncation, clippy::derive_hash_xor_eq, clippy::filter_map )] mod ext; use multiset::HashMultiSet; use palaver::file::FdIter; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, env, ffi::OsStr, fmt, fmt::Debug, fs::{self, File}, hash, io::{self, BufRead, BufReader}, iter, mem, net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}, path::{Path, PathBuf}, process, str, sync::mpsc, thread, thread::JoinHandle, time::{self, Duration} }; use systemstat::{saturating_sub_bytes, Platform, System}; use constellation_internal::{abort_on_unwind, Cpu, ExitStatus, FabricOutputEvent, Fd, Mem}; use ext::serialize_as_regex_string::SerializeAsRegexString; const DEPLOY: &str = "src/bin/deploy.rs"; const FABRIC: &str = "src/bin/constellation/main.rs"; const TESTS: &str = "tests/"; const SELF: &str = "tests/tester/main.rs"; const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST); const FORWARD_STDERR: bool = true; #[derive(PartialEq, Eq, Serialize, Debug)] struct Output { output: HashMap, #[serde(with = "ext::serde_multiset")] children: HashMultiSet, exit: ExitStatus, } impl hash::Hash for Output { fn hash(&self, state: &mut H) where H: hash::Hasher, { let mut output = self.output.iter().collect::>(); output.sort_by(|&(ref a_fd, _), &(ref b_fd, _)| a_fd.cmp(b_fd)); for output in output { output.hash(state); } // self.children.hash(state); // TODO self.exit.hash(state); } } #[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)] struct OutputTest { output: HashMap, #[serde(with = "ext::serde_multiset")] children: HashMultiSet, exit: ExitStatus, } impl hash::Hash for OutputTest { fn hash(&self, state: &mut H) where H: hash::Hasher, { let mut output = self.output.iter().collect::>(); output.sort_by(|&(ref a_fd, _), &(ref b_fd, _)| a_fd.cmp(b_fd)); for output in output { output.hash(state); } // self.children.hash(state); // TODO self.exit.hash(state); } } impl OutputTest { fn is_match(&self, output: &Output) -> bool { if self.exit != output.exit || self.output.len() != output.output.len() || self.children.len() != output.children.len() { return false; } for (_, test, output) in ext::hashmap::intersection(&self.output, &output.output) { if test.1 != output.1 || !test.0.is_match(&(output.0).0) { return false; } } let mut check = (0..self.children.len()) .map(|_| (false, false)) .collect::>(); for (i, test) in self.children.iter().enumerate() { for (j, output) in output.children.iter().enumerate() { if !(check[i].0 && check[j].1) && test.is_match(output) { check[i].0 = true; check[j].1 = true; } } } if !check.into_iter().all(|(a, b)| a && b) { return false; } true } } fn parse_output(output: &process::Output) -> Result> { if !FORWARD_STDERR { println!("stderr: {:?}", str::from_utf8(&output.stderr).unwrap()); } else if !output.stderr.is_empty() { return Err(None); } let mut log = HashMap::new(); let mut top = None; for message in serde_json::Deserializer::from_slice(&output.stdout) .into_iter::() { match message? { constellation_internal::DeployOutputEvent::Output(a, b, c) => { if top.is_none() { top = Some(a); let _ = log.insert(a, (HashMap::new(), Vec::new(), None)); } let output = log .get_mut(&a) .unwrap() .0 .entry(b) .or_insert((Vec::new(), false)); assert!(!output.1); if !c.is_empty() { output.0.extend(c); } else { output.1 = true; } } constellation_internal::DeployOutputEvent::Spawn(a, b) => { if top.is_none() { top = Some(a); let _ = log.insert(a, (HashMap::new(), Vec::new(), None)); } log.get_mut(&a).unwrap().1.push(b); let x = log.insert(b, (HashMap::new(), Vec::new(), None)); assert!(x.is_none()); } constellation_internal::DeployOutputEvent::Exit(a, b) => { if top.is_none() { top = Some(a); let _ = log.insert(a, (HashMap::new(), Vec::new(), None)); } log.get_mut(&a).unwrap().2 = Some(b); } } } let top = top.unwrap(); Ok(treeize(log.remove(&top).unwrap(), &mut log)) } fn remove_stderr(mut output_test: OutputTest) -> OutputTest { let _stderr = output_test.output.remove(&2).unwrap(); let mut children = output_test.children; let mut new_children = HashMultiSet::new(); while !children.is_empty() { let key = children.distinct_elements().next().unwrap(); let count = children.count_of(key); assert_ne!(count, 0); let key = key.clone(); children.remove_all(&key); new_children.insert_times(remove_stderr(key), count); } output_test.children = new_children; output_test } fn treeize( output: ( HashMap, bool)>, Vec, Option, ), nodes: &mut HashMap< constellation_internal::Pid, ( HashMap, bool)>, Vec, Option, ), >, ) -> Output { Output { output: output .0 .into_iter() .map(|(k, (v, v1))| (k, (SerializeAsRegexString(v), v1))) .collect(), children: output .1 .into_iter() .map(|pid| treeize(nodes.remove(&pid).unwrap(), nodes)) .collect(), exit: output.2.unwrap(), } } #[allow(clippy::too_many_lines)] fn main() { let start = time::Instant::now(); std::env::set_var("RUST_BACKTRACE", "full"); std::panic::set_hook(Box::new(|info| { eprintln!( "thread '{}' {}", thread::current().name().unwrap_or(""), info ); eprintln!("{:?}", std::backtrace::Backtrace::force_capture()); std::process::abort(); })); // https://github.com/rust-lang/cargo/issues/6344 #[cfg(unix)] for fd in FdIter::new().unwrap() { if fd > 2 { nix::unistd::close(fd).unwrap(); } } let _ = thread::Builder::new() .spawn(abort_on_unwind(move || loop { thread::sleep(time::Duration::new(10, 0)); println!("{:?}", start.elapsed()); })) .unwrap(); let current_dir = env::current_dir().unwrap(); let mut products = HashMap::new(); let iterations: usize = env::var("CONSTELLATION_TEST_ITERATIONS") .map(|x| x.parse().unwrap()) .unwrap_or(1); let test = env::var("CONSTELLATION_TEST").ok(); let tests = fs::read_dir(TESTS).unwrap().filter_map(|path| { let path = path.ok()?.path(); let is_test = path.extension()? == "rs" && test.as_ref().map_or(true, |test| { Some(&**test) == path.file_stem().unwrap().to_str() }); if is_test { Some(String::from(path.file_stem()?.to_str()?)) } else { None } }); let target_dir = Path::new(env!("OUT_DIR")) .ancestors() .nth(4) .and_then(Path::file_name); debug_assert!( target_dir.unwrap() == "target" || target_dir.unwrap() == env!("TARGET"), "{:?}", target_dir ); // This is to avoid building twice, once with unset and once with set --target let target = if target_dir != Some(OsStr::new("target")) { Some(format!("--target={}", env!("TARGET"))) } else { None }; let tests = tests.map(|test| format!("--test={}", test)); let profile = match env!("PROFILE") { "debug" => None, "release" => Some(String::from("--release")), _ => unreachable!(), }; let features = format!("--features={}", env!("FEATURES")); let args = iter::once(String::from("build")) .chain(tests) .chain(iter::once(String::from("--message-format=json"))) .chain(target) .chain(iter::once(String::from("--no-default-features"))) .chain(iter::once(features)) .chain(profile) .collect::>(); println!( "Building with: cargo {}", args.iter() .map(|arg| if arg.contains(' ') { format!("\"{}\"", arg) } else { arg.to_owned() }) .collect::>() .join(" ") ); let output = process::Command::new("cargo") .args(&args) .stderr(process::Stdio::inherit()) .output() .expect("Failed to invoke cargo"); if !output.status.success() { panic!("cargo build failed"); } for message in serde_json::Deserializer::from_slice(&output.stdout).into_iter() { if let cargo_metadata::Message::CompilerArtifact(artifact) = message.unwrap_or_else(|_| { panic!( "Failed to parse output of cargo {}", itertools::join(args.iter(), " ") ) }) { if let Ok(path) = PathBuf::from(&artifact.target.src_path).strip_prefix(¤t_dir) { if (artifact.target.kind == vec![String::from("bin")] && !artifact.profile.test) || artifact.target.kind == vec![String::from("test")] { let x = products.insert( path.to_owned(), artifact.filenames.into_iter().next().unwrap(), ); // We're assuming the first filename is the binary – .dSYM etc seem to always be second? assert!(x.is_none()); } } } } let (deploy, fabric) = (&products[Path::new(DEPLOY)], &products[Path::new(FABRIC)]); let mut products = products .iter() .filter(|&(src, _bin)| src.starts_with(Path::new(TESTS)) && src != Path::new(SELF)) .map(|(src, bin)| { let file = BufReader::new(File::open(src).unwrap()) .lines() .map(Result::unwrap) .take_while(|x| x.get(0..3) == Some("//=")) .flat_map(|x| ext::string::Chars::new(x).skip(3)) .collect::(); let mut deserializer = serde_json::Deserializer::from_str(&file); deserializer.disable_recursion_limit(); let mut file: Result = Deserialize::deserialize(&mut deserializer); if !FORWARD_STDERR { file = file.map(remove_stderr); } (src, bin, file) }) .collect::>(); products.sort_by(|&(ref a_src, _, _), &(ref b_src, _, _)| a_src.cmp(b_src)); let node1_fabric_port = 12340; let node1_bridge_port = 12341; let node2_fabric_port = 12350; let node3_fabric_port = 12360; let (mut succeeded, mut failed) = (0, 0); for environment in &mut [ &mut Native as &mut dyn Environment, &mut Cluster::new( deploy.clone(), fabric.clone(), vec![ Node { fabric: Socket::localhost(node1_fabric_port), bridge: Some(Socket::localhost(node1_bridge_port)), mem: 2 * Mem::GIB, cpu: 2 * Cpu::CORE, }, Node { fabric: Socket::localhost(node2_fabric_port), bridge: None, mem: Mem::GIB, cpu: Cpu::CORE, }, Node { fabric: Socket::localhost(node3_fabric_port), bridge: None, mem: Mem::GIB / 2, cpu: Cpu::CORE / 2, }, ], ), ] { println!("Running in environment: {:#?}", environment); environment.start(); for (ref src, ref bin, ref file) in &products { println!("{}", src.display()); for i in 0..iterations { println!(" {}", i); let result = environment.run(bin, file.as_ref().ok()); let output = parse_output(&result); if output.is_err() || file.is_err() || !file.as_ref().unwrap().is_match(output.as_ref().unwrap()) { println!("Error in {:?}", src); match file { Ok(ref file) => println!("Documented:\n{}", json_to_string(file).unwrap()), Err(ref e) => println!("Documented:\nInvalid result JSON: {:?}\n", e), } match output { Ok(ref output) => println!("Actual:\n{}", json_to_string(output).unwrap()), Err(ref e) => println!("Actual:\nFailed to parse: {:?}\n{:?}", result, e), } failed += 1; } else { succeeded += 1; } } print!("{:?}", SystemLoad::measure()); } environment.stop(); } println!( "{}/{} succeeded in {:?}", succeeded, succeeded + failed, start.elapsed() ); if failed > 0 || succeeded == 0 { process::exit(1); } } trait Environment: Debug { fn start(&mut self) {} fn run(&mut self, bin: &Path, output: Option<&OutputTest>) -> process::Output; fn stop(&mut self) {} } #[derive(Debug)] struct Native; impl Environment for Native { fn run(&mut self, bin: &Path, _output: Option<&OutputTest>) -> process::Output { process::Command::new(bin) .env_remove("CONSTELLATION_VERSION") .env("CONSTELLATION_FORMAT", "json") .output() .unwrap() } } #[derive(Debug)] #[allow(clippy::large_enum_variant)] enum Cluster { Init(ClusterConfig), Running { config: ClusterConfig, cluster: Vec, stdout: mpsc::Receiver<(usize, Option>)>, }, Invalid, } #[derive(Debug)] struct ClusterConfig { deploy: PathBuf, fabric: PathBuf, nodes: Vec, } #[derive(Debug)] struct Node { fabric: Socket, bridge: Option, mem: Mem, cpu: Cpu, } #[derive(Clone, Copy, Debug)] struct Socket { bind: SocketAddr, external: SocketAddr, } impl Socket { fn localhost(port: u16) -> Self { let bind = SocketAddr::new(LOCALHOST, port); Self { bind, external: bind, } } } struct ClusterNode { process: process::Child, stdout: JoinHandle<()>, stderr: JoinHandle, } impl Debug for ClusterNode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ClusterNode").finish() } } impl Cluster { fn new(deploy: PathBuf, fabric: PathBuf, nodes: Vec) -> Self { Cluster::Init(ClusterConfig { deploy, fabric, nodes, }) } } impl Environment for Cluster { fn start(&mut self) { let config = if let Self::Init(config) = mem::replace(self, Cluster::Invalid) { config } else { panic!() }; let (sender, receiver) = mpsc::sync_channel(0); let mut cluster = config .nodes .iter() .enumerate() .rev() .map(|(i, node)| { if TcpStream::connect(node.fabric.external).is_ok() { panic!( "Service already running on FABRIC_ADDR {}", node.fabric.external ); } if let Some(bridge) = node.bridge { if TcpStream::connect(bridge.external).is_ok() { panic!("Service already running on BRIDGE_ADDR {}", bridge.external); } } let mut args = vec![ String::from("--format"), String::from("json"), String::from("-v"), node.fabric.bind.to_string(), ]; if i == 0 { for node in &config.nodes { args.extend(vec![ node.fabric.external.to_string(), node.bridge.map_or_else( || String::from("-"), |bridge| bridge.bind.to_string(), ), node.mem.to_string(), node.cpu.to_string(), ]); } } let mut process = process::Command::new(&config.fabric) .args(args) .stdin(process::Stdio::null()) .stdout(process::Stdio::piped()) .stderr(process::Stdio::piped()) .spawn() .unwrap(); let stdout = process.stdout.take().unwrap(); let sender = sender.clone(); let stdout = thread::spawn(abort_on_unwind(move || { for msg in serde_json::StreamDeserializer::new(serde_json::de::IoRead::new(stdout)) { sender.send((i, Some(msg))).unwrap(); } sender.send((i, None)).unwrap(); })); let stderr = process.stderr.take().unwrap(); let stderr = capture_stdout(stderr, true, move |none, output| { *none = false; println!("fab stderr: {:?}", str::from_utf8(output).unwrap()); }); let start_ = time::Instant::now(); while TcpStream::connect(node.fabric.external).is_err() { // TODO: parse output rather than this loop and timeout if start_.elapsed() > time::Duration::new(5, 0) { panic!("Fabric not up within 5s"); } thread::sleep(std::time::Duration::new(0, 1_000_000)); } if let Some(bridge) = node.bridge { let _bridge_pid: FabricOutputEvent = receiver.recv().unwrap().1.unwrap().unwrap(); while TcpStream::connect(bridge.external).is_err() { // TODO: parse output rather than this loop and timeout if start_.elapsed() > time::Duration::new(15, 0) { panic!("Bridge not up within 15s"); } thread::sleep(std::time::Duration::new(0, 1_000_000)); } } ClusterNode { process, stdout, stderr, } }) .collect::>(); cluster.reverse(); *self = Cluster::Running { config, cluster, stdout: receiver, } } fn run(&mut self, bin: &Path, output: Option<&OutputTest>) -> process::Output { let (config, stdout) = if let Self::Running { config, stdout, .. } = self { (config, stdout) } else { panic!() }; crossbeam::thread::scope(|scope| { if let Some(output) = output { fn count_processes(output_test: &OutputTest) -> usize { 1 + output_test .children .iter() .map(count_processes) .sum::() } let processes = count_processes(output); let _ = scope.spawn(move |_| { let mut pids = HashMap::new(); for _ in 0..processes * 2 { let (_node, msg) = stdout.recv().unwrap(); match msg.unwrap().unwrap() { FabricOutputEvent::Init { pid, system_pid } => { let x = pids.insert(pid, system_pid); assert!(x.is_none()); } FabricOutputEvent::Exit { pid, system_pid } => { let x = pids.remove(&pid); assert_eq!(x, Some(system_pid)); } } } assert!(pids.is_empty(), "{:?}", pids); if let Ok(err) = stdout.try_recv() { panic!("{:?}", err); } }); } process::Command::new(&config.deploy) .env_remove("CONSTELLATION_VERSION") .env_remove("CONSTELLATION_FORMAT") .args(&[ "--format=json", &config.nodes[0].bridge.unwrap().external.to_string(), bin.to_str().unwrap(), ]) .output() .unwrap() }) .unwrap() } fn stop(&mut self) { let (config, cluster, stdout) = if let Self::Running { config, cluster, stdout, } = mem::replace(self, Self::Invalid) { (config, cluster, stdout) } else { panic!() }; for (i, mut node) in cluster.into_iter().enumerate() { println!("killing"); node.process.kill().unwrap(); println!("waiting"); let _ = node.process.wait().unwrap(); println!("waiting stderr"); let stderr_empty = node.stderr.join().unwrap(); assert!(stderr_empty); println!("waiting stdout"); let x = stdout.recv().unwrap(); assert_eq!(x.0, i); assert!(x.1.is_none()); println!("waiting stdout thread"); node.stdout.join().unwrap(); } *self = Self::Init(config); } } fn capture_stdout< R: io::Read + Send + 'static, F: FnMut(&mut S, &[u8]) + Send + 'static, S: Send + 'static, >( mut read: R, mut state: S, mut f: F, ) -> JoinHandle { thread::spawn(abort_on_unwind(move || { let mut buf = vec![0; 16 * 1024]; loop { let n = read.read(&mut buf).unwrap(); if n == 0 { break state; } f(&mut state, &buf[..n]); } })) } fn json_to_string(value: &T) -> Result where T: Serialize, { let dense = serde_json::to_string(value)?; if dense.len() < 1000 { if let Ok(pretty) = serde_json::to_string_pretty(value) { return Ok(pretty); } } Ok(dense) } struct SystemLoad { memory: Result, load_average: Result, load_aggregate: Result, processes: usize, threads: usize, file_stats: Result, } impl SystemLoad { fn measure() -> Self { let sys = System::new(); Self { memory: sys.memory(), load_average: sys.load_average(), load_aggregate: sys.cpu_load_aggregate().and_then(|cpu| { thread::sleep(Duration::from_millis(500)); // TODO: make sleep opt-in cpu.done() }), processes: palaver::process::count(), threads: palaver::process::count_threads(), file_stats: sys.file_stats(), } } } impl Debug for SystemLoad { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.memory { Ok(mem) => writeln!( f, "Memory: {} used / {} ({} bytes) total ({:?})", saturating_sub_bytes(mem.total, mem.free), mem.total, mem.total.as_u64(), mem.platform_memory )?, Err(x) => writeln!(f, "Memory: error: {}", x)?, } match &self.load_average { Ok(loadavg) => writeln!( f, "Load average: {} {} {}", loadavg.one, loadavg.five, loadavg.fifteen )?, Err(x) => writeln!(f, "Load average: error: {}", x)?, } match &self.load_aggregate { Ok(cpu) => writeln!( f, "CPU load: {}% user, {}% nice, {}% system, {}% intr, {}% idle", cpu.user * 100.0, cpu.nice * 100.0, cpu.system * 100.0, cpu.interrupt * 100.0, cpu.idle * 100.0 )?, Err(x) => writeln!(f, "CPU load: error: {}", x)?, } writeln!( f, "Processes: {}, threads: {}", self.processes, self.threads )?; if let Ok(file_stats) = &self.file_stats { writeln!( f, "File descriptors: {} open, {} max", file_stats.open, file_stats.max )?; } Ok(()) } }