/* * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * SPDX-License-Identifier: Apache-2.0 * * The OpenSearch Contributors require contributions made to * this file be licensed under the Apache-2.0 license or a * compatible open source license. * * Modifications Copyright OpenSearch Contributors. See * GitHub history for details. */ // From reqwest crate // Licensed under Apache License, Version 2.0 // https://github.com/seanmonstar/reqwest/blob/master/LICENSE-APACHE use std::{ convert::Infallible, future::Future, net, sync::mpsc as std_mpsc, thread, time::Duration, }; use tokio::sync::oneshot; pub use http::Response; use tokio::runtime; pub struct Server { addr: net::SocketAddr, panic_rx: std_mpsc::Receiver<()>, shutdown_tx: Option>, } impl Server { pub fn addr(&self) -> net::SocketAddr { self.addr } } impl Drop for Server { fn drop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { let _ = tx.send(()); } if !::std::thread::panicking() { self.panic_rx .recv_timeout(Duration::from_secs(3)) .expect("test server should not panic"); } } } pub fn http(func: F) -> Server where F: Fn(http::Request) -> Fut + Clone + Send + 'static, Fut: Future> + Send + 'static, { //Spawn new runtime in thread to prevent reactor execution context conflict thread::spawn(move || { let rt = runtime::Builder::new_current_thread() .enable_all() .build() .expect("new rt"); let srv = rt.block_on(async move { hyper::Server::bind(&([127, 0, 0, 1], 0).into()).serve(hyper::service::make_service_fn( move |_| { let func = func.clone(); async move { Ok::<_, Infallible>(hyper::service::service_fn(move |req| { let fut = func(req); async move { Ok::<_, Infallible>(fut.await) } })) } }, )) }); let addr = srv.local_addr(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); let srv = srv.with_graceful_shutdown(async move { let _ = shutdown_rx.await; }); let (panic_tx, panic_rx) = std_mpsc::channel(); let tname = format!( "test({})-support-server", thread::current().name().unwrap_or("") ); thread::Builder::new() .name(tname) .spawn(move || { rt.block_on(srv).unwrap(); let _ = panic_tx.send(()); }) .expect("thread spawn"); Server { addr, panic_rx, shutdown_tx: Some(shutdown_tx), } }) .join() .unwrap() }