mod test_util; use crate::test_util::{async_test, connected_streams}; use beatrice::internal::{handle_http_conn, Token}; use beatrice::{HttpConn, Request, Response}; use futures_lite::{AsyncReadExt, AsyncWriteExt}; use permit::Permit; use std::future::Future; use std::io::ErrorKind; use std::net::Shutdown; use std::time::{Duration, Instant}; use temp_dir::TempDir; const MILLIS_100: Duration = Duration::from_millis(100); async fn handle_http_conn_task(request_handler: F) -> async_net::TcpStream where Fut: Future + std::marker::Send, F: FnOnce(Request) -> Fut + 'static + Send + Sync + Clone, { let (stream0, stream1) = connected_streams().await; let addr = stream1.local_addr().unwrap(); let temp_dir = TempDir::new().unwrap(); safina_executor::spawn(async move { handle_http_conn( Permit::new(), Token::new(), HttpConn::new(addr, stream0), Some(temp_dir.path().to_path_buf()), 64 * 1024, request_handler, ) .await; }); stream1 } async fn read_response( stream: &mut async_net::TcpStream, timeout: Duration, ) -> Result { let deadline = Instant::now() + timeout; let mut buf = Vec::new(); while Instant::now() < deadline { let mut chunk = [0_u8; 1024]; let result = safina_timer::with_timeout( async { stream.read(&mut chunk).await }, Duration::from_millis(10), ) .await; match result { Err(safina_timer::DeadlineExceeded) => {} Ok(Ok(0)) => break, Ok(Ok(num_read)) => buf.extend(&chunk[..num_read]), Ok(Err(e)) => return Err(e), }; } Ok(String::from_utf8(buf) .map_err(|_| std::io::Error::new(ErrorKind::InvalidData, "response is not UTF-8"))?) } #[test] fn handle_http_conn_ok() { async_test(async { let mut stream = handle_http_conn_task(|_req: Request| async { Response::text(200, "ok") }).await; stream.write_all(b"M / HTTP/1.1\r\n\r\n").await.unwrap(); assert_eq!( "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 2\r\n\r\nok", read_response(&mut stream, MILLIS_100) .await .unwrap() .as_str() ); }); } #[test] fn handle_http_conn_shutdown() { async_test(async { let mut stream = handle_http_conn_task(|_req: Request| async { Response::text(200, "ok") }).await; stream.shutdown(Shutdown::Write).unwrap(); assert_eq!( "", read_response(&mut stream, MILLIS_100) .await .unwrap() .as_str() ); }); } #[test] fn handle_http_conn_upload() { async_test(async { let mut stream = handle_http_conn_task(|req: Request| async move { let mut body_string = String::new(); req.body .async_reader() .await .unwrap() .read_to_string(&mut body_string) .await .unwrap(); Response::text(200, format!("read {:?}", body_string)) }) .await; stream .write_all(b"M / HTTP/1.1\r\ncontent-length:3\r\n\r\nabc") .await .unwrap(); assert_eq!( "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 10\r\n\r\nread \"abc\"", read_response(&mut stream, MILLIS_100) .await .unwrap() .as_str() ); }); } #[test] fn handle_http_conn_upload_large() { async_test(async { let mut stream = handle_http_conn_task(|req: Request| async move { if req.body.is_pending() { return Response::get_body_and_reprocess(10_000_000); } let mut body_string = String::new(); req.body .async_reader() .await .unwrap() .read_to_string(&mut body_string) .await .unwrap(); Response::text(200, format!("got {}", body_string.len())) }) .await; stream .write_all(b"M / HTTP/1.1\r\ncontent-length:10000000\r\n\r\n") .await .unwrap(); for _ in 0..10_000 { stream.write_all(&[b'a'; 1000]).await.unwrap(); } stream.flush().await.unwrap(); assert_eq!( "HTTP/1.1 200 OK\r\ncontent-type: text/plain; charset=UTF-8\r\ncontent-length: 12\r\n\r\ngot 10000000", read_response(&mut stream, Duration::from_secs(3)) .await .unwrap() .as_str() ); }); }