#![cfg(feature = "tokio")] use std::{env, error, net::SocketAddr}; use async_ssh2_lite::{util::ConnectInfo, AsyncSession}; use futures_util::future::join_all; #[cfg(not(feature = "_integration_tests_tokio_ext"))] use futures_util::AsyncReadExt as _; #[cfg(feature = "_integration_tests_tokio_ext")] use tokio::io::AsyncReadExt as _; use super::{ helpers::{get_connect_addr, get_listen_addr, is_internal_test_openssh_server}, session__userauth_pubkey::__run__session__userauth_pubkey_file, }; #[tokio::test(flavor = "multi_thread", worker_threads = 10)] async fn simple_with_tokio() -> Result<(), Box> { let http_server_listen_addr = get_listen_addr(); let http_server_listen_addr_for_server = http_server_listen_addr; let http_server_listen_addr_for_forwarding = SocketAddr::from(( if is_internal_test_openssh_server() { [172, 17, 0, 1] } else { [127, 0, 0, 1] }, http_server_listen_addr.port(), )); let ssh_server_connect_addr = get_connect_addr()?; let remote_port = http_server_listen_addr.port() + 1; // let server_task: tokio::task::JoinHandle>> = tokio::task::spawn(async move { use core::convert::Infallible; use hyper::{ service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, }; async fn handle(req: Request) -> Result, Infallible> { if req.uri().path() == "/200" { Ok(Response::new(Body::from(""))) } else { let mut res = Response::new(Body::from("")); *res.status_mut() = StatusCode::NOT_FOUND; Ok(res) } } let addr = http_server_listen_addr_for_server; let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); let server = Server::bind(&addr).serve(make_service); match server.await { Ok(_) => {} Err(err) => { eprintln!("server error, err:{err}"); } } Ok(()) }); // let mut session = AsyncSession::::connect(ssh_server_connect_addr, None) .await?; __run__session__userauth_pubkey_file(&mut session).await?; let forwarding_task: tokio::task::JoinHandle>> = tokio::task::spawn(async move { match session .remote_port_forwarding( remote_port, None, None, ConnectInfo::Tcp(http_server_listen_addr_for_forwarding), ) .await { Ok(_) => {} Err(err) => { eprintln!("session.remote_port_forwarding error, err:{err}"); } } Ok(()) }); // tokio::time::sleep(tokio::time::Duration::from_millis( if is_internal_test_openssh_server() { 500 } else { env::var("REMOTE_PORT_FORWARDING_WAIT_SECS") .as_deref() .unwrap_or("4") .parse::() .unwrap_or(4) * 1000 }, )) .await; // let futures = (1..=10) .map(|i| { async move { let mut session = AsyncSession::::connect(ssh_server_connect_addr, None) .await?; __run__session__userauth_pubkey_file(&mut session).await?; let mut channel = session.channel_session().await?; channel .exec( format!( r#"curl http://127.0.0.1:{remote_port}/200 -H "x-foo: bar" -v -w "%{{http_code}}""#, ) .as_ref(), ) .await?; let mut s = String::new(); channel.read_to_string(&mut s).await?; println!("remote_port_forwarding exec curl output:{s} i:{i}"); assert_eq!(s, "200"); channel.close().await?; println!("remote_port_forwarding exec curl exit_status:{} i:{i}", channel.exit_status()?); Result::<_, Box>::Ok(()) } }) .collect::>(); let rets = join_all(futures).await; println!("remote_port_forwarding exec curl rets:{rets:?}"); assert!(rets.iter().all(|x| x.is_ok())); // server_task.abort(); assert!(server_task.await.unwrap_err().is_cancelled()); forwarding_task.abort(); assert!(forwarding_task.await.unwrap_err().is_cancelled()); Ok(()) }