use crate::executor; use core::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::{cmp, io, mem}; use bytes::BytesMut; use http_body_util::{BodyExt, Full}; use hyper::body::Frame; pub use hyper::body::Bytes; /// An HTTP request. /// /// See [`http::Request`](hyper::Request) and [`Body`] for details. pub type Request = hyper::Request; /// An HTTP response. /// /// You can create a response with the [`new`](hyper::Response::new) method: /// /// ``` /// # use astra::{Response, Body}; /// let response = Response::new(Body::new("Hello world!")); /// ``` /// /// Or with a [`ResponseBuilder`]: /// /// ``` /// # use astra::{ResponseBuilder, Body}; /// let response = ResponseBuilder::new() /// .status(404) /// .header("X-Custom-Foo", "Bar") /// .body(Body::new("Page not found.")) /// .unwrap(); /// ``` /// /// See [`http::Response`](hyper::Response) and [`Body`] for details. pub type Response = hyper::Response; /// A builder for an HTTP response. /// /// ``` /// use astra::{ResponseBuilder, Body}; /// /// let response = ResponseBuilder::new() /// .status(404) /// .header("X-Custom-Foo", "Bar") /// .body(Body::new("Page not found.")) /// .unwrap(); /// ``` /// /// See [`http::Response`](hyper::Response) and [`Body`] for details. pub type ResponseBuilder = hyper::http::response::Builder; /// The streaming body of an HTTP request or response. /// /// Data is streamed by iterating over the body, which /// yields chunks as [`Bytes`](hyper::body::Bytes). /// /// ```rust /// use astra::{Request, Response, Body}; /// /// fn handle(mut req: Request) -> Response { /// for chunk in req.body_mut() { /// println!("body chunk {:?}", chunk); /// } /// /// Response::new(Body::new("Hello World!")) /// } /// ``` pub struct Body(pub(crate) BoxBody); type BoxBody = http_body_util::combinators::UnsyncBoxBody; impl Body { /// Create a body from a string or bytes. /// /// ```rust /// # use astra::Body; /// let string = Body::new("Hello world!"); /// let bytes = Body::new(vec![0, 1, 0, 1, 0]); /// ``` pub fn new(data: impl Into) -> Body { Body(BoxBody::new( Full::new(data.into()).map_err(|err| match err {}), )) } /// Create an empty body. pub fn empty() -> Body { Body(BoxBody::default()) } /// Create a body from an implementor of [`io::Read`]. /// /// ```rust /// use astra::{Request, Response, ResponseBuilder, Body}; /// use std::fs::File; /// /// fn handle(_request: Request) -> Response { /// let file = File::open("index.html").unwrap(); /// /// ResponseBuilder::new() /// .header("Content-Type", "text/html") /// .body(Body::wrap_reader(file)) /// .unwrap() /// } /// ``` pub fn wrap_reader(reader: R) -> Body where R: io::Read + Send + 'static, { Body(BoxBody::new(ReaderBody::new(reader))) } /// Create a [`BodyReader`] that implements [`std::io::Read`]. pub fn reader(&mut self) -> BodyReader<'_> { BodyReader { body: self, prev_bytes: Bytes::new(), } } } impl From for Body where Bytes: From, { fn from(data: T) -> Body { Body::new(data) } } impl Iterator for Body { type Item = io::Result; fn next(&mut self) -> Option { struct FrameFuture<'body>(Pin<&'body mut BoxBody>); impl Future for FrameFuture<'_> { type Output = Option, io::Error>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { hyper::body::Body::poll_frame(self.0.as_mut(), cx) } } loop { let result = executor::Parker::new().block_on(FrameFuture(Pin::new(&mut self.0)))?; return match result { Ok(frame) => match frame.into_data() { Ok(bytes) => Some(Ok(bytes)), Err(_) => continue, }, Err(err) => Some(Err(err)), }; } } fn size_hint(&self) -> (usize, Option) { let size_hint = hyper::body::Body::size_hint(&self.0); ( size_hint.lower() as _, size_hint.upper().map(|size| size as _), ) } } impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } impl Default for Body { fn default() -> Self { Self::empty() } } impl hyper::body::Body for Body { type Data = Bytes; type Error = io::Error; fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { Pin::new(&mut self.0).poll_frame(cx) } } /// Implements [`std::io::Read`] for [`Body`]. pub struct BodyReader<'body> { body: &'body mut Body, prev_bytes: Bytes, } impl std::io::Read for BodyReader<'_> { fn read(&mut self, mut buf: &mut [u8]) -> io::Result { let mut written = 0; loop { if buf.is_empty() { return Ok(written); } if !self.prev_bytes.is_empty() { let chunk_size = cmp::min(buf.len(), self.prev_bytes.len()); let prev_bytes_start = self.prev_bytes.split_to(chunk_size); buf[..chunk_size].copy_from_slice(&prev_bytes_start[..]); buf = &mut buf[chunk_size..]; written += chunk_size; continue; } if written != 0 { // Pulling from the iterator can block and we have something to return // already, so return it. return Ok(written); } debug_assert!(self.prev_bytes.is_empty()); debug_assert!(written == 0); self.prev_bytes = if let Some(next) = self.body.next() { next? } else { return Ok(written); } } } } /// Implements `hyper::Body` for an implementor of `io::Read`. struct ReaderBody { reader: Option, buf: BytesMut, } impl ReaderBody { /// Create a new `ReaderBody` from an `io::Read`. fn new(reader: R) -> Self { Self { reader: Some(reader), buf: BytesMut::zeroed(CHUNK), } } } /// The size of the read buffer. const CHUNK: usize = 4096; impl Unpin for ReaderBody {} impl hyper::body::Body for ReaderBody where R: io::Read, { type Data = Bytes; type Error = io::Error; fn poll_frame( mut self: Pin<&mut Self>, _: &mut Context<'_>, ) -> Poll, Self::Error>>> { let ReaderBody { reader, buf } = &mut *self; let reader = match reader { Some(reader) => reader, None => return Poll::Ready(None), }; if buf.capacity() == 0 { buf.extend_from_slice(&[0; CHUNK]); } match reader.read(buf) { Err(err) => Poll::Ready(Some(Err(err))), Ok(0) => { self.reader.take(); Poll::Ready(None) } Ok(n) => { let remaining = buf.split_off(n); let chunk = mem::replace(buf, remaining); Poll::Ready(Some(Ok(Frame::data(Bytes::from(chunk))))) } } } }