70 r*B70 0 0 ٷmRmv,%EE_o1Z|}^Zܪ{5I#x bhL*  J$`0qEA @0bJ1BDy41)); assert!(ContentLength(0) < 123); assert!(0 < ContentLength(123)); } } use std::sync::Arc; use actix_utils:/// The "Content-Length" header field indicates the associated representation's data length as a /// decimal non-negative integer number of octets. /// /// # ABNF /// ```plain /// Content-Length = 1*DIGIT /// ``` /// /// [RFC 9110 §8.6]: https://www.rfc-editor.org/rfc/rfc9110#name-content-length #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct ContentLength(usize); impl ContentLength { /// Returns Content-Length value. pub fn into_inner(&self) -> usize { self.0 } } impl str::FromStr for ContentLength { type Err = ::Err; #[inline] fn from_str(val: &str) -> Result { let val = val.trim(); // decoder prevents this case debug_assert!(!val.starts_with('+')); val.parse().map(Self) } } impl TryIntoHeaderValue for ContentLength { type Error = Infallible; fn try_into_value(self) -> Result { Ok(HeaderValue::from(self.0)) } } impl Header for C/ /// Also see the [Forwarded header's MDN docs][mdn] for field semantics. /// /// [RFC 7239]: https://datatracker.ietf.org/doc/html/rfc7239 /// [mdn]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] #[cfg_attr(test, derive(Default))] pub struct Forwarded { /// The interface where the request came in to the proxy server. The identifier can be: /// /// - an obfuscated identifier (such as "hidden" or "secret"). This should be treated as the /// default. /// - an IP address (v4 or v6, optionally with a port. IPv6 address are quoted and enclosed in /// square brackets) /// - "unknown" when the preceding entity is not known (and you still want to indicate that /// forwarding of the request was made) by: Option, /// The client that initiated the request and subsequent proxies in a chain of proxies. The /// identifier has the same possible values as the by directive. r#for: Vec { let inner = ready!(inner_fut.poll(cx))?; Poll::Ready(Ok(BodyHash { inner, hash: mem::take(hash), })) } BodyHashFutProj::Inner { inner_fut, hasher, mut forked_payload, } => { // poll original extractor match inner_fut.poll(cx)? { Poll::Ready(inner) => { trace!("inner extractor complete"); let next = BodyHashFut::InnerDone { inner: Some(inner), hasher: mem::replace(hasher, D::new()), forked_payload: mem::replace(forked_payload, dev::Payload::None), }; self.set(next); // re-enter poll in done state Create a relative or absolute redirect. /// /// _This feature has [graduated to Actix Web][graduated]. Further development will occur there._ /// /// See [`Redirect`] docs for usage details. /// /// # Examples /// ``` /// use actix_web::App; /// use actix_web_lab::web as web_lab; /// /// let app = App::new() /// .service(web_lab::redirect("/one", "/two")); /// ``` /// /// [graduated]: https://docs.rs/actix-web/4/actix_web/web/struct.Redirect.html #[allow(deprecated)] #[deprecated(since = "0.19.0", note = "Type has graduated to Actix Web.")] pub fn redirect(from: impl Into>, to: impl Into>) -> Redirect { Redirect::new(from, to) } /// Constructs a new Single-page Application (SPA) builder. /// /// See [`Spa`] docs for more details. /// /// # Examples /// ``` /// # use actix_web::App; /// # use actix_web_lab::web::spa; /// let app = App::new() /// // ...api routes... /// .service( /// spa() /// .index_file("./examples/assets/spa.html") /// fut: Cell::new(Some(Box::pin(init()))), }), } } /// Returns reference to result of lazy `T` value, initializing if necessary. pub async fn get(&self) -> &T { self.inner .cell .get_or_init(|| async move { match self.inner.fut.take() { Some(fut) => fut.await, None => panic!("LazyData instance has previously been poisoned"), } }) .await } } impl FromRequest for LazyData { type Error = Error; type Future = Ready>; #[inline] fn from_request(req: &HttpRequest, _: &mut dev::Payload) -> Self::Future { if let Some(lazy) = req.app_data::>() { ready(Ok(lazy.clone())) } else { debug!( "Failed to extract `LazyData<{}>` for `{}` handler. For the Data extractor to work \ correctly, wrap the data with `LazyData::nreq.headers().get("Last-Event-ID")); common_countdown(n.try_into().unwrap()) } fn common_countdown(n: i32) -> impl Responder { let countdown_stream = stream::unfold((false, n), |(started, n)| async move { // allow first countdown value to yield immediately if started { sleep(Duration::from_secs(1)).await; } if n > 0 { let data = sse::Data::new(n.to_string()) .event("countdown") .id(n.to_string()); Some((Ok::<_, Infallible>(sse::Event::Data(data)), (true, n - 1))) } else { None } }); sse::Sse::from_stream(countdown_stream).with_retry_duration(Duration::from_secs(5)) } #[get("/time")] async fn timestamp() -> impl Responder { let (sender, sse) = sse::channel(2); actix_web::rt::spawn(async move { loop { let time = time::OffsetDateTime::now_utc(); let msg = sse::Data::new(time.format(&Rfc3339).unwrap()).event("timestamp"); der(( header::CONTENT_LENGTH, header::HeaderValue::from_static("9"), )) .set_payload(Bytes::from_static(b"name=test")) .to_http_parts(); let s = UrlEncodedForm::::from_request(&req, &mut pl).await; let err = format!("{}", s.unwrap_err()); assert!( err.contains("payload is larger") && err.contains("than allowed"), "unexpected error string: {err:?}" ); } #[actix_web::test] async fn test_form_body() { let (req, mut pl) = TestRequest::default().to_http_parts(); let form = UrlEncodedFormBody::::new(&req, &mut pl) .await; assert!(err_eq(form.unwrap_err(), UrlencodedError::ContentType)); let (req, mut pl) = TestRequest::default() .insert_header(( header::CONTENT_TYPE, header::HeaderValue::from_static("application/text"), ix_web::test] async fn compat_compat() { let _ = App::new().wrap(Compat::new(from_fn(noop))); let _ = App::new().wrap(Compat::new(from_fn(mutate_body_type))); } #[actix_web::test] async fn feels_good() { let app = test::init_service( App::new() .wrap(from_fn(mutate_body_type)) .wrap(from_fn(add_res_header)) .wrap(Logger::default()) .wrap(from_fn(noop)) .default_service(web::to(HttpResponse::NotFound)), ) .await; let req = test::TestRequest::default().to_request(); let res = test::call_service(&app, req).await; assert!(res.headers().contains_key(header::WARNING)); } #[actix_web::test] async fn closure_capture_and_return_from_fn() { let app = test::init_service( App::new() .wrap(Logger::default()) .wrap(MyMw(true).into_middleware()) .wrap(Logger::default()), fut: Box::pin(T::from_request(req, payload)), counter_pl: counter, size: 0, }, } } } pub struct BodyLimitFut where T: FromRequest + 'static, T::Error: fmt::Debug + fmt::Display, { inner: Inner, } impl BodyLimitFut where T: FromRequest + 'static, T::Error: fmt::Debug + fmt::Display, { fn new_error(err: BodyLimitError) -> Self { Self { inner: Inner::Error { err: Some(err) }, } } } enum Inner where T: FromRequest + 'static, T::Error: fmt::Debug + fmt::Display, { Error { err: Option>, }, Body { /// Wrapped extractor future. fut: Pin>, /// Forked request payload. counter_pl: dev::Payload, /// Running payload size count. size: usize, }, } impl Unpin for Inner< use crate::util::{InfallibleStream, MutWriter}; pin_project! { /// A buffered CSV serializing body stream. /// /// This has significant memory efficiency advantages over returning an array of CSV rows when /// the data set is very large because it avoids buffering the entire response. /// /// # Examples /// ``` /// # use actix_web::Responder; /// # use actix_web_lab::respond::Csv; /// # use futures_core::Stream; /// fn streaming_data_source() -> impl Stream { /// // get item stream from source /// # futures_util::stream::empty() /// } /// /// async fn handler() -> impl Responder { /// let data_stream = streaming_data_source(); /// /// Csv::new_infallible(data_stream) /// .into_responder() /// } /// ``` pub struct Csv { // The wrapped item stream. #[pin] stream: S, } } impl Csv { /// Constructs a new `Csv` from a stream of rows. string. Example: `/users?n=100`. //! //! Also includes a low-efficiency route to demonstrate the difference. use std::io::{self, Write as _}; use actix_web::{ get, web::{self, BufMut as _, BytesMut}, App, HttpResponse, HttpServer, Responder, }; use actix_web_lab::respond::NdJson; use futures_core::Stream; use futures_util::{stream, StreamExt as _}; use rand::{distributions::Alphanumeric, Rng as _}; use serde::Deserialize; use serde_json::json; use tracing::info; fn streaming_data_source(n: u32) -> impl Stream> { stream::repeat_with(|| { Ok(json!({ "email": random_email(), "address": random_address(), })) }) .take(n as usize) } #[derive(Debug, Deserialize)] struct Opts { n: Option, } /// This handler streams data as NDJSON to the client in a fast and memory efficient way. /// /// A real data source might be a downstream server, database query, or other external resource. #[get("/users")] as body, hex!( "cf83e135 7eefb8bd f1542850 d66d8007 d620e405 0b5715dc 83f4a921 d36ce9ce 47d0d13c 5d85f2b0 ff8318d2 877eec2f 63b931bd 47417a81 a538327a f927da3e" ) .as_ref() ); let (req, _) = test::TestRequest::default() .to_request() .replace_payload(dev::Payload::Stream { payload: Box::pin( stream::iter([b"a", b"b", b"c"].map(|b| Bytes::from_static(b))).map(Ok), ) as BoxedPayloadStream, }); let body = test::call_and_read_body(&app, req).await; assert_eq!( body, hex!("ba7816bf 8f01cfea 414140de 5dae2223 b00361a3 96177a9c b410ff61 f20015ad").as_ref() ); } #[actix_web::test] async fn type_alias_equivalence() { let app = test::init_service( App::new() .route( "/alias", web::get().to(|body: BodySha256| async move { Bytes::copy_from_slice(body.hash()unt = self.static_resources_mount.into_owned(); let files = { let index_file = index_file.clone(); Files::new(&static_resources_mount, static_resources_location) // HACK: FilesService will try to read a directory listing unless index_file is provided // FilesService will fail to load the index_file and will then call our default_handler .index_file("extremely-unlikely-to-exist-!@$%^&*.txt") .default_handler(move |req| serve_index(req, index_file.clone())) }; SpaService { index_file, files } } } #[derive(Debug)] struct SpaService { index_file: String, files: Files, } impl HttpServiceFactory for SpaService { fn register(self, config: &mut actix_web::dev::AppService) { // let Files register its mount path as-is self.files.register(config); // also define a root prefix handler directed towards our SPA index let rdef = ResourceDef::root_prefix(""); t!("{s}, {s:?}"), "test, Query(Id { id: \"test\" })"); s.id = "test1".to_string(); let s = s.into_inner(); assert_eq!(s.id, "test1"); } #[actix_web::test] #[should_panic] async fn test_tuple_panic() { let req = TestRequest::with_uri("/?one=1&two=2").to_srv_request(); let (req, mut pl) = req.into_parts(); Query::<(u32, u32)>::from_request(&req, &mut pl) .await .unwrap(); } } //! Hashing utilities for Actix Web. //! //! # Crate Features //! All features are enabled by default. //! - `blake2`: Blake2 types //! - `blake3`: Blake3 types //! - `md5`: MD5 types 🚩 //! - `md4`: MD4 types 🚩 //! - `sha1`: SHA-1 types 🚩 //! - `sha2`: SHA-2 types //! - `sha3`: SHA-3 types //! //! # Security Warning 🚩 //! The `md4`, `md5`, and `sha1` types are included for completeness and interoperability but they //! are considered cryptographically broken by modern standards. For security critical use cases, //! you should move to Some(code) if path_altered => { let mut res = HttpResponse::with_body(code, ()); res.headers_mut().insert( header::LOCATION, req.head_mut().uri.to_string().parse().unwrap(), ); NormalizePathFuture::redirect(req.into_response(res)) } _ => NormalizePathFuture::service(self.service.call(req)), } } } pin_project! { pub struct NormalizePathFuture, B> { #[pin] inner: Inner, } } impl, B> NormalizePathFuture { fn service(fut: S::Future) -> Self { Self { inner: Inner::Service { fut, _body: PhantomData, }, } } fn redirect(res: ServiceResponse<()>) -> Self { Self { inner: Inner::Redirect { res: Some(res) }, } } } pin_project! { #[project = InnerProj] enum Inner App< impl ServiceFactory< ServiceRequest, Response = ServiceResponse, Config = (), InitError = (), Error = Error, >, > { App::new().wrap(from_fn(redirect_to_www)).route( "/", web::get().to(|| async { HttpResponse::Ok().body("content") }), ) } #[actix_web::test] async fn redirect_non_www() { es; use tokio::{ io::AsyncWrite, sync::mpsc::{UnboundedReceiver, UnboundedSender}, }; /// Returns an `AsyncWrite` response body writer and its associated body type. /// /// # Examples /// ``` /// # use actix_web::{HttpResponse, web}; /// use tokio::io::AsyncWriteExt as _; /// use actix_web_lab::body; /// /// # async fn index() { /// let (mut wrt, body) = body::writer(); /// /// let _ = tokio::spawn(async move { /// wrt.write_all(b"body from another thread").await /// }); /// /// HttpResponse::Ok().body(body) /// # ;} /// ``` pub fn writer() -> (Writer, impl MessageBody) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); (Writer { tx }, BodyStream { rx }) } /// An `AsyncWrite` response body writer. #[derive(Debug, Clone)] pub struct Writer { tx: UnboundedSender, } impl AsyncWrite for Writer { fn poll_write( self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { self.tx .send(Bytes(), "data: foo\n\n"); let st = stream::repeat(Ok::<_, Infallible>(Event::Data(Data::new("foo")))).take(2); let sse = Sse::from_stream(st); assert_eq!( body::to_bytes(sse).await.unwrap(), "data: foo\n\ndata: foo\n\n", ); } #[actix_web::test] async fn appropriate_headers_are_set_on_responder() { let st = stream::empty::>(); let sse = Sse::from_stream(st); let res = sse.respond_to(&TestRequest::default().to_http_request()); assert_response_matches!(res, OK; "content-type" => "text/event-stream" "content-encoding" => "identity" "cache-control" => "no-cache" ); } #[actix_web::test] async fn messages_are_received_from_sender() { let (sender, mut sse) = channel(9); assert!(poll_fn(|cx| Pin::new(&mut sse).poll_next(cx)) .now_or_never() .is_none()); sender.send(Data::new("bar").event("foo"amples /// ``` /// #[actix_web::main] async fn test() { /// use actix_web_lab::sse; /// /// let (sender, sse_stream) = sse::channel(5); /// sender.try_send(sse::Data::new("my data").event("my event name")).unwrap(); /// sender.try_send(sse::Event::Comment("my comment".into())).unwrap(); /// # } test(); /// ``` pub fn try_send(&self, msg: impl Into) -> Result<(), TrySendError> { self.tx.try_send(msg.into()).map_err(|err| match err { mpsc::error::TrySendError::Full(ev) => TrySendError::Full(ev), mpsc::error::TrySendError::Closed(ev) => TrySendError::Closed(ev), }) } } pin_project! { /// Server-sent events (`text/event-stream`) responder. /// /// Constructed with an [SSE channel](channel) or [using your own stream](Self::from_stream). #[must_use] #[derive(Debug)] pub struct Sse { #[pin] stream: S, keep_alive: Option, retry_interval: Option, p_data(Data::new(AbcSigningKey([0; 32]))) .route( "/", web::post().to(|body: RequestSignature| async move { let (body, sig) = body.into_parts(); let sig = sig.into_bytes().to_vec(); format!("{body:?}\n\n{sig:x?}") }), ) }) .workers(1) .bind(("127.0.0.1", 8080))? .run() .await } //! Expiremental testing utilities. #[doc(inline)] #[cfg(test)] pub(crate) use crate::test_header_macros::{header_round_trip_test, header_test_module}; #[doc(inline)] pub use crate::test_request_macros::test_request; #[doc(inline)] pub use crate::test_response_macros::assert_response_matches; pub use crate::test_services::echo_path_service; //! Semantic server-sent events (SSE) responder with a channel-like interface. //! //! # Examples //! ```no_run //! use std::{convert::Infallible, time::Duration}; //! use actix_web::{Responder, get}; //! use actix_web_lab::sse; //! /ranges: unsafe { IpCidrCombiner::from_cidr_vec_unchecked(ipv4_cidr_vec, ipv6_cidr_vec) }, } } } /// Fetched trusted Cloudflare IP addresses from their API. #[cfg(feature = "fetch-ips")] pub async fn fetch_trusted_cf_ips() -> Result { let client = awc::Client::new(); tracing::debug!("fetching cloudflare ips"); let mut res = client.get(CF_URL_IPS).send().await.map_err(|err| { tracing::error!("{err}"); Err::Fetch })?; tracing::debug!("parsing response"); let res = res.json::().await.map_err(|err| { tracing::error!("{err}"); Err::Fetch })?; TrustedIps::try_from_response(res) } #[cfg(test)] mod tests { use super::*; #[test] fn cf_ips_from_response() { let res = CfIpsResponse::Failure { success: false }; assert!(Trustednse::Success { result } => result, CfIpsResponse::Failure { .. } => { tracing::error!("parsing response returned success: false"); return Err(Err::Fetch); } }; let mut cidr_ranges = IpCidrCombiner::new(); for cidr in ips.ipv4_cidrs { cidr_ranges.push(IpCidr::V4(cidr)); } for cidr in ips.ipv6_cidrs { cidr_ranges.push(IpCidr::V6(cidr)); } Ok(Self { cidr_ranges }) } /// Add trusted IP range to list. pub fn with_ip_range(mut self, cidr: IpCidr) -> Self { self.cidr_ranges.push(cidr); self } /// Returns true if `ip` is controlled by Cloudflare. pub fn contains(&self, ip: IpAddr) -> bool { self.cidr_ranges.contains(ip) } } impl Clone for TrustedIps { fn clone(&self) -> Self { let ipv4_cidr_vec = self.cidr_ranges.get_ipv4_cidrs().to_vec(); let ipv6_cidr_vec = self.cidr_ranges.get_ipv6_cidrs().to_vec(); //! Utilities for working with Actix Web types. // stuff in here comes in and out of usage #![allow(dead_code)] use std::{ convert::Infallible, io, pin::Pin, task::{ready, Context, Poll}, }; use actix_http::{error::PayloadError, BoxedPayloadStream}; use actix_web::{dev, web::BufMut}; use futures_core::Stream; use futures_util::StreamExt as _; use local_channel::mpsc; /// Returns an effectively cloned payload that supports streaming efficiently. /// /// The cloned payload: /// - yields identical chunks; /// - does not poll ahead of the original; /// - does not poll significantly slower than the original; /// - receives an error signal if the original errors, but details are opaque to the copy. /// /// If the payload is forked in one of the extractors used in a handler, then the original _must_ be /// read in another extractor or else the request will hang. pub fn fork_request_payload(orig_payload: &mut dev::Payload) -> dev::Payload { const TARGET: &str = concat!(module_path!(), "::fork_q).unwrap(), expect); } #[track_caller] pub(crate) fn assert_parse_fail< H: Header + fmt::Debug, I: IntoIterator, V: AsRef<[u8]>, >( headers: I, ) { let req = req_from_raw_headers::(headers); H::parse(&req).unwrap_err(); } } #[cfg(test)] pub(crate) use header_test_helpers::{assert_parse_eq, assert_parse_fail}; use actix_client_ip_cloudflare::{fetch_trusted_cf_ips, CfConnectingIp, TrustedClientIp}; use actix_web::{get, web::Header, App, HttpServer, Responder}; #[get("/raw-header")] async fn header(Header(client_ip): Header) -> impl Responder { match client_ip { CfConnectingIp::Trusted(_ip) => unreachable!(), CfConnectingIp::Untrusted(ip) => format!("Possibly fake client IP: {ip}"), } } #[get("/client-ip")] async fn trusted_client_ip(client_ip: TrustedClientIp) -> impl Responder { format!("Trusted client IP: {client_ip}") } #[actix_web::main] async fn main() -> std::io::Res by: Some("203.0.113.43".to_owned()), r#for: vec!["192.0.2.60".to_owned()], host: Some("rust-lang.org".to_owned()), proto: Some("https".to_owned()), }; assert_eq!( fwd.try_into_value().unwrap(), r#"by="203.0.113.43"; for="192.0.2.60"; host="rust-lang.org"; proto="https""# ); } #[test] fn case_sensitivity() { assert_parse_eq::( ["For=192.0.2.60"], Forwarded { r#for: vec!["192.0.2.60".to_owned()], ..Forwarded::default() }, ); } #[test] fn weird_whitespace() { assert_parse_eq::( ["for= 1.2.3.4; proto= https"], Forwarded { r#for: vec!["1.2.3.4".to_owned()], proto: Some("https".to_owned()), ..Forwarded::default() }, ); assert_parse_eq::( [" for = 1.2. let mut proto = None; let mut r#for = vec![]; // "for=1.2.3.4, for=5.6.7.8; scheme=https" for (name, val) in val .split(';') // ["for=1.2.3.4, for=5.6.7.8", " proto=https"] .flat_map(|vals| vals.split(',')) // ["for=1.2.3.4", " for=5.6.7.8", " proto=https"] .flat_map(|pair| { let mut items = pair.trim().splitn(2, '='); Some((items.next()?, items.next()?)) }) { // [(name , val ), ... ] // [("for", "1.2.3.4"), ("for", "5.6.7.8"), ("scheme", "https")] match name.trim().to_lowercase().as_str() { "by" => { // multiple values on other properties have no defined semantics by.get_or_insert_with(|| unquote(val)); } "for" => { // parameter order is defined to be client first and last proxy last tractor that /// _takes_ the payload. In this case, the resulting hash will be as if an empty input was given to /// the hasher. /// /// # Example /// ``` /// use actix_web::{Responder, web}; /// use actix_hash::BodyHash; /// use sha2::Sha256; /// /// # type T = u64; /// async fn hash_payload(form: BodyHash, Sha256>) -> impl Responder { /// if !form.verify_slice(b"correct-signature") { /// // return unauthorized error /// } /// /// "Ok" /// } /// ``` #[derive(Debug, Clone)] pub struct BodyHash { inner: T, hash: GenericArray, } impl BodyHash { /// Returns hash slice. pub fn hash(&self) -> &[u8] { self.hash.as_slice() } /// Returns hash output size. pub fn hash_size(&self) -> usize { self.hash.len() } /// Verifies HMAC hash against provided `tag` using constant-time equality. pub fn verify_slice(&self, tag: &[u8]) -> bool { use subtle::ConstantTimeEq as _; use async_trait::async_trait; use bytes::{BufMut as _, BytesMut}; use ed25519_dalek::{PublicKey, Signature, Verifier as _}; use hex_literal::hex; use once_cell::sync::Lazy; use rustls::{Certificate, PrivateKey, ServerConfig}; use rustls_pemfile::{certs, pkcs8_private_keys}; use tracing::info; const APP_PUBLIC_KEY_BYTES: &[u8] = &hex!("d7d9a14753b591be99a0c5721be8083b1e486c3fcdc6ac08bfb63a6e5c204569"); static SIG_HDR_NAME: HeaderName = HeaderName::from_static("x-signature-ed25519"); static TS_HDR_NAME: HeaderName = HeaderName::from_static("x-signature-timestamp"); static APP_PUBLIC_KEY: Lazy = Lazy::new(|| PublicKey::from_bytes(APP_PUBLIC_KEY_BYTES).unwrap()); #[derive(Debug)] struct DiscordWebhook { /// Signature taken from webhook request header. candidate_signature: Signature, /// Cloned payload state. chunks: Vec, } impl DiscordWebhook { fn get_timestamp(req: &HttpRequest) -> Result<&[u8], Error> { req.headers() .get(&TS_HDR_NAME) s_core::ready; use pin_project_lite::pin_project; /// Creates a middleware from an async function that is used as a mapping function for a /// [`ServiceResponse`]. /// /// # Examples /// Adds header: /// ``` /// # use actix_web_lab::middleware::map_response; /// use actix_web::{body::MessageBody, dev::ServiceResponse, http::header}; /// /// async fn add_header( /// mut res: ServiceResponse, /// ) -> actix_web::Result> { /// res.headers_mut() /// .insert(header::WARNING, header::HeaderValue::from_static("42")); /// /// Ok(res) /// } /// # actix_web::App::new().wrap(map_response(add_header)); /// ``` /// /// Maps body: /// ``` /// # use actix_web_lab::middleware::map_response; /// use actix_web::{body::MessageBody, dev::ServiceResponse}; /// /// async fn mutate_body_type( /// res: ServiceResponse, /// ) -> actix_web::Result> { /// Ok(res.map_into_left_body::<()>()) /// }b); // catch panics in service call AssertUnwindSafe(self.service.call(req)) .catch_unwind() .map(move |maybe_res| match maybe_res { Ok(res) => res, Err(panic_err) => { // invoke callback with panic arg (cb)(&panic_err); // continue unwinding panic::resume_unwind(panic_err) } }) .boxed_local() } } #[cfg(test)] mod tests { use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; use actix_web::{ dev::Service as _, test, web::{self, ServiceConfig}, App, }; use super::*; fn configure_test_app(cfg: &mut ServiceConfig) { cfg.route("/", web::get().to(|| async { "content" })).route( "/disco", #[allow(unreachable_code)] web::get().to(|| async { panic!("the disco"); "" // /// /// Deserialize payload with a higher 32MiB limit. /// #[post("/big-payload")] /// async fn big_payload(info: UrlEncodedForm) -> String { /// format!("Welcome {}!", info.username) /// } /// ``` #[doc(alias = "html_form", alias = "html form", alias = "form")] #[derive(Debug, Deref, DerefMut, Display)] pub struct UrlEncodedForm(pub T); impl UrlEncodedForm { /// Unwraps into inner `T` value. pub fn into_inner(self) -> T { self.0 } } /// See [here](#extractor) for example of usage as an extractor. impl FromRequest for UrlEncodedForm { type Error = Error; type Future = UrlEncodedFormExtractFut; #[inline] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { UrlEncodedFormExtractFut { req: Some(req.clone()), fut: UrlEncodedFormBody::new(req, payloice { service: boxed::rc_service(service), mw_fn: Rc::clone(&self.mw_fn), _phantom: PhantomData, })) } } /// Middleware service for [`from_fn`]. pub struct MiddlewareFnService { service: RcService, Error>, mw_fn: Rc, _phantom: PhantomData<(B, Es)>, } impl Service for MiddlewareFnService where F: Fn(ServiceRequest, Next) -> Fut, Fut: Future, Error>>, B2: MessageBody, { type Response = ServiceResponse; type Error = Error; type Future = Fut; forward_ready!(service); fn call(&self, req: ServiceRequest) -> Self::Future { (self.mw_fn)( req, Next:: { service: Rc::clone(&self.service), }, ) } } macro_rules! impl_middleware_fn_service { ($($ext_type:ident),*) => { impl Trans assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); } #[actix_web::test] async fn test_override_data() { let srv = init_service( App::new().app_data(LocalData::new(1usize)).service( web::resource("/") .app_data(LocalData::new(10usize)) .route(web::get().to(|data: LocalData| { assert_eq!(*data, 10); HttpResponse::Ok() })), ), ) .await; let req = TestRequest::default().to_request(); let resp = srv.call(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); } #[actix_web::test] async fn test_data_from_rc() { let data_new = LocalData::new(String::from("test-123")); let data_from_rc = LocalData::from(Rc::new(String::from("test-123"))); assert_eq!(data_new.0, data_from_rc.0); } #[actix_web::test] async fn test_data_from_dyn_rc() nsume_chunk(&mut self, _req: &HttpRequest, chunk: Bytes) -> Result<(), Self::Error> { Digest::update(&mut self.hasher, &chunk); Ok(()) } async fn finalize(self, _req: &HttpRequest) -> Result { println!("using key: {:X?}", &self.key); let mut hmac = >::new_from_slice(&self.key).unwrap(); let payload_hash = self.hasher.finalize(); println!("payload hash: {payload_hash:X?}"); Mac::update(&mut hmac, &payload_hash); Ok(hmac.finalize()) } fn verify( signature: Self::Signature, req: &HttpRequest, ) -> Result { let user_sig = get_user_signature(req)?; let user_sig = CtOutput::new(GenericArray::from_slice(&user_sig).to_owned()); if signature == user_sig { Ok(signature) } else { Err(error::ErrorUnauthorized( "given signature does not match calculated signature", form: web::Json>, } fn main() {} use std::io; use actix_web::{ error, http::header::HeaderValue, middleware::Logger, web::{self, Bytes}, App, Error, HttpRequest, HttpServer, }; use actix_web_lab::extract::{RequestSignature, RequestSignatureScheme}; use async_trait::async_trait; use digest::{CtOutput, Digest, Mac}; use generic_array::GenericArray; use hmac::SimpleHmac; use sha2::{Sha256, Sha512}; use tracing::info; #[allow(non_upper_case_globals)] const db: () = (); async fn lookup_public_key_in_db(_db: &(), val: T) -> T { val } /// Extracts user's public key from request and pretends to look up secret key in the DB. async fn get_base64_api_key(req: &HttpRequest) -> actix_web::Result> { // public key, not encryption key let pub_key = req .headers() .get("Api-Key") .map(HeaderValue::as_bytes) .map(base64::decode) .transpose() .map_err(|_| error::ErrorInternalServerError("invalid api key"))?app, req).await; assert_eq!(res.status(), StatusCode::OK); let body = test::read_body(res).await; assert_eq!(body, "content"); } #[actix_web::test] async fn catch_panic_return_internal_server_error_response() { let app = test::init_service(test_app()).await; let req = test::TestRequest::with_uri("/disco").to_request(); let err = match app.call(req).await { Ok(_) => panic!("unexpected Ok response"), Err(err) => err, }; let res = err.error_response(); assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR); let body = to_bytes(res.into_body()).await.unwrap(); assert!(body.is_empty()); } } use std::{ pin::Pin, task::{Context, Poll}, }; use actix_web::body::{BodySize, MessageBody}; use bytes::Bytes; use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender}; use crate::BoxError; /// Returns a sender half and a receiver half that can be used as a body ton: Cow<'static, str>, } impl Spa { /// Location of the SPA index file. /// /// This file will be served if: /// - the Actix Web router has reached this service, indicating that none of the API routes /// matched the URL path; /// - and none of the static resources handled matched. /// /// The default is "./index.html". I.e., the `index.html` file located in the directory that /// the server is running from. pub fn index_file(mut self, index_file: impl Into>) -> Self { self.index_file = index_file.into(); self } /// The URL path prefix that static files should be served from. /// /// The default is "/". I.e., static files are served from the root URL path. pub fn static_resources_mount( mut self, static_resources_mount: impl Into>, ) -> Self { self.static_resources_mount = static_resources_mount.into(); self } /// The location in the filesystem to ser #[doc = concat!("# type Hasher = ", stringify!($digest), ";")] #[doc = concat!("# const OutSize: usize = ", $out_size, ";")] /// # assert_eq!( /// # digest::generic_array::GenericArray::::OutputSize /// # >::default().len(), /// # OutSize /// # ); /// ``` #[cfg(feature = $feature)] pub type $name = BodyHash; }; } // Obsolete body_hash_alias!(BodyMd4, md4::Md4, "md4", "MD4", 16); body_hash_alias!(BodyMd5, md5::Md5, "md5", "MD5", 16); body_hash_alias!(BodySha1, sha1::Sha1, "sha1", "SHA-1", 20); // SHA-2 body_hash_alias!(BodySha224, sha2::Sha224, "sha2", "SHA-224", 28); body_hash_alias!(BodySha256, sha2::Sha256, "sha2", "SHA-256", 32); body_hash_alias!(BodySha384, sha2::Sha384, "sha2", "SHA-384", 48); body_hash_alias!(BodySha512, sha2::Sha512, "sha2", "SHA-512", 64); // SHA-3 body_hash_alias!(BodySha3_224, sha3::Sha3_224, "sha3", "SHA-3-224", e_slash.replace_all(&path, "/"); // Ensure root paths are still resolvable. If resulting path is blank after previous // step it means the path was one or more slashes. Reduce to single slash. let path = if path.is_empty() { "/" } else { path.as_ref() }; // Check whether the path has been changed // // This check was previously implemented as string length comparison // // That approach fails when a trailing slash is added, // and a duplicate slash is removed, // since the length of the strings remains the same // // For example, the path "/v1//s" will be normalized to "/v1/s/" // Both of the paths have the same length, // so the change can not be deduced from the length comparison if path != original_path { let mut parts = head.uri.clone().into_parts(); let query = parts.path_and_query.as_ref().and3, 21, 247, 0]))); dbg!(ips.contains(IpAddr::from([103, 21, 248, 0]))); } use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, web::Redirect, Error, Responder, }; use crate::middleware_from_fn::Next; /// A function middleware to redirect traffic to `www.` if not already there. /// /// # Examples /// ``` /// # use actix_web::App; /// use actix_web_lab::middleware::{from_fn, redirect_to_www}; /// /// App::new() /// .wrap(from_fn(redirect_to_www)) /// # ; /// ``` pub async fn redirect_to_www( req: ServiceRequest, next: Next, ) -> Result, Error> { #![allow(clippy::await_holding_refcell_ref)] // RefCell is dropped before await let (req, pl) = req.into_parts(); let conn_info = req.connection_info(); if !conn_info.host().starts_with("www.") { let scheme = conn_info.scheme(); let host = conn_info.host(); let path = req.uri().path(); let uri = form$hdr_name:expr => $hdr_val:expr)+; @raw $payload:expr) => {{ assert_response_matches!($res, $status; $($hdr_name => $hdr_val)+); assert_eq!(::actix_web::test::read_body($res).await, $payload); }}; ($res:ident, $status:ident; @json $payload:tt) => {{ assert_response_matches!($res, $status); assert_eq!( ::actix_web::test::read_body_json::<$crate::__reexports::serde_json::Value, _>($res).await, $crate::__reexports::serde_json::json!($payload), ); }}; } pub use assert_response_matches; #[cfg(test)] mod tests { use actix_web::{ dev::ServiceResponse, http::header::ContentType, test::TestRequest, HttpResponse, }; use super::*; #[actix_web::test] async fn response_matching() { let res = ServiceResponse::new( TestRequest::default().to_http_request(), HttpResponse::Created() .insert_header(("date", "today")) .insert_header(("set-cookie", "a=b")) (Some(Ok(Event::retry_to_bytes(retry)))); } if let Poll::Ready(msg) = this.stream.poll_next(cx) { return match msg { Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))), Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), None => Poll::Ready(None), }; } if let Some(ref mut keep_alive) = this.keep_alive { if keep_alive.poll_tick(cx).is_ready() { return Poll::Ready(Some(Ok(Event::keep_alive_bytes()))); } } Poll::Pending } } /// Create server-sent events (SSE) channel pair. /// /// The `buffer` argument controls how many unsent messages can be stored without waiting. /// /// The first item in the tuple is the sender half. Much like a regular channel, it can be cloned, /// sent to another thread/task, and send event messages to the response stream. It provides several /// methods that represent the event-stream format. /// /// The second ite.into()); self } /// Sets `id` field. pub fn set_id(&mut self, id: impl Into) { self.id = Some(id.into()); } /// Sets `event` name field, returning a new data message. pub fn event(mut self, event: impl Into) -> Self { self.event = Some(event.into()); self } /// Sets `event` name field. pub fn set_event(&mut self, event: impl Into) { self.event = Some(event.into()); } } impl From for Event { fn from(data: Data) -> Self { Self::Data(data) } } /// Server-sent events message containing one or more fields. #[must_use] #[derive(Debug, Clone)] pub enum Event { /// A `data` message with optional ID and event name. /// /// Data messages looks like this in the response stream. /// ```plain /// event: foo /// id: 42 /// data: my data /// /// data: { /// data: "multiline": "data" /// data: } /// ``` Data(Data), /// A comm(noop))); let _ = App::new().wrap(Compat::new(map_response_body(mutate_body_type))); } #[actix_web::test] async fn feels_good() { let app = test::init_service( App::new() .default_service(web::to(HttpResponse::Ok)) .wrap(map_response_body(|_req, body| async move { Ok(body) })) .wrap(map_response_body(noop)) .wrap(Logger::default()) .wrap(map_response_body(mutate_body_type)), ) .await; let req = test::TestRequest::default().to_request(); let body = test::call_and_read_body(&app, req).await; assert_eq!(body, "foo"); } } use actix_web::{ http::{Method, StatusCode}, web, App, HttpResponse, Responder, }; use actix_web_lab_derive::FromRequest; #[derive(Debug, FromRequest)] struct RequestParts { method: Method, pool: web::Data, body: String, body2: String, #[from_request(copy_from_app_data)] copied_data: u64, } asstedIps::try_from_response(res).is_err()); } } //! For path segment extractor documentation, see [`Path`]. use actix_router::PathDeserializer; use actix_utils::future::{ready, Ready}; use actix_web::{ dev::Payload, error::{Error, ErrorNotFound}, FromRequest, HttpRequest, }; use derive_more::{AsRef, Display, From}; use serde::de; use tracing::debug; /// Extract typed data from request path segments. /// /// Alternative to `web::Path` extractor from Actix Web that allows deconstruction, but omits the /// implementation of `Deref`. /// /// Unlike, [`HttpRequest::match_info`], this extractor will fully percent-decode dynamic segments, /// including `/`, `%`, and `+`. /// /// # Examples /// ``` /// use actix_web::get; /// use actix_web_lab::extract::Path; /// /// // extract path info from "/{name}/{count}/index.html" into tuple /// // {name} - deserialize a String /// // {count} - deserialize a u32 /// #[get("/{name}/{count}/index.html")] /// async fn index(Path((name, count)): Path<(String, u32 else { buf.extend_from_slice(&chunk); } } None => { let json = serde_json::from_slice::(buf) .map_err(JsonPayloadError::Deserialize)?; return Poll::Ready(Ok(json)); } } }, JsonBody::Error(e) => Poll::Ready(Err(e.take().unwrap())), } } } #[cfg(test)] mod tests { use actix_web::{http::header, test::TestRequest, web::Bytes}; use serde::{Deserialize, Serialize}; use super::*; #[derive(Serialize, Deserialize, PartialEq, Debug)] struct MyObject { name: String, } fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool { match err { JsonPayloadError::Overflow { .. } => { matches!(other, JsonPayloadError::Overflow { .. }) } JsonPayloadError::OverflowKnownLength { .. }format!("error from original stream: {err}"), )))) .unwrap(), } })); tracing::trace!(target: TARGET, "creating proxy payload"); *orig_payload = dev::Payload::from(proxy_stream); dev::Payload::Stream { payload: Box::pin(rx), } } /// An `io::Write`r that only requires mutable reference and assumes that there is space available /// in the buffer for every write operation or that it can be extended implicitly (like /// `bytes::BytesMut`, for example). /// /// This is slightly faster (~10%) than `bytes::buf::Writer` in such cases because it does not /// perform a remaining length check before writing. pub(crate) struct MutWriter<'a, B>(pub(crate) &'a mut B); impl<'a, B> MutWriter<'a, B> { pub fn get_ref(&self) -> &B { self.0 } } impl<'a, B: BufMut> io::Write for MutWriter<'a, B> { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.put_slice(buf); Ok(buf.len()) } fn flush(&mut self) -> iof.into_chunk_stream()) } /// Creates a `Responder` type with a line-by-line serializing stream and `text/plain` /// content-type header. pub fn into_responder(self) -> impl Responder where S: 'static, T: 'static, E: 'static, { HttpResponse::Ok() .content_type(mime::TEXT_PLAIN_UTF_8) .message_body(self.into_body_stream()) .unwrap() } /// Creates a stream of serialized chunks. pub fn into_chunk_stream(self) -> impl Stream> { self.stream.map_ok(write_display) } } fn write_display(item: impl fmt::Display) -> Bytes { let mut buf = BytesMut::new(); let mut wrt = MutWriter(&mut buf); writeln!(wrt, "{item}").unwrap(); buf.freeze() } #[cfg(test)] mod tests { use std::error::Error as StdError; use actix_web::body; use futures_util::stream; use super::*; #[actix_web::test] async fn serializes_into_body() { let ndjson_body = Dio.map(|proto| format!("proto=\"{proto}\""))) .join("; ") .try_into_value() } } impl Header for Forwarded { fn name() -> HeaderName { header::FORWARDED } fn parse(msg: &M) -> Result { let combined = msg .headers() .get_all(Self::name()) .filter_map(|hdr| hdr.to_str().ok()) .filter_map(|hdr_str| match hdr_str.trim() { "" => None, val => Some(val), }) .collect::>(); if combined.is_empty() { return Err(ParseError::Header); } // pass to FromStr impl as if it were one concatenated header with semicolon joiners // https://datatracker.ietf.org/doc/html/rfc7239#section-7.1 combined.join(";").parse().map_err(|_| ParseError::Header) } } /// Trim whitespace then any quote marks. fn unquote(val: &str) -> &str { val.trim().trim_start_matches('"').trim_end_matche/// Should equal the `Host` request header field as received by the proxy. pub fn host(&self) -> Option<&str> { self.host.as_deref() } /// Returns the "proto" identifier, if set. /// /// Indicates which protocol was used to make the request (typically "http" or "https"). pub fn proto(&self) -> Option<&str> { self.proto.as_deref() } /// Adds an identifier to the "for" chain. /// /// Useful when re-forwarding a request and needing to update the request headers with previous /// proxy's address. pub fn push_for(&mut self, identifier: impl Into) { self.r#for.push(identifier.into()) } /// Returns true if all of the fields are empty. fn has_no_info(&self) -> bool { self.by.is_none() && self.r#for.is_empty() && self.host.is_none() && self.proto.is_none() } // TODO: parse with trusted IP ranges fn } impl str::FromStr for Forwarded { type Err = Infallible; #[inline] fn from_str(val: &str) -> Re // Ignore Pending because its possible the inner extractor never // polls the payload stream and ignore errors because they will be // propagated by original payload polls. Poll::Ready(Some(Err(_))) | Poll::Pending => break, } } Poll::Pending } } } BodyHashFutProj::InnerDone { inner, hasher, forked_payload, } => { let mut pl = Pin::new(forked_payload); // drain forked payload loop { match pl.as_mut().poll_next(cx) { // update hasher with chunks Poll::Ready(Some(Ok(chunk))) => hasher.update(&chunk), // when drain is complete, finalize hash and return parts keys: Vec = pkcs8_private_keys(key_file) .unwrap() .into_iter() .map(PrivateKey) .collect(); // exit if no keys could be parsed if keys.is_empty() { eprintln!("Could not locate PKCS 8 private keys."); std::process::exit(1); } config.with_single_cert(cert_chain, keys.remove(0)).unwrap() } use std::{io, time::Duration}; use actix_web::{ get, http::{ self, header::{ContentEncoding, ContentType}, }, App, HttpResponse, HttpServer, Responder, }; use actix_web_lab::body; use async_zip::{write::ZipFileWriter, ZipEntryBuilder}; use tokio::{ fs, io::{AsyncWrite, AsyncWriteExt as _}, }; fn zip_to_io_err(err: async_zip::error::ZipError) -> io::Error { io::Error::new(io::ErrorKind::Other, err) } async fn read_dir(zipper: &mut ZipFileWriter) -> io::Result<()> where W: AsyncWrite + Unpin, { let mut path = fs::canonicalize(env!("CARGO_MANIFEST_DIR")).await?; path.push("examples"); sync::OnceCell; use tracing::debug; /// A lazy extractor for thread-local data. /// /// Using `LazyData` as an extractor will not initialize the data; [`get`](Self::get) must be used. pub struct LazyData { inner: Rc>, } struct LazyDataInner { cell: OnceCell, fut: Cell>>, } impl Clone for LazyData { fn clone(&self) -> Self { Self { inner: self.inner.clone(), } } } impl fmt::Debug for LazyData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Lazy") .field("cell", &self.inner.cell) .field("fut", &"..") .finish() } } impl LazyData { /// Constructs a new `LazyData` extractor with the given initialization function. /// /// Initialization functions must return a future that resolves to `T`. pub fn new(init: F) -> LazyData where F: FnOnce() -> Fut, Fut assert!(s.is_err()); let err_str = s.unwrap_err().to_string(); assert_eq!( err_str, "URL encoded payload is larger (9 bytes) than allowed (limit: 8 bytes).", ); } } //! Panic reporter middleware. //! //! See [`PanicReporter`] for docs. use std::{ any::Any, future::{ready, Ready}, panic::{self, AssertUnwindSafe}, rc::Rc, }; use actix_web::dev::{forward_ready, Service, Transform}; use futures_core::future::LocalBoxFuture; use futures_util::FutureExt as _; type PanicCallback = Rc; /// A middleware that triggers a callback when the worker is panicking. /// /// Mostly useful for logging or metrics publishing. The callback received the object with which /// panic was originally invoked to allow down-casting. /// /// # Examples /// ```ignore /// # use actix_web::App; /// use actix_web_lab::middleware::PanicReporter; /// use metrics::increment_counter; /// /// App::new() /// .wrap(PanicReporter::new(|_| increme::Bytes}; use serde::{Deserialize, Serialize}; use super::*; #[derive(Serialize, Deserialize, PartialEq, Debug)] struct MyObject { name: String, } fn err_eq(err: UrlencodedError, other: UrlencodedError) -> bool { match err { UrlencodedError::Overflow { .. } => { matches!(other, UrlencodedError::Overflow { .. }) } UrlencodedError::ContentType => matches!(other, UrlencodedError::ContentType), _ => false, } } #[actix_web::test] async fn test_extract() { let (req, mut pl) = TestRequest::default() .insert_header(header::ContentType::form_url_encoded()) .insert_header(( header::CONTENT_LENGTH, header::HeaderValue::from_static("9"), )) .set_payload(Bytes::from_static(b"name=test")) .to_http_parts(); let s = UrlEncodedForm::::fro let mw_fn = Rc::clone(&self.mw_fn); let service = Rc::clone(&self.service); Box::pin(async move { let ($($ext_type,)*) = req.extract::<($($ext_type,)*)>().await?; (mw_fn)($($ext_type),*, req, Next:: { service }).await }) } } }; } impl_middleware_fn_service!(E1); impl_middleware_fn_service!(E1, E2); impl_middleware_fn_service!(E1, E2, E3); impl_middleware_fn_service!(E1, E2, E3, E4); impl_middleware_fn_service!(E1, E2, E3, E4, E5); impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6); impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7); impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7, E8); impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7, E8, E9); /// Wraps the "next" service in the middleware chain. pub struct Next { service: RcService, Error>, } impl Next { /// Equivalent to `Service::call(self, req)`. pub fn cal/ This works when Sized is required let dyn_rc_box: Rc> = Rc::new(Box::new(A {})); let data_arc_box = LocalData::from(dyn_rc_box); // This works when Data Sized Bound is removed let dyn_rc: Rc = Rc::new(A {}); let data_arc = LocalData::from(dyn_rc); assert_eq!(data_arc_box.get_num(), data_arc.get_num()) } #[actix_web::test] async fn test_get_ref_from_dyn_data() { let dyn_rc: Rc = Rc::new(A {}); let data_arc = LocalData::from(dyn_rc); let ref_data: &dyn TestTrait = &*data_arc; assert_eq!(data_arc.get_num(), ref_data.get_num()) } } use std::io; use actix_web::{get, App, HttpServer, Responder}; use actix_web_lab::respond::Cbor; use serde::Serialize; use tracing::info; #[derive(Debug, Serialize)] struct Test { one: u32, two: String, } #[get("/")] async fn index() -> impl Responder { Cbor(Test { one: 42, two: "two".to_owned(), }) } #[ [456, 789], ])) .into_body_stream(); let body_bytes = body::to_bytes(ndjson_body) .await .map_err(Into::>::into) .unwrap(); const EXP_BYTES: &str = "123,456\n\ 789,12\n\ 345,678\n\ 901,234\n\ 456,789\n"; assert_eq!(body_bytes, EXP_BYTES); } } use std::{any::type_name, ops::Deref, rc::Rc}; use actix_utils::future::{err, ok, Ready}; use actix_web::{dev::Payload, error, Error, FromRequest, HttpRequest}; use tracing::debug; /// A thread-local equivalent to [`SharedData`](crate::extract::SharedData). #[doc(alias = "state")] #[derive(Debug)] pub struct LocalData(Rc); impl LocalData { /// Constructs a new `LocalData` instance. pub fn new(item: T) -> LocalData { LocalData(Rc::new(item)) } } impl Deref for LocalData { type Target = T; fn deref(&self) -> &T { &self.0 } } impl Clone for Lo-results", n_items)) // alternative if you need more control of the HttpResponse: // // HttpResponse::Ok() // .insert_header(("content-type", NdJson::mime())) // .insert_header(("num-results", n_items)) // .body(NdJson::new(data_stream).into_body_stream()) } /// A comparison route that loads all the data into memory before sending it to the client. /// /// If you provide a high number in the query string like `?n=300000` you should be able to observe /// increasing memory usage of the process in your process monitor. #[get("/users-high-mem")] async fn get_high_mem_user_list(opts: web::Query) -> impl Responder { let n_items = opts.n.unwrap_or(10); let mut stream = streaming_data_source(n_items); // buffer all data from the source into a Bytes container let mut buf = BytesMut::new().writer(); while let Some(Ok(item)) = stream.next().await { serde_json::to_writer(&mut buf, &item).unwrap(); buf.write_all(b"\n").unwrap(); } Hrn self.tx.send(Err(err)).map_err(|SendError(err)| match err { Ok(_) => unreachable!(), Err(err) => err, }); } Ok(()) } } #[derive(Debug)] struct Receiver { rx: UnboundedReceiver>, } impl Receiver { fn new(rx: UnboundedReceiver>) -> Self { Self { rx } } } impl MessageBody for Receiver where E: Into, { type Error = E; fn size(&self) -> BodySize { BodySize::Stream } fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { self.rx.poll_recv(cx) } } #[cfg(test)] mod tests { use std::io; use super::*; static_assertions::assert_impl_all!(Sender: Send, Sync, Unpin); static_assertions::assert_impl_all!(Receiver: Send, Sync, Unpin, MessageBody); } use actix_hash::{BodyHash, BodySha256}; use actix_http::BoxedPayloadStreamregister_service( rdef, None, fn_service(move |req| serve_index(req, self.index_file.clone())), None, ); } } async fn serve_index( req: ServiceRequest, index_file: String, ) -> Result { trace!("serving default SPA page"); let (req, _) = req.into_parts(); let file = NamedFile::open_async(&index_file).await?; let res = file.into_response(&req); Ok(ServiceResponse::new(req, res)) } impl Default for Spa { fn default() -> Self { Self { index_file: Cow::Borrowed("./index.html"), static_resources_mount: Cow::Borrowed("/"), static_resources_location: Cow::Borrowed("./"), } } } // Code mostly copied from `tower`: // https://github.com/tower-rs/tower/tree/5064987f/tower/src/load_shed //! Load-shedding middleware. use std::{ cell::Cell, error::Error as StdError, fmt, future::Future, pin::Pin, task::{ready, Contexrde(rename = "user")] /// users: Vec, /// } /// /// // Deserialize `LogsParams` struct from query string. /// // This handler gets called only if the request's query parameters contain both fields. /// // A valid request path for this handler would be `/logs?type=reports&user=foo&user=bar"`. /// #[get("/logs")] /// async fn index(info: Query) -> impl Responder { /// let LogsParams { log_type, users } = info.into_inner(); /// format!("Logs request for type={log_type} and user list={users:?}!") /// } /// /// // Or use destructuring, which is equivalent to `.into_inner()`. /// #[get("/debug2")] /// async fn debug2(Query(info): Query) -> impl Responder { /// dbg!("Authorization object = {info:?}"); /// "OK" /// } /// ``` #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Query(pub T); impl_more::impl_deref_and_mut!( in Query => T); impl_more::forward_display!( in Query); impl Query { /// Unwrap into inner `T` value. redirects, })) } } pub struct NormalizePathService { service: S, merge_slash: Regex, trailing_slash_behavior: TrailingSlash, use_redirects: Option, } impl Service for NormalizePathService where S: Service, Error = Error>, S::Future: 'static, { type Response = ServiceResponse>; type Error = Error; type Future = NormalizePathFuture; actix_service::forward_ready!(service); fn call(&self, mut req: ServiceRequest) -> Self::Future { let head = req.head_mut(); let mut path_altered = false; let original_path = head.uri.path(); // An empty path here means that the URI has no valid path. We skip normalization in this // case, because adding a path can make the URI invalid if !original_path.is_empty() { // Either adds a string to the end (duplicates will be removed anyways) or trims all ly. /// /// # Normalization Steps /// - Merges consecutive slashes into one. (For example, `/path//one` always becomes `/path/one`.) /// - Appends a trailing slash if one is not present, removes one if present, or keeps trailing /// slashes as-is, depending on which [`TrailingSlash`] variant is supplied /// to [`new`](NormalizePath::new()). /// /// # Default Behavior /// The default constructor chooses to strip trailing slashes from the end of paths with them /// ([`TrailingSlash::Trim`]). The implication is that route definitions should be defined without /// trailing slashes or else they will be inaccessible (or vice versa when using the /// `TrailingSlash::Always` behavior), as shown in the example tests below. /// /// # Examples /// ``` /// use actix_web::{web, middleware, App}; /// /// # actix_web::rt::System::new().block_on(async { /// let app = App::new() /// .wrap(middleware::NormalizePath::trim()) /// .route("/test", web::get().to(|| async { "test" })) /// .route("/unmatchable/", web: = match self.0 { true => req.into_response("short-circuited").map_into_right_body(), false => next.call(req).await?.map_into_left_body(), }; res.headers_mut() .insert(header::WARNING, HeaderValue::from_static("42")); Ok(res) } pub fn into_middleware( self, ) -> impl Transform< S, ServiceRequest, Response = ServiceResponse, Error = Error, InitError = (), > where S: Service, Error = Error> + 'static, B: MessageBody + 'static, { let this = Rc::new(self); from_fn(move |req, next| { let this = Rc::clone(&this); async move { Self::mw_cb(&this, req, next).await } }) } } #[actix_web::main] async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); let bind = ("127.0.0.1", 8080); inond item is the responder and can, therefore, be used as a handler return type directly. /// The stream will be closed after all [senders](SseSender) are dropped. /// /// Read more about server-sent events in [this MDN article][mdn-sse]. /// /// See [module docs](self) for usage example. /// /// [mdn-sse]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events pub fn channel(buffer: usize) -> (Sender, Sse) { let (tx, rx) = mpsc::channel(buffer); ( Sender { tx }, Sse { stream: ChannelStream(rx), keep_alive: None, retry_interval: None, }, ) } /// Stream implementation for channel-based SSE [`Sender`]. #[derive(Debug)] pub struct ChannelStream(mpsc::Receiver); impl Stream for ChannelStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.0.poll_recv(cx).map(|ev| ev.map(Ok)) A comment message. /// /// Comments look like this in the response stream. /// ```plain /// : my comment /// /// : another comment /// ``` Comment(ByteString), } #[doc(hidden)] #[deprecated(since = "0.17.0", note = "Renamed to `Event`. Prefer `sse::Event`.")] pub type SseMessage = Event; impl Event { /// Splits data into lines and prepend each line with `prefix`. fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) { // initial buffer size guess is len(data) + 10 lines of prefix + EOLs + EOF buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1); // append prefix + space + line to buffer for line in data.split('\n') { buf.put_slice(prefix.as_bytes()); buf.put_slice(line.as_bytes()); buf.put_u8(b'\n'); } } /// Serializes message into event-stream format. fn into_bytes(self) -> Bytes { let mut buf = BytesMut::new(); match self { e_stream) = sse::channel(10); //! //! // note: sender will typically be spawned or handed off somewhere else //! let _ = sender.send(sse::Event::Comment("my comment".into())).await; //! let _ = sender.send(sse::Data::new("my data").event("chat_msg")).await; //! //! sse_stream.with_retry_duration(Duration::from_secs(10)) //! } //! //! #[get("/from-stream")] //! async fn from_stream() -> impl Responder { //! let event_stream = futures_util::stream::iter([ //! Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))), //! ]); //! //! sse::Sse::from_stream(event_stream) //! .with_keep_alive(Duration::from_secs(5)) //! } //! ``` //! //! Complete usage examples can be found in the examples directory of the source code repo. #![doc( alias = "server sent", alias = "server-sent", alias = "server sent events", alias = "server-sent events", alias = "event-stream" )] use std::{ convert::Infallible, pin::Pin, task::{Context, Poll}, time::p(); } #[actix_web::test] async fn test_request_extract() { let mut req = TestRequest::with_uri("/name/user1/?id=test").to_srv_request(); let resource = ResourceDef::new("/{key}/{value}/"); resource.capture_match_info(req.match_info_mut()); let (req, mut pl) = req.into_parts(); let s = Path::::from_request(&req, &mut pl).await.unwrap(); assert_eq!(format!("{s}"), "MyStruct(name, user1)"); assert_eq!( format!("{s:?}"), "Path(MyStruct { key: \"name\", value: \"user1\" })" ); let mut s = s.into_inner(); assert_eq!(s.key, "name"); assert_eq!(s.value, "user1"); s.value = "user2".to_string(); assert_eq!(s.value, "user2"); let Path(s) = Path::<(String, String)>::from_request(&req, &mut pl) .await .unwrap(); assert_eq!(s.0, "name"); assert_eq!(s.1, "user1"); let mut req = TestRequest::with_uri("/name/32/").to_b enum Err { Fetch, } impl_more::impl_display_enum!(Err, Fetch => "failed to fetch"); impl std::error::Error for Err {} #[derive(Debug, Deserialize)] pub struct CfIpsResult { ipv4_cidrs: Vec, ipv6_cidrs: Vec, } #[derive(Debug, Deserialize)] #[serde(untagged)] pub enum CfIpsResponse { Success { result: CfIpsResult }, Failure { success: bool }, } /// Trusted IP ranges. #[derive(Debug)] pub struct TrustedIps { cidr_ranges: IpCidrCombiner, } impl TrustedIps { pub fn try_from_response(res: CfIpsResponse) -> Result { let ips = match res { CfIpsResponse::Success { result } => result, CfIpsResponse::Failure { .. } => { tracing::error!("parsing response returned success: false"); return Err(Err::Fetch); } }; let mut cidr_ranges = IpCidrCombiner::new(); for cidr in ips.ipv4_cidrs { cidr_ranges.push(IpCidr::Vet req = this.req.take().unwrap(); debug!( "Failed to deserialize Json<{}> from payload in handler: {}", core::any::type_name::(), req.match_name().unwrap_or_else(|| req.path()) ); Err(err.into()) } Ok(data) => Ok(Json(data)), }; Poll::Ready(res) } } /// Future that resolves to some `T` when parsed from a JSON payload. /// /// Can deserialize any type `T` that implements [`Deserialize`][serde::Deserialize]. /// /// Returns error if: /// - `Content-Type` is not `application/json`. /// - `Content-Length` is greater than `LIMIT`. /// - The payload, when consumed, is not valid JSON. pub enum JsonBody { Error(Option), Body { /// Length as reported by `Content-Length` header, if present. length: Option, // #[cfg(feature = "__compress")] // payload: Decompress, // next load in handler loads new value let extracted_data = SwapData::::extract(&req).await.unwrap(); assert_eq!(**extracted_data.load(), NonCopy(80)); // initial extracted data stays the same assert_eq!(*initial_data, NonCopy(42)); } } //! Experimental body types. //! //! Analogous to the `body` module in Actix Web. pub use crate::{ body_async_write::{writer, Writer}, body_channel::{channel, Sender}, infallible_body_stream::{new_infallible_body_stream, new_infallible_sized_stream}, }; //! Experimental typed headers. pub use crate::{ cache_control::{CacheControl, CacheDirective}, content_length::ContentLength, forwarded::Forwarded, strict_transport_security::StrictTransportSecurity, }; #[cfg(test)] mod header_test_helpers { use std::fmt; use actix_http::header::Header; use actix_web::{test, HttpRequest}; fn req_from_raw_headers, V: AsRef<[u8]>>( header_lines: I, )e { fn eq(&self, other: &ContentLength) -> bool { *self == other.0 } } impl PartialOrd for ContentLength { fn partial_cmp(&self, other: &usize) -> Option { self.0.partial_cmp(other) } } impl PartialOrd for usize { fn partial_cmp(&self, other: &ContentLength) -> Option { self.partial_cmp(&other.0) } } #[cfg(test)] mod tests { use super::*; use crate::header::{assert_parse_eq, assert_parse_fail}; #[test] fn missing_header() { assert_parse_fail::([""; 0]); assert_parse_fail::([""]); } #[test] fn bad_header() { assert_parse_fail::(["-123"]); assert_parse_fail::(["123_456"]); assert_parse_fail::(["123.456"]); // too large for u64 (2^64, 2^64 + 1) assert_parse_fail::(["18446744073709551616"]); //! Experimental route guards. //! //! Analogous to the `guard` module in Actix Web. #[allow(deprecated)] pub use crate::acceptable::Acceptable; //! Extractor for client IP addresses when proxied through Cloudflare. // #![forbid(unsafe_code)] // urgh why cidr-utils #![deny(rust_2018_idioms, nonstandard_style)] #![warn(future_incompatible)] // #![warn(missing_docs)] #![cfg_attr(docsrs, feature(doc_auto_cfg))] mod extract; mod fetch_cf_ips; mod header_v4; // mod header_v6; pub use self::extract::TrustedClientIp; #[cfg(feature = "fetch-ips")] pub use self::fetch_cf_ips::fetch_trusted_cf_ips; pub use self::fetch_cf_ips::{TrustedIps, CF_URL_IPS}; pub use self::header_v4::CfConnectingIp; //! Forwarded typed header. //! //! See [`Forwarded`] docs. use std::{convert::Infallible, str}; use actix_web::{ error::ParseError, http::header::{self, Header, HeaderName, HeaderValue, TryIntoHeaderValue}, HttpMessage, }; use itertools::Itertools as _; // TODO: implement typed parsing of Node identifiers as pergest::{generic_array::GenericArray, Digest}; use futures_core::Stream as _; use pin_project_lite::pin_project; use tracing::trace; /// Parts of the resulting body hash extractor. pub struct BodyHashParts { /// Extracted item. pub inner: T, /// Bytes of the calculated hash. pub hash_bytes: Vec, } /// Wraps an extractor and calculates a body checksum hash alongside. /// /// If your extractor would usually be `T` and you want to create a hash of type `D` then you need /// to use `BodyHash`. E.g., `BodyHash`. /// /// Any hasher that implements [`Digest`] can be used. Type aliases for common hashing algorithms /// are available at the crate root. /// /// # Errors /// This extractor produces no errors of its own and all errors from the underlying extractor are /// propagated correctly; for example, if the payload limits are exceeded. /// /// # When Used On The Wrong Extractor /// Use on a non-body extractor is tolerated unless it is used after a different extractor thignature"))? .ok_or_else(|| error::ErrorUnauthorized("signature not provided"))? .try_into() .map_err(|_| error::ErrorInternalServerError("invalid signature"))?; Ok(Signature::from(sig)) } } #[async_trait(?Send)] impl RequestSignatureScheme for DiscordWebhook { type Signature = (BytesMut, Signature); type Error = Error; async fn init(req: &HttpRequest) -> Result { let ts = Self::get_timestamp(req)?.to_owned(); let candidate_signature = Self::get_signature(req)?; Ok(Self { candidate_signature, chunks: vec![Bytes::from(ts)], }) } async fn consume_chunk(&mut self, _req: &HttpRequest, chunk: Bytes) -> Result<(), Self::Error> { self.chunks.push(chunk); Ok(()) } async fn finalize(self, _req: &HttpRequest) -> Result { let buf_len = self.chunks.iter().map(|chunk| chunk.len()).sum(); let mut buf = ByteRc, #[pin] state: MapResFutState, } } pin_project! { #[project = MapResFutStateProj] enum MapResFutState { Svc { #[pin] fut: SvcFut }, Fn { #[pin] fut: FnFut }, } } impl Future for MapResFut where SvcFut: Future, Error>>, F: Fn(ServiceResponse) -> FnFut, FnFut: Future, Error>>, { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.as_mut().project(); match this.state.as_mut().project() { MapResFutStateProj::Svc { fut } => { let res = ready!(fut.poll(cx))?; let fut = (this.mw_fn)(res); this.state.set(MapResFutState::Fn { fut }); self.poll(cx) } MapResFutStateProj::Fn { fut } => fu chunk stream. /// /// This could be stabilized into Actix Web as `SizedStream::from_infallible()`. pub fn new_infallible_sized_stream>( size: u64, stream: S, ) -> SizedStream> { SizedStream::new(size, InfallibleStream::new(stream)) } use std::{convert::Infallible, io, time::Duration}; use actix_web::{get, middleware::Logger, App, HttpRequest, HttpServer, Responder}; use actix_web_lab::{extract::Path, respond::Html, sse}; use futures_util::stream; use time::format_description::well_known::Rfc3339; use tokio::time::sleep; #[get("/")] async fn index() -> impl Responder { Html(include_str!("./assets/sse.html").to_string()) } /// Countdown event stream starting from 8. #[get("/countdown")] async fn countdown(req: HttpRequest) -> impl Responder { // note: a more production-ready implementation might want to use the lastEventId header // sent by the reconnecting browser after the _retry_ period tracing::debug!("lastEventId: {:?}", req.headere> UrlEncodedFormBody { /// Create a new future to decode a URL-encoded request payload. pub fn new(req: &HttpRequest, payload: &mut Payload) -> Self { // check content-type let can_parse_form = if let Ok(Some(mime)) = req.mime_type() { mime == mime::APPLICATION_WWW_FORM_URLENCODED } else { false }; if !can_parse_form { return UrlEncodedFormBody::Error(Some(UrlencodedError::ContentType)); } let length = req .headers() .get(&header::CONTENT_LENGTH) .and_then(|l| l.to_str().ok()) .and_then(|s| s.parse::().ok()); // Notice the content-length is not checked against config limit here. // As the internal usage always call UrlEncodedBody::limit after UrlEncodedBody::new. // And limit check to return an error variant of UrlEncodedBody happens there. let payload = payload.take(); if let Some(len) = length { new() /// .wrap(from_fn(my_mw)) /// # ; /// ``` /// /// It is also possible to write a middleware that automatically uses extractors, similar to request /// handlers, by declaring them as the first parameters: /// ``` /// # use std::collections::HashMap; /// # use actix_web::{ /// # App, Error, /// # body::MessageBody, /// # dev::{ServiceRequest, ServiceResponse, Service as _}, /// # web, /// # }; /// use actix_web_lab::middleware::Next; /// /// async fn my_extracting_mw( /// string_body: String, /// query: web::Query>, /// req: ServiceRequest, /// next: Next, /// ) -> Result, Error> { /// // pre-processing /// next.call(req).await /// // post-processing /// } /// # actix_web::App::new().wrap(actix_web_lab::middleware::from_fn(my_extracting_mw)); pub fn from_fn(mw_fn: F) -> MiddlewareFn { MiddlewareFn { mw_fn: Rc::new(mw_fn), _phantom: PhantomData, }g, PartialEq, Eq, AsRef, Display, From)] pub struct BodyLimit { inner: T, } impl BodyLimit { /// Returns inner extracted type. pub fn into_inner(self) -> T { self.inner } } impl FromRequest for BodyLimit where T: FromRequest + 'static, T::Error: fmt::Debug + fmt::Display, { type Error = BodyLimitError; type Future = BodyLimitFut; fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { // fast check of Content-Length header match req.get_header::() { // CL header indicated that payload would be too large Some(len) if len > LIMIT => return BodyLimitFut::new_error(BodyLimitError::Overflow), _ => {} } let counter = crate::util::fork_request_payload(payload); BodyLimitFut { inner: Inner::Body { fut: Box::pin( if let Some(st) = req.app_data::>() { ok(st.clone()) } else { debug!( "Failed to extract `LocalData<{}>` for `{}` handler. For the LocalData extractor \ to work correctly, wrap the data with `LocalData::new()` and pass it to \ `App::app_data()`. Ensure that types align in both the set and retrieve calls.", type_name::(), req.match_name().unwrap_or_else(|| req.path()) ); err(error::ErrorInternalServerError( "Requested application data is not configured correctly. \ View/enable debug logs for more details.", )) } } } #[cfg(test)] mod tests { use actix_web::{ dev::Service, http::StatusCode, test::{init_service, TestRequest}, web, App, HttpResponse, }; use super::*; trait TestTrait { fn get_num(&self) -> i32; } struct A {} impl TestTrhttp://{}:{}", &bind.0, &bind.1); HttpServer::new(|| { App::new() .service(get_user_list) .service(get_high_mem_user_list) }) .workers(1) .bind(bind)? .run() .await } fn random_email() -> String { let rng = rand::thread_rng(); let id: String = rng .sample_iter(Alphanumeric) .take(10) .map(char::from) .collect(); format!("user_{id}@example.com") } fn random_address() -> String { let mut rng = rand::thread_rng(); let street_no: u16 = rng.gen_range(10..99); format!("{street_no} Random Street") } /// Create a `TestRequest` using a DSL that looks kinda like on-the-wire HTTP/1.x requests. /// /// # Examples /// ``` /// use actix_web::test::TestRequest; /// use actix_web_lab::test_request; /// /// let _req: TestRequest = test_request! { /// POST "/"; /// "Origin" => "example.com" /// "Access-Control-Request-Method" => "POST" /// "Access-Control-Request-Headers" => "Content-Type, X-CSRFxFuture; use futures_util::FutureExt as _; /// A middleware to catch panics in wrapped handlers and middleware, returning empty 500 responses. /// /// **This middleware should never be used as replacement for proper error handling.** See [this /// thread](https://github.com/actix/actix-web/issues/1501#issuecomment-627517783) for historical /// discussion on why Actix Web does not do this by default. /// /// It is recommended that this middleware be registered last. That is, `wrap`ed after everything /// else except `Logger`. /// /// # Examples /// ``` /// # use actix_web::App; /// use actix_web_lab::middleware::CatchPanic; /// /// App::new() /// .wrap(CatchPanic::default()) /// # ; /// ``` /// /// ```ignore /// // recommended wrap order /// App::new() /// .wrap(NormalizePath::default()) /// .wrap(CatchPanic::default()) // <- after everything except logger /// .wrap(Logger::default()) /// # ; /// ``` #[derive(Debug, Clone, Default)] #[non_exhaustive] pub struct CatchPanic; impl rr(err)) => return Poll::Ready(Err(Overloaded::Service(err))), res => res.is_ready(), }; self.is_ready.set(is_ready); // But we always report Ready, so that layers above don't wait until // the inner service is ready (the entire point of this layer!) Poll::Ready(Ok(())) } fn call(&self, req: Req) -> Self::Future { if self.is_ready.get() { // readiness only counts once, you need to check again! self.is_ready.set(false); LoadShedFuture::called(self.inner.call(req)) } else { LoadShedFuture::overloaded() } } } pin_project! { /// Future for [`LoadShedService`]. pub struct LoadShedFuture { #[pin] state: LoadShedFutureState, } } pin_project! { #[project = LoadShedFutureStateProj] enum LoadShedFutureState { Called { #[pin] fut: F }, Overloaded, } } impl LoadShedFuture { pub(crate) fn called(fut: F) -> Se:Blake2s256, "blake2", "Blake2s", 32); // Blake3 body_hash_alias!(BodyBlake3, blake3::Hasher, "blake3", "Blake3", 32); //! Demonstrates forking a request payload so that multiple extractors can derive data from a body. //! //! ```sh //! curl -X POST localhost:8080/ -d 'foo' //! //! # or using HTTPie //! http POST :8080/ --raw foo //! ``` use std::io; use actix_web::{dev, middleware, web, App, FromRequest, HttpRequest, HttpServer}; use actix_web_lab::util::fork_request_payload; use futures_util::{future::LocalBoxFuture, TryFutureExt as _}; use tokio::try_join; use tracing::info; struct TwoBodies(T, U); impl TwoBodies { fn into_parts(self) -> (T, U) { (self.0, self.1) } } impl FromRequest for TwoBodies where T: FromRequest, T::Future: 'static, U: FromRequest, U::Future: 'static, { type Error = actix_web::Error; type Future = LocalBoxFuture<'static, Result>; fn from_request(req: &HttpRequest, pl: &mut dev::Payload////something?query=test", ]; for uri in test_uris { let req = TestRequest::with_uri(uri).to_request(); let res = call_service(&app, req).await; assert!(res.status().is_success(), "Failed uri: {uri}"); } } #[actix_web::test] async fn always_trailing_slashes() { let app = init_service( App::new() .wrap(NormalizePath::new(TrailingSlash::Always)) .service(web::resource("/").to(HttpResponse::Ok)) .service(web::resource("/v1/something/").to(HttpResponse::Ok)) .service( web::resource("/v2/something/") .guard(fn_guard(|ctx| ctx.head().uri.query() == Some("query=test"))) .to(HttpResponse::Ok), ), ) .await; let test_uris = vec![ "/", "///", "/v1/something", "/v1/something/", "/v1/something////", h` middleware with the specified trailing slash style. pub fn new(behavior: TrailingSlash) -> Self { Self { trailing_slash_behavior: behavior, use_redirects: None, } } /// Constructs a new `NormalizePath` middleware with [trim](TrailingSlash::Trim) semantics. /// /// Use this instead of `NormalizePath::default()` to avoid deprecation warning. pub fn trim() -> Self { Self::new(TrailingSlash::Trim) } /// Configures middleware to respond to requests with non-normalized paths with a 307 redirect. /// /// If configured /// /// For example, a request with the path `/api//v1/foo/` would receive a response with a /// `Location: /api/v1/foo` header (assuming `Trim` trailing slash behavior.) /// /// To customize the status code, use [`use_redirects_with`](Self::use_redirects_with). pub fn use_redirects(mut self) -> Self { self.use_redirects = Some(StatusCode::TEMPORARY_REDIRECT); self } .body("Hello World!"), /// ); /// /// assert_response_matches!(res, CREATED; /// "date" => "today" /// "set-cookie" => "a=b"; /// @raw "Hello World!" /// ); /// /// let res = ServiceResponse::new( /// TestRequest::default().to_http_request(), /// HttpResponse::Created() /// .content_type(ContentType::json()) /// .insert_header(("date", "today")) /// .insert_header(("set-cookie", "a=b")) /// .body(r#"{"abc":"123"}"#), /// ); /// /// assert_response_matches!(res, CREATED; @json { "abc": "123" }); /// # }); /// ``` #[macro_export] macro_rules! assert_response_matches { ($res:ident, $status:ident) => {{ assert_eq!($res.status(), ::actix_web::http::StatusCode::$status) }}; ($res:ident, $status:ident; $($hdr_name:expr => $hdr_val:expr)+) => {{ assert_response_matches!($res, $status); $( assert_eq!( $res.headers().get(::actix_web::http::header::HeaderName::from_static($hdr_name)).unwrap(), assert_eq!( Event::Data(Data { id: Some("42".into()), event: Some("bar".into()), data: "foo".into() }) .into_bytes(), "id: 42\nevent: bar\ndata: foo\n\n" ); } #[test] fn retry_is_first_msg() { let waker = noop_waker(); let mut cx = Context::from_waker(&waker); { let (_sender, mut sse) = channel(9); assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending()); } { let (_sender, sse) = channel(9); let mut sse = sse.with_retry_duration(Duration::from_millis(42)); match Pin::new(&mut sse).poll_next(&mut cx) { Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"), res => panic!("poll should return retry message, got {res:?}"), } } } #[actix_web::test] async fn dropping_responder_causes_send_fails() { let (sender,(source))] Event), /// The receiving ([`Sse`]) has been dropped, likely because the client disconnected. #[display(fmt = "channel closed")] Closed(#[error(not(source))] Event), } #[doc(hidden)] #[deprecated( since = "0.17.0", note = "Renamed to `TrySendError`. Prefer `sse::TrySendError`." )] pub type SseTrySendError = TrySendError; /// Server-sent events data message containing a `data` field and optional `id` and `event` fields. /// /// Since it implements `Into`, this can be passed directly to [`send`](SseSender::send) /// or [`try_send`](SseSender::try_send). /// /// # Examples /// ``` /// # #[actix_web::main] async fn test() { /// use std::convert::Infallible; /// use actix_web::body; /// use serde::Serialize; /// use futures_util::stream; /// use actix_web_lab::sse; /// /// #[derive(serde::Serialize)] /// struct Foo { /// bar: u32, /// } /// /// let sse = sse::Sse::from_stream(stream::iter([ /// Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))), /// .app_data(web::Data::new(42u32)) .default_service(web::to(handler)) }); let res = srv.post("/").send_body("foo").await.unwrap(); assert_eq!(res.status(), StatusCode::OK); } //! Alternative approach to using `BodyHmac` type using more flexible `RequestSignature` type. use std::io; use actix_web::{ middleware::Logger, web::{self, Bytes, Data}, App, Error, FromRequest, HttpRequest, HttpServer, }; use actix_web_lab::extract::{RequestSignature, RequestSignatureScheme}; use async_trait::async_trait; use digest::{CtOutput, Mac}; use hmac::SimpleHmac; use sha2::Sha256; use tracing::info; struct AbcSigningKey([u8; 32]); /// Grabs variable signing key from app data. async fn get_signing_key(req: &HttpRequest) -> actix_web::Result<[u8; 32]> { let key = Data::::extract(req).into_inner()?.0; Ok(key) } #[derive(Debug)] struct AbcApi { /// Payload hash state. hmac: SimpleHmac, } #[async_trait(?Send)] impl RequestSignatureScheme for Aody}, /// web::{BufMut as _, BytesMut}, /// HttpRequest, /// }; /// /// async fn append_bytes( /// _req: HttpRequest, /// body: impl MessageBody /// ) -> actix_web::Result { /// let buf = body::to_bytes(body).await.ok().unwrap(); /// /// let mut body = BytesMut::from(&buf[..]); /// body.put_slice(b" - hope you like things ruining your payload format"); /// /// Ok(body) /// } /// # actix_web::App::new().wrap(map_response_body(append_bytes)); /// ``` pub fn map_response_body(mapper_fn: F) -> MapResBodyMiddleware { MapResBodyMiddleware { mw_fn: Rc::new(mapper_fn), } } /// Middleware transform for [`map_response_body`]. pub struct MapResBodyMiddleware { mw_fn: Rc, } impl Transform for MapResBodyMiddleware where S: Service, Error = Error>, F: Fn(HttpRequest, B) -> Fut, Fut: Future>, B2: MessageBody, { tains("JSON payload (16 bytes) is larger than allowed (limit: 10 bytes)."), "unexpected error string: {err:?}" ); let (req, mut pl) = TestRequest::default() .insert_header(header::ContentType::json()) .insert_header(( header::CONTENT_LENGTH, header::HeaderValue::from_static("16"), )) .set_payload(Bytes::from_static(b"{\"name\": \"test\"}")) .to_http_parts(); let s = Json::::from_request(&req, &mut pl).await; let err = format!("{}", s.unwrap_err()); assert!( err.contains("larger than allowed"), "unexpected error string: {err:?}" ); } #[actix_web::test] async fn test_json_body() { let (req, mut pl) = TestRequest::default().to_http_parts(); let json = JsonBody::::new(&req, &mut pl).await; assert!(json_eq(json.unwrap_err(), JsonPayloadError::ContentType)); zeOwned; use tracing::debug; /// Default JSON payload size limit of 2MiB. pub const DEFAULT_JSON_LIMIT: usize = 2_097_152; /// JSON extractor with const-generic payload size limit. /// /// `Json` is used to extract typed data from JSON request payloads. /// /// # Extractor /// To extract typed data from a request body, the inner type `T` must implement the /// [`serde::Deserialize`] trait. /// /// Use the `LIMIT` const generic parameter to control the payload size limit. The default limit /// that is exported (`DEFAULT_LIMIT`) is 2MiB. /// /// ``` /// use actix_web::{post, App}; /// use actix_web_lab::extract::{DEFAULT_JSON_LIMIT, Json}; /// use serde::Deserialize; /// /// #[derive(Deserialize)] /// struct Info { /// username: String, /// } /// /// /// Deserialize `Info` from request's body. /// #[post("/")] /// async fn index(info: Json) -> String { /// format!("Welcome {}!", info.username) /// } /// /// const LIMIT_32_MB: usize = 33_554_432; /// /// /// Deserialize payload with a higher 32Mi /// async fn handler() -> impl Responder { /// let data_stream = streaming_data_source(); /// /// DisplayStream::new_infallible(data_stream) /// .into_responder() /// } /// ``` pub struct DisplayStream { // The wrapped item stream. #[pin] stream: S, } } impl DisplayStream { /// Constructs a new `DisplayStream` from a stream of lines. pub fn new(stream: S) -> Self { Self { stream } } } impl DisplayStream { /// Constructs a new `DisplayStream` from an infallible stream of lines. pub fn new_infallible(stream: S) -> DisplayStream> { DisplayStream::new(InfallibleStream::new(stream)) } } impl DisplayStream where S: Stream>, T: fmt::Display, E: Into> + 'static, { /// Creates a chunked body stream that serializes as CSV on-the-fly. pub fn into_body_stream(self) -> impl MessageBody { BodyStr #[test] fn for_multiple() { let fwd = Forwarded { r#for: vec!["192.0.2.60".to_owned(), "198.51.100.17".to_owned()], ..Forwarded::default() }; assert_eq!(fwd.for_client().unwrap(), "192.0.2.60"); assert_parse_eq::(["for=192.0.2.60, for=198.51.100.17"], fwd); } } //! Expiremental responders and response helpers. pub use crate::{csv::Csv, display_stream::DisplayStream, html::Html, ndjson::NdJson}; #[cfg(feature = "cbor")] pub use crate::cbor::Cbor; #[cfg(feature = "msgpack")] pub use crate::msgpack::{MessagePack, MessagePackNamed}; //! Content-Length typed header. //! //! See [`ContentLength`] docs. use std::{convert::Infallible, str}; use actix_web::{ error::ParseError, http::header::{ from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue, CONTENT_LENGTH, }, HttpMessage, }; /// `Content-Length` header, defined in [RFC 9110 §8.6]. /// /// The "Content-Length" header field indicatw `Forwarded` header from a single "for" identifier. pub fn new_for(r#for: impl Into) -> Self { Self { by: None, r#for: vec![r#for.into()], host: None, proto: None, } } /// Returns first "for" parameter which is typically the client's identifier. pub fn for_client(&self) -> Option<&str> { // Taking the first value for each property is correct because spec states that first "for" // value is client and rest are proxies. We collect them in the order they are read. // // ```plain // > In a chain of proxy servers where this is fully utilized, the first // > "for" parameter will disclose the client where the request was first // > made, followed by any subsequent proxy identifiers. // - https://datatracker.ietf.org/doc/html/rfc7239#section-5.2 // ``` self.r#for.first().map(String::as_str) } /// Returns iterator over the "for" chain. /nue, // we can't read the file }; let filename = match entry.file_name().into_string() { Ok(filename) => filename, Err(_) => continue, // the file has a non UTF-8 name }; let mut entry = zipper .write_entry_stream(ZipEntryBuilder::new( filename, async_zip::Compression::Deflate, )) .await .map_err(zip_to_io_err)?; tokio::io::copy(&mut file, &mut entry).await?; entry.close().await.map_err(zip_to_io_err)?; } Ok(()) } #[get("/")] async fn index() -> impl Responder { let (wrt, body) = body::writer(); // allow response to be started while this is processing #[allow(clippy::let_underscore_future)] let _ = tokio::spawn(async move { let mut zipper = async_zip::write::ZipFileWriter::new(wrt); if let Err(err) = read_dir(&mut zipper).await { tracing::warn!("Failed to write files from directory to zip: {err}") tatic") /// .static_resources_location("./examples/assets") /// .finish() /// ); /// ``` #[cfg(feature = "spa")] pub fn spa() -> Spa { Spa::default() } //! MessagePack responder. use actix_web::{HttpRequest, HttpResponse, Responder}; use bytes::Bytes; use derive_more::{Deref, DerefMut, Display}; use mime::Mime; use once_cell::sync::Lazy; use serde::Serialize; static MSGPACK_MIME: Lazy = Lazy::new(|| "application/msgpack".parse().unwrap()); /// MessagePack responder. /// /// If you require the fields to be named, use [`MessagePackNamed`]. #[derive(Debug, Deref, DerefMut, Display)] pub struct MessagePack(pub T); impl Responder for MessagePack { type Body = Bytes; fn respond_to(self, _req: &HttpRequest) -> HttpResponse { let body = Bytes::from(rmp_serde::to_vec(&self.0).unwrap()); HttpResponse::Ok() .content_type(MSGPACK_MIME.clone()) .message_body(body) .unwrap() } } /// Macing::warn!("client disconnected; could not send SSE message"); break; } sleep(Duration::from_secs(10)).await; } }); sse.with_keep_alive(Duration::from_secs(3)) } #[actix_web::main] async fn main() -> io::Result<()> { env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); tracing::info!("starting HTTP server at http://localhost:8080"); HttpServer::new(|| { App::new() .service(index) .service(countdown) .service(countdown_from) .service(timestamp) .wrap(Logger::default()) }) .workers(2) .bind(("127.0.0.1", 8080))? .run() .await } use std::{ future::{ready, Future, Ready}, marker::PhantomData, pin::Pin, rc::Rc, task::{Context, Poll}, }; use actix_service::{forward_ready, Service, Transform}; use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, Error, }; use futures_core::read /// Constructs new panic reporter middleware with `callback`. pub fn new(callback: impl Fn(&(dyn Any + Send)) + 'static) -> Self { Self { cb: Rc::new(callback), } } } impl std::fmt::Debug for PanicReporter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PanicReporter") .field("cb", &"") .finish() } } impl Transform for PanicReporter where S: Service, S::Future: 'static, { type Response = S::Response; type Error = S::Error; type Transform = PanicReporterMiddleware; type InitError = (); type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ready(Ok(PanicReporterMiddleware { service: Rc::new(service), cb: Rc::clone(&self.cb), })) } } pub struct PanicReporterMiddleware { service: Rc, cb: PanicCallback, } immBody::Error(Some(UrlencodedError::Overflow { size: len, limit: LIMIT, })); } } UrlEncodedFormBody::Body { length, payload, buf: web::BytesMut::with_capacity(8192), _res: PhantomData, } } } impl Future for UrlEncodedFormBody { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); match this { UrlEncodedFormBody::Body { buf, payload, .. } => loop { let res = ready!(Pin::new(&mut *payload).poll_next(cx)); match res { Some(chunk) => { let chunk = chunk?; let buf_len = buf.len() + chunk.len(); if buf_len > LIMIT { return Poll::Ready(Err(Urlenclt>; forward_ready!(service); fn call(&self, req: ServiceRequest) -> Self::Future { self.service.call(req) } } #[cfg(test)] mod tests { use actix_web::{ http::header::{self, HeaderValue}, middleware::{Compat, Logger}, test, web, App, HttpResponse, }; use super::*; async fn noop(req: ServiceRequest, next: Next) -> Result, Error> { next.call(req).await } async fn add_res_header( req: ServiceRequest, next: Next, ) -> Result, Error> { let mut res = next.call(req).await?; res.headers_mut() .insert(header::WARNING, HeaderValue::from_static("42")); Ok(res) } async fn mutate_body_type( req: ServiceRequest, next: Next, ) -> Result, Error> { let res = next.call(req).await?; Ok(res.map_into_left_body:::Overflow"), } } } impl ResponseError for BodyLimitError where T: FromRequest + 'static, T::Error: fmt::Debug + fmt::Display, { } #[cfg(test)] mod tests { use actix_web::{http::header, test::TestRequest}; use bytes::Bytes; use super::*; static_assertions::assert_impl_all!(BodyLimitFut<(), 100>: Unpin); static_assertions::assert_impl_all!(BodyLimitFut: Unpin); #[actix_web::test] async fn within_limit() { let (req, mut pl) = TestRequest::default() .insert_header(header::ContentType::plaintext()) .insert_header(( header::CONTENT_LENGTH, header::HeaderValue::from_static("9"), )) .set_payload(Bytes::from_static(b"123456789")) .to_http_parts(); let body = BodyLimit::::from_request(&req, &mut pl).await; assert_eq!( body.ok().unwrap().into_inner(), Bytes::from_static(b"123456789") ); } actix_utils::future::ok; use actix_web::{ body::BoxBody, dev::{fn_service, Service, ServiceRequest, ServiceResponse}, http::StatusCode, Error, HttpResponseBuilder, }; /// Creates service that always responds with given status code and echoes request path as response /// body. pub fn echo_path_service( status_code: StatusCode, ) -> impl Service, Error = Error> { fn_service(move |req: ServiceRequest| { let path = req.path().to_owned(); ok(req.into_response(HttpResponseBuilder::new(status_code).body(path))) }) } use std::{convert::Infallible, error::Error as StdError}; use actix_web::{ body::{BodyStream, MessageBody}, HttpResponse, Responder, }; use bytes::{Bytes, BytesMut}; use futures_core::Stream; use futures_util::TryStreamExt as _; use mime::Mime; use pin_project_lite::pin_project; use serde::Serialize; use crate::util::{InfallibleStream, MutWriter}; pin_project! { /// A buffered CSV serializing hex!("03ac6742 16f3e15c 761ee1a5 e255f067 953623c8 b388b445 9e13f978 d7c846f4").as_ref() ); } #[actix_web::test] async fn use_on_wrong_extractor_in_wrong_order() { let app = test::init_service(App::new().route( "/", web::get().to( |_body: Json, null: BodyHash<(), Sha256>| async move { Bytes::copy_from_slice(null.hash()) }, ), )) .await; let req = test::TestRequest::default().set_json(1234).to_request(); let res = test::call_service(&app, req).await; assert_eq!(res.status(), StatusCode::OK); let body = test::read_body(res).await; // if the hash wrapper is on a non-body extractor _and_ a body extractor has already taken the // payload, this should return the empty input hash assert_eq!( body, hex!("e3b0c442 98fc1c14 9afbf4c8 996fb924 27ae41e4 649b934c a495991b 7852b855").as_ref() ); } //! How to use `NdJson` as an efficient streaming response type. //! //! The same techniquedy type. /// /// # Examples /// ``` /// # use actix_web::{HttpResponse, web}; /// use std::convert::Infallible; /// use actix_web_lab::body; /// /// # async fn index() { /// let (mut body_tx, body) = body::channel::(); /// /// let _ = web::block(move || { /// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap(); /// }); /// /// HttpResponse::Ok().body(body) /// # ;} /// ``` pub fn channel>() -> (Sender, impl MessageBody) { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); (Sender::new(tx), Receiver::new(rx)) } /// A channel-like sender for body chunks. #[derive(Debug, Clone)] pub struct Sender { tx: UnboundedSender>, } impl Sender { fn new(tx: UnboundedSender>) -> Self { Self { tx } } /// Submits a chunk of bytes to the response body stream. /// /// # Errors /// Errors if other side of channel body was dropped, returning `chunk`. pub fn send(&mut self, _env(env_logger::Env::new().default_filter_or("info")); info!("staring server at http://localhost:8080"); HttpServer::new(|| { App::new() .wrap(middleware::Logger::default().log_target("@")) .route( "/", web::post().to(|body: TwoBodies| async move { let (string, bytes) = body.into_parts(); // proves that body was extracted twice since the bytes extracted are byte-equal to // the string, without forking the request payload, the bytes parts would be empty assert_eq!(string.as_bytes(), &bytes); // echo string string }), ) }) .workers(1) .bind(("127.0.0.1", 8080))? .run() .await } use std::borrow::Cow; use actix_files::{Files, NamedFile}; use actix_service::fn_service; use actix_web::dev::{HttpServiceFactory, ResourceDef, ServiceRequest, ServiceResponseequest for Query { type Error = Error; type Future = Ready>; #[inline] fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { serde_html_form::from_str::(req.query_string()) .map(|val| ready(Ok(Query(val)))) .unwrap_or_else(move |e| { let err = QueryPayloadError::Deserialize(e); debug!( "Failed during Query extractor deserialization. \ Request path: {:?}", req.path() ); ready(Err(err.into())) }) } } #[cfg(test)] mod tests { use actix_web::test::TestRequest; use derive_more::Display; use serde::Deserialize; use super::*; #[derive(Deserialize, Debug, Display)] struct Id { id: String, }