use async_trait::async_trait; use pipebase::{ common::{ConfigInto, FromConfig, FromPath, GroupAs, Pair}, map::Map, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use std::path::Path; #[derive(Deserialize)] pub struct JsonSerConfig {} #[async_trait] impl FromPath for JsonSerConfig { async fn from_path

(_path: P) -> anyhow::Result where P: AsRef + Send, { Ok(JsonSerConfig {}) } } #[async_trait] impl ConfigInto for JsonSerConfig {} pub struct JsonSer {} #[async_trait] impl FromConfig for JsonSer { async fn from_config(_config: JsonSerConfig) -> anyhow::Result { Ok(JsonSer {}) } } impl JsonSer { fn serialize(t: &T) -> anyhow::Result> { match serde_json::to_vec(t) { Ok(r) => Ok(r), Err(err) => Err(err.into()), } } } #[async_trait] impl Map, JsonSerConfig> for JsonSer where T: Serialize + Send + Sync + 'static, { async fn map(&mut self, t: T) -> anyhow::Result> { JsonSer::serialize(&t) } } #[derive(Deserialize)] pub struct JsonDeserConfig {} #[async_trait] impl FromPath for JsonDeserConfig { async fn from_path

(_path: P) -> anyhow::Result where P: AsRef + Send, { Ok(JsonDeserConfig {}) } } #[async_trait] impl ConfigInto for JsonDeserConfig {} pub struct JsonDeser {} #[async_trait] impl FromConfig for JsonDeser { async fn from_config(_config: JsonDeserConfig) -> anyhow::Result { Ok(JsonDeser {}) } } impl JsonDeser { fn deserialize(bytes: &[u8]) -> anyhow::Result { let t: T = serde_json::from_slice::(bytes)?; Ok(t) } } #[async_trait] impl Map, T, JsonDeserConfig> for JsonDeser where T: DeserializeOwned + Sync, { async fn map(&mut self, bytes: Vec) -> anyhow::Result { JsonDeser::deserialize(bytes.as_slice()) } } #[derive(Deserialize)] pub struct JsonRecordSerConfig {} #[async_trait] impl FromPath for JsonRecordSerConfig { async fn from_path

(_path: P) -> anyhow::Result where P: AsRef + Send, { Ok(JsonRecordSerConfig {}) } } impl ConfigInto for JsonRecordSerConfig {} #[async_trait] impl FromConfig for JsonRecordSer { async fn from_config(_config: JsonRecordSerConfig) -> anyhow::Result { Ok(JsonRecordSer {}) } } pub struct JsonRecordSer {} impl JsonRecordSer { fn serialize(record: &R) -> anyhow::Result>> where R: GroupAs + Serialize, { let bytes = serde_json::to_vec(record)?; let key = record.group(); Ok(Pair::new(key, bytes)) } } #[async_trait] impl Map>, JsonRecordSerConfig> for JsonRecordSer where R: GroupAs + Serialize + Send + 'static, { async fn map(&mut self, data: R) -> anyhow::Result>> { Self::serialize(&data) } }