#![cfg(not(target_arch = "wasm32"))] use std::convert::Infallible; use std::future::Future; use std::net; use std::sync::mpsc as std_mpsc; use std::thread; use std::time::Duration; use tokio::runtime; use tokio::sync::oneshot; pub struct Server { addr: net::SocketAddr, panic_rx: std_mpsc::Receiver<()>, shutdown_tx: Option>, } impl Server { pub fn addr(&self) -> net::SocketAddr { self.addr } } impl Drop for Server { fn drop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { let _ = tx.send(()); } if !::std::thread::panicking() { self.panic_rx .recv_timeout(Duration::from_secs(3)) .expect("test server should not panic"); } } } pub fn http(func: F) -> Server where F: Fn(http::Request) -> Fut + Clone + Send + 'static, Fut: Future> + Send + 'static, { http_with_config(func, |_builder| {}) } type Builder = hyper_util::server::conn::auto::Builder; pub fn http_with_config(func: F1, apply_config: F2) -> Server where F1: Fn(http::Request) -> Fut + Clone + Send + 'static, Fut: Future> + Send + 'static, F2: FnOnce(&mut Builder) -> Bu + Send + 'static, { // Spawn new runtime in thread to prevent reactor execution context conflict thread::spawn(move || { let rt = runtime::Builder::new_current_thread() .enable_all() .build() .expect("new rt"); let listener = rt.block_on(async move { tokio::net::TcpListener::bind(&std::net::SocketAddr::from(([127, 0, 0, 1], 0))) .await .unwrap() }); let addr = listener.local_addr().unwrap(); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); let (panic_tx, panic_rx) = std_mpsc::channel(); let tname = format!( "test({})-support-server", thread::current().name().unwrap_or("") ); thread::Builder::new() .name(tname) .spawn(move || { rt.block_on(async move { let mut builder = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new()); apply_config(&mut builder); loop { tokio::select! { _ = &mut shutdown_rx => { break; } accepted = listener.accept() => { let (io, _) = accepted.expect("accepted"); let func = func.clone(); let svc = hyper::service::service_fn(move |req| { let fut = func(req); async move { Ok::<_, Infallible>(fut.await) } }); let builder = builder.clone(); tokio::spawn(async move { let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await; }); } } } let _ = panic_tx.send(()); }); }) .expect("thread spawn"); Server { addr, panic_rx, shutdown_tx: Some(shutdown_tx), } }) .join() .unwrap() }