#![type_length_limit = "2097152"] use std::task::Poll; use bollard_next::container::*; use bollard_next::errors::Error; use bollard_next::exec::*; use bollard_next::Docker; use futures_util::future; use futures_util::stream::TryStreamExt; use futures_util::StreamExt; use tokio::runtime::Runtime; #[macro_use] pub mod common; use crate::common::*; async fn start_exec_test(docker: Docker) -> Result<(), Error> { create_daemon(&docker, "integration_test_start_exec_test").await?; let message = &docker .create_exec( "integration_test_start_exec_test", CreateExecOptions { attach_stdout: Some(true), cmd: if cfg!(windows) { Some(vec![ "cmd.exe".into(), "/C".into(), "type".into(), "C:\\Windows\\System32\\Inetsrv\\Config\\ApplicationHost.config".into(), ]) } else { Some(vec!["/bin/cat".into(), "/etc/config/uhttpd".into()]) }, ..Default::default() }, ) .await?; let results = docker .start_exec(&message.id, None::) .await?; assert!(match results { StartExecResults::Attached { output, .. } => { let log: Vec<_> = output.try_collect().await?; assert!(!log.is_empty()); match &log[0] { LogOutput::StdOut { message } => { let (n, expected) = if cfg!(windows) { (0, "\r") } else { (1, "config uhttpd main") }; let s = String::from_utf8_lossy(message); s.split('\n').nth(n).expect("log exists") == expected } _ => false, } } _ => false, }); let _ = &docker .kill_container( "integration_test_start_exec_test", None::>, ) .await?; let _ = &docker .wait_container( "integration_test_start_exec_test", None::>, ) .try_collect::>() .await; let _ = &docker .remove_container( "integration_test_start_exec_test", None::, ) .await?; Ok(()) } async fn inspect_exec_test(docker: Docker) -> Result<(), Error> { create_daemon(&docker, "integration_test_inspect_exec_test").await?; let message = &docker .create_exec( "integration_test_inspect_exec_test", CreateExecOptions { attach_stdout: Some(true), cmd: if cfg!(windows) { Some(vec![ "cmd.exe".into(), "/C".into(), "type".into(), "C:\\Windows\\System32\\Inetsrv\\Config\\ApplicationHost.config".into(), ]) } else { Some(vec!["/bin/cat".into(), "/etc/config/uhttpd".into()]) }, ..Default::default() }, ) .await?; docker .start_exec( &message.id, Some(StartExecOptions { detach: true, ..Default::default() }), ) .await?; let exec_process = &docker.inspect_exec(&message.id).await?; assert_eq!( if cfg!(windows) { "cmd.exe" } else { "/bin/cat" }, exec_process .process_config .as_ref() .unwrap() .entrypoint .as_ref() .unwrap() ); let _ = &docker .kill_container( "integration_test_inspect_exec_test", None::>, ) .await?; let _ = &docker .wait_container( "integration_test_inspect_exec_test", None::>, ) .try_collect::>() .await; let _ = &docker .remove_container( "integration_test_inspect_exec_test", None::, ) .await?; Ok(()) } async fn start_exec_output_capacity_test_short(docker: Docker) -> Result<(), Error> { create_daemon(&docker, "start_exec_output_capacity_test_short").await?; let text1 = "a".repeat(1024); let message = &docker .create_exec( "start_exec_output_capacity_test_short", CreateExecOptions { attach_stdout: Some(true), cmd: if cfg!(windows) { Some(vec![ "cmd.exe".into(), "/C".into(), "echo".into(), text1.clone(), ]) } else { Some(vec!["/bin/echo".into(), text1.clone()]) }, ..Default::default() }, ) .await?; let results = docker .start_exec(&message.id, None::) .await?; if let StartExecResults::Attached { output, .. } = results { let mut i = 0; let stop_fut = future::poll_fn(|_cx| { i += 1; if i < text1.len() { Poll::Pending } else { Poll::Ready(()) } }); let stream = output.take_until(stop_fut); let log: Vec<_> = stream.try_collect::>().await?; assert!(!log.is_empty()); let mut buf = String::new(); for chunk in &log { if let LogOutput::StdOut { message } = chunk { let s = String::from_utf8_lossy(message); buf.push_str(&s); } } assert_eq!(buf.trim(), text1); } let _ = &docker .kill_container( "start_exec_output_capacity_test_short", None::>, ) .await?; let _ = &docker .wait_container( "start_exec_output_capacity_test_short", None::>, ) .try_collect::>() .await; let _ = &docker .remove_container( "start_exec_output_capacity_test_short", None::, ) .await?; Ok(()) } async fn start_exec_output_capacity_test_long(docker: Docker) -> Result<(), Error> { create_daemon(&docker, "start_exec_output_capacity_test_long").await?; let text2 = "a".repeat(7 * 1024); let message = &docker .create_exec( "start_exec_output_capacity_test_long", CreateExecOptions { attach_stdout: Some(true), cmd: if cfg!(windows) { Some(vec![ "cmd.exe".into(), "/C".into(), "echo".into(), text2.clone(), ]) } else { Some(vec!["/bin/echo".into(), text2.clone()]) }, ..Default::default() }, ) .await?; let results = docker .start_exec( &message.id, Some(StartExecOptions { output_capacity: Some(100 * 1024), ..Default::default() }), ) .await?; if let StartExecResults::Attached { output, .. } = results { let mut i = 0; let stop_fut = future::poll_fn(|_cx| { i += 1; if i < text2.len() { Poll::Pending } else { Poll::Ready(()) } }); let stream = output.take_until(stop_fut); let log: Vec<_> = stream.try_collect::>().await?; assert!(!log.is_empty()); let mut buf = String::new(); for chunk in &log { if let LogOutput::StdOut { message } = chunk { let s = String::from_utf8_lossy(message); buf.push_str(&s); } } assert_eq!(buf.trim(), text2); } let _ = &docker .kill_container( "start_exec_output_capacity_test_long", None::>, ) .await?; let _ = &docker .wait_container( "start_exec_output_capacity_test_long", None::>, ) .try_collect::>() .await; let _ = &docker .remove_container( "start_exec_output_capacity_test_long", None::, ) .await?; Ok(()) } #[test] fn integration_test_start_exec() { connect_to_docker_and_run!(start_exec_test); } #[test] fn integration_test_inspect_exec() { connect_to_docker_and_run!(inspect_exec_test); } #[test] fn integration_test_start_exec_output_capacity_short() { connect_to_docker_and_run!(start_exec_output_capacity_test_short); } #[test] fn integration_test_start_exec_output_capacity_long() { connect_to_docker_and_run!(start_exec_output_capacity_test_long); }