//! Integration tests for the entire `bigml-parallel` executable. use anyhow::{Context, Result}; use bigml::{ resource::{script, source, Execution, Resource, StatusCode}, Client, }; use cli_test_dir::*; use futures::FutureExt; use std::{env, future::Future, io::Write}; use tokio::runtime::Runtime; use tracing::{debug_span, Instrument}; use tracing_subscriber::{ fmt::{format::FmtSpan, Subscriber}, prelude::*, EnvFilter, }; /// Create a BigML client using environment varaibles to authenticate. fn new_client() -> Result { let username = env::var("BIGML_USERNAME").context("must specify BIGML_USERNAME")?; let api_key = env::var("BIGML_API_KEY").context("must specify BIGML_API_KEY")?; Ok(Client::new(username, api_key)?) } /// Run the future `f` asynchronously. fn run_async(fut: F) -> Result where F: Future> + Send + 'static, T: Send + 'static, { let runtime = Runtime::new().expect("Unable to create a runtime"); runtime.block_on(fut.boxed()) } #[test] fn help_flag() { let testdir = TestDir::new("bigml-parallel", "help_flag"); let output = testdir.cmd().arg("--help").expect_success(); assert!(output.stdout_str().contains("bigml-parallel")); } #[test] fn version_flag() { let testdir = TestDir::new("bigml-parallel", "version_flag"); let output = testdir.cmd().arg("--version").expect_success(); assert!(output.stdout_str().contains(env!("CARGO_PKG_VERSION"))); } /// Our test WhizzML script to execute. static WHIZZML_SCRIPT: &str = r#" ;; Input: source ;; Input: n ;; Output: dataset ;; Output: n_times_2 (define dataset (create-and-wait-dataset {"source" source "name" "bigml-parallel test"})) (define n_times_2 (* n 2)) "#; #[test] #[ignore] fn parallel_executions() { // Configure tracing. let filter = EnvFilter::from_default_env(); let _ = Subscriber::builder() .with_writer(std::io::stderr) .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) .with_env_filter(filter) .finish() .try_init(); let _span = debug_span!("parallel_executions").entered(); let testdir = TestDir::new("bigml-parallel", "parallel_executions"); // Set up our test infrastructure on BigML. let (sources, script) = run_async( async { let client = new_client()?; // Build some source objects to test. let raw_sources = &["id,color\n1,green\n", "id,color\n2,blue\n"]; let mut sources = vec![]; for (i, &raw_source) in raw_sources.iter().enumerate() { let mut args = source::Args::data(raw_source); args.disable_datetime = Some(true); args.name = Some(format!("bigml-parallel test {}", i)); let source = client.create_and_wait(&args).await?; sources.push(source.id().to_owned()); } // Upload our WhizzML script object. let mut args = script::Args::new(WHIZZML_SCRIPT); args.inputs .push(script::Input::new("source", script::Type::ResourceId)); args.inputs .push(script::Input::new("n", script::Type::Integer)); let script = client.create_and_wait(&args).await?; Ok((sources, script.id().to_owned())) } .instrument(debug_span!("setup")), ) .unwrap(); // Construct standard input with let mut input = vec![]; for source in &sources { writeln!(&mut input, "{}", source).unwrap(); } // Run `bigml-parallel`. let output = testdir .cmd() .args(&["-n", "bigml-parallel test"]) .args(&["-s", &script.to_string()]) .args(&["-R", "source"]) .args(&["-i", "n=2"]) .args(&["-o", "dataset"]) .args(&["-o", "n_times_2"]) .args(&["--tag", "bigml-parallel:test"]) .output_with_stdin(&input) .tee_output() .expect("error running bigml-parallel"); // Parse our output as JSON execution resources, and make sure they look // reasonable. assert_eq!(output.stdout_str().lines().count(), sources.len()); for line in output.stdout_str().lines() { let execution: Execution = serde_json::from_str(line).expect("error parsing output JSON"); assert_eq!(execution.status.code, StatusCode::Finished); let outputs = &execution.execution.outputs; assert!(outputs.iter().any(|output| output.name == "dataset")); assert!(outputs.iter().any(|output| { output.name == "n_times_2" && output.value == Some(4.into()) })); } }