use std::time::Duration; use anyhow::Context; use http::{ header::{ACCEPT_ENCODING, CONTENT_ENCODING}, HeaderMap, StatusCode, }; use once_cell::sync::OnceCell; use reqwest::header::{HeaderValue, CONTENT_TYPE}; use crate::{codec::MessageCodec, metadata::Metadata, status::Code}; // Connect-Protocol-Version → "connect-protocol-version" "1" const CONNECT_PROTOCOL_VERSION: &str = "connect-protocol-version"; const CONNECT_PROTOCOL_VERSION_VALUE: HeaderValue = HeaderValue::from_static("1"); const CONNECT_ACCEPT_ENCODING: &str = "connect-accept-encoding"; const CONNECT_CONTENT_ENCODING: &str = "connect-content-encoding"; const CONNECT_TIMEOUT_MS: &str = "connect-timeout-ms"; /// Represents configuration for a Connect request #[derive(Clone, Debug, Default)] pub struct RequestOpts { /// Metadata to be included with this request pub metadata: Metadata, /// Server-side timeout for this request /// /// A duration of zero (the default) is interpreted as infinite timeout. The /// spec permits timeouts up to 9999999999 ms (~115 days). pub timeout: Duration, /// Content encoding for request messages /// /// Note that this differs from regular HTTP content-encoding for streaming /// requests. See: https://connectrpc.com/docs/protocol/#streaming-request pub content_encoding: Option, /// Acceptable content encodings for response messages /// /// Note that this differs from regular HTTP accept-encoding for streaming /// responses. See: https://connectrpc.com/docs/protocol/#streaming-response pub accept_encoding: Vec, } /// Returns an [`http::Request`] representing a Connect unary request. /// /// See: https://connectrpc.com/docs/protocol#unary-request pub fn unary_request( uri: &str, codec: MessageCodec, body: T, opts: RequestOpts, ) -> http::Result> { common_request(uri, codec, body, opts, false) } /// Returns an [`http::Request`] representing a Connect streaming request. /// /// See: https://connectrpc.com/docs/protocol#streaming-request pub fn streaming_request( uri: &str, codec: MessageCodec, body: T, opts: RequestOpts, ) -> http::Result> { common_request(uri, codec, body, opts, true) } fn common_request( uri: &str, codec: MessageCodec, body: T, opts: RequestOpts, streaming: bool, ) -> http::Result> { let mut req = http::Request::post(uri) .header(CONTENT_TYPE, codec.content_type(streaming)?) .header(CONNECT_PROTOCOL_VERSION, CONNECT_PROTOCOL_VERSION_VALUE) .body(body)?; let headers = req.headers_mut(); headers.extend(opts.metadata); if let Some(encoding) = opts.content_encoding { if streaming { headers.insert(CONNECT_CONTENT_ENCODING, encoding); } else { headers.insert(CONTENT_ENCODING, encoding); } } for encoding in opts.accept_encoding { if streaming { headers.insert(CONNECT_ACCEPT_ENCODING, encoding); } else { headers.insert(ACCEPT_ENCODING, encoding); } } if !opts.timeout.is_zero() { headers.insert( CONNECT_TIMEOUT_MS, opts.timeout.as_millis().to_string().try_into()?, ); } Ok(req) } // pub struct RequestInfo<'a> { // path_and_query: Cow<'a, PathAndQuery>, // headers: &'a HeaderMap, // } // impl<'a> RequestInfo<'a> { // pub fn new(request: &'a http::Request) -> Result> { // Self { // path_and_query: request.uri().path_and_query().ok_or("empty path")?, // } // } // pub fn path_and_query(&self) -> &PathAndQuery { // &self.path_and_query // } // pub fn metadata(&self) -> impl Iterator { // self.headers.iter().filter_map(|(key, val)| { // let key = key.as_str(); // if key.starts_with("connect-") { // return None; // } // Some((key, val.to_str().ok()?)) // }) // } // } pub struct ResponseInfo { metadata: Metadata, content_type: Option, codec_streaming: OnceCell<(MessageCodec, bool)>, } impl ResponseInfo { pub fn new(mut headers: HeaderMap) -> Self { let content_type = headers.remove(CONTENT_TYPE); Self { metadata: Metadata::from_headers(headers), content_type, codec_streaming: Default::default(), } } pub fn metadata(&self) -> &Metadata { &self.metadata } pub fn into_metadata(self) -> Metadata { self.metadata } pub fn codec(&self) -> anyhow::Result<&MessageCodec> { let (codec, _) = self .codec_streaming .get_or_try_init(|| self.codec_streaming())?; Ok(codec) } pub fn is_streaming(&self) -> bool { match self .codec_streaming .get_or_try_init(|| self.codec_streaming()) { Ok((_, streaming)) => *streaming, Err(_) => false, } } fn codec_streaming(&self) -> anyhow::Result<(MessageCodec, bool)> { let content_type = self .content_type .as_ref() .context("response missing content-type")?; MessageCodec::from_content_type(content_type) .with_context(|| format!("invalid codec content-type {content_type:?}")) } }