use std::future::Future; use anyhow::ensure; use bytes::Bytes; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::codec::MessageCodec; pub trait IntoBody { type Body; type Error; fn into_body(self, codec: &MessageCodec) -> Result; } pub trait FromBody: Sized { type Error; fn from_body(body: T, codec: &MessageCodec) -> impl Future>; } pub struct JsonBody(pub T); impl<'a, T: Serialize> IntoBody for JsonBody<&'a T> { type Body = String; type Error = anyhow::Error; fn into_body(self, codec: &MessageCodec) -> Result { ensure!( codec == &MessageCodec::Json, "JsonBody doesn't support the {codec} codec" ); let json = serde_json::to_string(&self.0)?; Ok(json) } } impl<'a, T: Deserialize<'a>> FromBody<&'a [u8]> for JsonBody { type Error = anyhow::Error; async fn from_body(body: &'a [u8], codec: &MessageCodec) -> Result { ensure!( codec == &MessageCodec::Json, "JsonBody doesn't support the {codec} codec" ); let msg: T = serde_json::from_slice(body)?; Ok(Self(msg)) } } impl FromBody for JsonBody { type Error = anyhow::Error; async fn from_body(body: Bytes, codec: &MessageCodec) -> Result { ensure!( codec == &MessageCodec::Json, "JsonBody doesn't support the {codec} codec" ); let msg: T = serde_json::from_slice(&body).inspect_err(|err| { let bod = String::from_utf8_lossy(&body); eprintln!("decoding {} {bod:?}: {err:?}", std::any::type_name::()); })?; Ok(Self(msg)) } } #[cfg(feature = "proto")] mod message { pub struct MessageBody(pub T); use anyhow::bail; use super::*; impl<'a, T: prost::Message + serde::Serialize> IntoBody for MessageBody<&'a T> { type Body = Vec; type Error = anyhow::Error; fn into_body(self, codec: &MessageCodec) -> Result { Ok(match codec { MessageCodec::Json => serde_json::to_vec(&self.0)?, MessageCodec::Proto => self.0.encode_to_vec(), other => bail!("MessageBody doesn't support the {other} codec"), }) } } impl> FromBody for MessageBody { type Error = anyhow::Error; async fn from_body(body: U, codec: &MessageCodec) -> Result { Ok(Self(match codec { MessageCodec::Json => serde_json::from_slice(body.as_ref())?, MessageCodec::Proto => T::decode(body.as_ref())?, other => bail!("MessageBody doesn't support the {other} codec"), })) } } } #[cfg(feature = "proto")] pub use message::MessageBody;