//! Tests that verify that HTTP requeuests are correctly sent #![allow(clippy::unwrap_used, clippy::expect_used)] use std::{ iter::FromIterator, sync::atomic::{AtomicBool, AtomicU32, Ordering}, time::Duration, }; use requeuest::{ self, client::{Channels, Client}, request::Request, HeaderMap, Url, }; use reqwest::header::{HeaderValue, AUTHORIZATION}; use tokio::sync::Notify; static INSTALL_EYRE: std::sync::Once = std::sync::Once::new(); fn install_eyre() { INSTALL_EYRE.call_once(|| color_eyre::install().expect("Installing eyre failed")); } /// Crate a hyper `Service` with the given future as its `service_fn` #[macro_export] macro_rules! service { ($closure:expr) => { hyper::service::make_service_fn(|_conn| async { Ok::<_, hyper::Error>(hyper::service::service_fn($closure)) }) }; } /// Create a hyper `Server` with the given graceful shutdown closure, returning /// the ip address and server object. #[macro_export] macro_rules! server { ($service:expr, $shutdown:expr) => {{ let addr = std::net::SocketAddr::from(([127, 0, 0, 1], 0)); let server = hyper::Server::bind(&addr).serve($service); // Get the address from the server so we know what port was picked let addr = server.local_addr(); let server = server.with_graceful_shutdown($shutdown); (addr, server) }}; } static SEND_EMPTY_NOTIF: Notify = Notify::const_new(); /// Verifies that a request without a body is correctly sent #[sqlx_database_tester::test(pool(variable = "pool", skip_migrations))] #[ntest::timeout(60_000)] async fn send_empty() -> color_eyre::eyre::Result<()> { install_eyre(); requeuest::migrate(&pool).await?; let client = Client::new(pool, Channels::All).await?; let service = service!(|req| async move { assert_eq!(req.method(), hyper::Method::GET, "Wrong method"); assert_eq!(req.uri().path(), "/path", "Wrong URI path"); assert_eq!(req.uri().query().unwrap(), "query=foo¶m=bar", "Wrong URI query"); assert_eq!(req.headers()[AUTHORIZATION], &"Bearer: secret", "Wrong HTTP header"); SEND_EMPTY_NOTIF.notify_one(); Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::from("OK"))) }); let (addr, server) = server!(service, async { SEND_EMPTY_NOTIF.notified().await }); let headers = HeaderMap::from_iter([(AUTHORIZATION, HeaderValue::from_static("Bearer: secret"))]); let request = Request::get(format!("http://{}/path?query=foo¶m=bar", addr).as_str())? .headers(headers) .build(); client.spawn("channel", &request).await?; server.await?; Ok(()) } static RETRY_COUNT: AtomicU32 = AtomicU32::new(0); static RETRY_NOTIF: Notify = Notify::const_new(); /// Verifies that a request is correctly retried #[sqlx_database_tester::test(pool(variable = "pool", skip_migrations))] #[ntest::timeout(60_000)] async fn retry() -> color_eyre::eyre::Result<()> { install_eyre(); requeuest::migrate(&pool).await?; let client = Client::new(pool, Channels::All).await?; let service = service!(|_req| async move { let attempt = RETRY_COUNT.fetch_add(1, Ordering::SeqCst); let response = match attempt { 0..=2 => { hyper::Response::builder().status(400).body(hyper::Body::from("Try again")).unwrap() } 3 => { RETRY_NOTIF.notify_one(); hyper::Response::new(hyper::Body::from("OK")) } _ => panic!("Too many retries!"), }; Ok::<_, hyper::Error>(response) }); let (addr, server) = server!(service, async { RETRY_NOTIF.notified().await }); let request = Request::get(format!("http://{}/", addr).as_str())?.build(); client .spawn_cfg("channel", &request, |req| { req.set_retries(3); req.set_retry_backoff(Duration::from_millis(10)); }) .await?; server.await?; Ok(()) } static ORDER_NOTIF: Notify = Notify::const_new(); static ORDER_REQ_NUM: AtomicU32 = AtomicU32::new(1); /// Verifies that returning requests get delivered sequentially #[sqlx_database_tester::test(pool(variable = "pool", skip_migrations))] #[ntest::timeout(30_000)] async fn order() -> color_eyre::eyre::Result<()> { install_eyre(); requeuest::migrate(&pool).await?; let client = Client::new(pool, Channels::All).await?; let service = service!(|req: hyper::Request| async move { let num = ORDER_REQ_NUM.fetch_add(1, Ordering::AcqRel); let body = hyper::body::to_bytes(req.into_body()).await?; let req_num = String::from_utf8_lossy(&body).parse::(); if req_num != Ok(num) { panic!("Wrong order"); } if num == 3 { ORDER_NOTIF.notify_one(); } Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::from("OK"))) }); let (addr, server) = server!(service, async { ORDER_NOTIF.notified().await }); let url: Url = format!("http://{}/", addr).parse()?; let request1 = Request::post(url.clone(), b"1".to_vec())?.build(); let request2 = Request::post(url.clone(), b"2".to_vec())?.build(); let request3 = Request::post(url.clone(), b"3".to_vec())?.build(); let handle = tokio::spawn(async move { server.await }); let cfg = |job: &mut sqlxmq::JobBuilder| { job.set_ordered(true); }; client.spawn_cfg("order", &request1, cfg).await?; client.spawn_cfg("order", &request2, cfg).await?; client.spawn_cfg("order", &request3, cfg).await?; handle.await??; Ok(()) } static RECEIVED_REQUEST: AtomicBool = AtomicBool::new(false); /// Verifies that clearing requests for a queue works #[sqlx_database_tester::test(pool(variable = "pool", skip_migrations))] #[ntest::timeout(30_000)] async fn clear() -> color_eyre::eyre::Result<()> { install_eyre(); requeuest::migrate(&pool).await?; let client = Client::new(pool, Channels::All).await?; let service = service!(|_| async move { RECEIVED_REQUEST.store(true, Ordering::SeqCst); Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::from("ERR"))) }); let (addr, server) = server!(service, async { tokio::time::sleep(Duration::from_secs(1)).await }); let request = Request::get(format!("http://{}/", addr).as_str())?.build(); client .spawn_cfg("clear", &request, |req| { req.set_delay(Duration::from_millis(500)); }) .await?; client.clear(Channels::List(&["clear"])).await?; server.await?; assert!(!RECEIVED_REQUEST.load(Ordering::SeqCst), "Failed to clear"); Ok(()) }