pub mod json; pub mod proto; use anyhow::Context; use bytes::{BufMut, Bytes, BytesMut}; use http::{header::InvalidHeaderValue, HeaderValue}; pub use bytes::Buf; use crate::error::Error; const APPLICATION_PREFIX: &str = "application/"; const APPLICATION_JSON: &str = "application/json"; const APPLICATION_PROTO: &str = "application/proto"; const APPLICATION_CONNECT_PREFIX: &str = "application/connect+"; const APPLICATION_CONNECT_JSON: &str = "application/connect+json"; const APPLICATION_CONNECT_PROTO: &str = "application/connect+proto"; /// Connect message codec /// /// See: https://connectrpc.com/docs/protocol/#unary-request #[derive(Clone, Debug, Default, PartialEq)] pub enum MessageCodec { #[default] Json, Proto, Other(OtherCodec), } #[derive(Clone, Debug, PartialEq)] pub struct OtherCodec(String); impl MessageCodec { pub fn new(codec: impl AsRef) -> Result { let codec = codec.as_ref(); if let Some(codec) = Self::common(codec) { return Ok(codec); } let codec = codec.to_ascii_lowercase(); if let Some(codec) = Self::common(&codec) { return Ok(codec); } // Check that the codec is a valid header value HeaderValue::from_str(&codec)?; Ok(Self::Other(OtherCodec(codec))) } pub(crate) fn common(name: &str) -> Option { match name { "json" => Some(Self::Json), "proto" => Some(Self::Proto), _ => None, } } pub(crate) fn from_content_type(content_type: &HeaderValue) -> Result<(Self, bool), Error> { let content_type = content_type .to_str() .context("invalid content-type") .map_err(Error::InvalidMessageCodec)?; let streaming = content_type.starts_with(APPLICATION_CONNECT_PREFIX); let codec_str = if streaming { content_type.strip_prefix(APPLICATION_CONNECT_PREFIX) } else { content_type.strip_prefix(APPLICATION_PREFIX) } .context("invalid content-type prefix") .map_err(Error::InvalidMessageCodec)?; let codec = Self::common(codec_str) .unwrap_or_else(|| Self::Other(OtherCodec(codec_str.to_string()))); Ok((codec, streaming)) } /// Returns the content-type header value for this codec. /// /// Unary-Content-Type → "content-type" "application/" Message-Codec /// Streaming-Content-Type → "content-type" "application/connect+" ("proto" / "json" / {custom}) pub fn content_type(&self, streaming: bool) -> HeaderValue { match (self, streaming) { (MessageCodec::Json, false) => HeaderValue::from_static(APPLICATION_JSON), (MessageCodec::Json, true) => HeaderValue::from_static(APPLICATION_CONNECT_JSON), (MessageCodec::Proto, false) => HeaderValue::from_static(APPLICATION_PROTO), (MessageCodec::Proto, true) => HeaderValue::from_static(APPLICATION_CONNECT_PROTO), (MessageCodec::Other(OtherCodec(codec)), streaming) => { let prefix = if streaming { APPLICATION_PREFIX } else { APPLICATION_CONNECT_PREFIX }; HeaderValue::from_str(&format!("{prefix}{codec}")).unwrap() } } } } impl std::fmt::Display for MessageCodec { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let codec = match self { MessageCodec::Json => "json", MessageCodec::Proto => "proto", MessageCodec::Other(OtherCodec(codec)) => codec, }; write!(f, "{codec}") } } // Adapted from `tonic::codec` pub trait ConnectCodec { type Encode; type Decode; type Encoder: ConnectEncoder + Send; type Decoder: ConnectDecoder + Send; fn message_codec(&self) -> MessageCodec; fn encoder(&mut self) -> Self::Encoder; fn decoder(&mut self) -> Self::Decoder; } pub trait ConnectEncoder { fn encode(&mut self, message: T, buf: &mut EncodeBuf) -> Result<(), Error>; fn size_hint(&mut self, message: &T) -> usize { let _ = message; 0 } fn encode_to_bytes(&mut self, message: T) -> Result { let mut buf = EncodeBuf::default(); self.encode(message, &mut buf)?; Ok(buf.0.freeze()) } } pub trait ConnectDecoder { /// Decodes a message from the given buffer. fn decode(&mut self, buf: impl Buf) -> Result; } #[derive(Clone, Debug, Default)] pub struct EncodeBuf(BytesMut); impl EncodeBuf { pub fn as_buf_mut(&mut self) -> &mut impl BufMut { &mut self.0 } pub fn writer(&mut self) -> EncodeWriter { EncodeWriter::new(&mut self.0) } pub(crate) fn into_inner(self) -> BytesMut { self.0 } } impl From for EncodeBuf { fn from(buf: BytesMut) -> Self { Self(buf) } } struct EncodeWriter<'a> { buf: &'a mut BytesMut, writer: Option>, } impl<'a> EncodeWriter<'a> { fn new(buf: &'a mut BytesMut) -> Self { let writer = std::mem::take(buf).writer(); Self { buf, writer: Some(writer), } } } impl std::io::Write for EncodeWriter<'_> { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.writer.as_mut().unwrap().write(buf) } fn flush(&mut self) -> std::io::Result<()> { self.writer.as_mut().unwrap().flush() } } impl Drop for EncodeWriter<'_> { fn drop(&mut self) { let writer = self.writer.take().unwrap(); *self.buf = writer.into_inner(); } }