pub mod timeout; use anyhow::{ensure, Context}; use http::{ header::{self, Entry}, Extensions, HeaderMap, HeaderName, HeaderValue, }; use timeout::ConnectTimeout; use tonic::metadata::MetadataMap; use crate::{codec::MessageCodec, error::Error, metadata::HeaderMapIteratorExt}; pub(crate) const CONTENT_ENCODING_HEADERS: [HeaderName; 2] = [ header::CONTENT_ENCODING, HeaderName::from_static("connect-content-encoding"), ]; const ACCEPT_ENCODING_HEADERS: [HeaderName; 2] = [ header::ACCEPT_ENCODING, HeaderName::from_static("connect-accept-encoding"), ]; const CONNECT_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("connect-timeout-ms"); const GRPC_TIMEOUT_KEY: &str = "grpc-timeout"; const GRPC_PREFIX: &str = "grpc-"; pub struct RequestEnvelope { pub metadata: MetadataMap, pub is_streaming: bool, pub content_type: Option, pub content_encoding: Option, pub accept_encoding: Vec, pub timeout: Option, pub extensions: Extensions, } impl RequestEnvelope { pub fn from_connect_request(req: http::Request) -> Result<(RequestEnvelope, T), Error> { let (mut parts, body) = req.into_parts(); parts.extensions.insert(parts.version); Ok(( Self::from_connect_request_parts(parts.headers, parts.extensions)?, body, )) } pub fn from_connect_request_parts( mut headers: HeaderMap, mut extensions: Extensions, ) -> Result { let content_type = headers .remove(header::CONTENT_TYPE) .context("missing content-type") .map_err(Error::InvalidRequest)?; let (_, is_streaming) = MessageCodec::from_content_type(&content_type)?; let content_encoding = maybe_streaming_header(&mut headers, &CONTENT_ENCODING_HEADERS, is_streaming) .map_err(Error::InvalidRequest)? .map(|entry| entry.remove()); let accept_encoding = maybe_streaming_header(&mut headers, &ACCEPT_ENCODING_HEADERS, is_streaming) .map_err(Error::InvalidRequest)? .map(|entry| entry.remove_entry_mult().1.collect()) .unwrap_or_default(); let headers_info = extensions.get_or_insert_default(); let metadata = crate::metadata::from_headers(headers, Some(headers_info)); let timeout = headers_info .reserved .get(CONNECT_TIMEOUT_HEADER) .map(ConnectTimeout::from_connect_timeout) .transpose()?; Ok(Self { metadata, is_streaming, content_type: Some(content_type), timeout, content_encoding, accept_encoding, extensions, }) } pub fn from_tonic_request( req: tonic::Request, is_streaming: bool, ) -> Result<(Self, T), Error> { let (metadata, extensions, message) = req.into_parts(); Ok(( Self::from_tonic_request_parts(metadata, extensions, is_streaming)?, message, )) } pub fn from_tonic_request_parts( metadata: MetadataMap, mut extensions: Extensions, is_streaming: bool, ) -> Result { let mut headers = metadata.into_headers(); let content_type = headers.remove(header::CONTENT_TYPE); let content_encoding = headers.remove(header::CONTENT_ENCODING); let accept_encoding = match headers.entry(header::ACCEPT_ENCODING) { Entry::Occupied(entry) => entry.remove_entry_mult().1.collect(), Entry::Vacant(_) => Default::default(), }; // Remove other reserved gRPC headers (grpc-*) let headers_info = extensions.get_or_insert_default(); let (reserved_headers, headers) = headers .into_iter() .partition_by_name(|name| name.as_str().starts_with(GRPC_PREFIX)); let timeout = reserved_headers .get(GRPC_TIMEOUT_KEY) .map(ConnectTimeout::from_grpc_timeout) .transpose()?; let metadata = crate::metadata::from_headers(headers, Some(headers_info)); Ok(Self { metadata, is_streaming, content_type, content_encoding, accept_encoding, timeout, extensions, }) } } pub(crate) fn maybe_streaming_header<'a>( headers: &'a mut HeaderMap, [unary_name, streaming_name]: &[HeaderName; 2], is_streaming: bool, ) -> anyhow::Result>> { let entry = if is_streaming { ensure!( !headers.contains_key(unary_name), "streaming request has unary header {unary_name:?}" ); headers.entry(streaming_name) } else { ensure!( !headers.contains_key(streaming_name), "unary request has streaming header {streaming_name:?}" ); headers.entry(unary_name) }; match entry { Entry::Occupied(entry) => Ok(Some(entry)), Entry::Vacant(_) => Ok(None), } }