use async_trait::async_trait;
use pipebase::{
common::{ConfigInto, FromConfig, FromPath, GroupAs, Pair},
map::Map,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::path::Path;
#[derive(Deserialize)]
pub struct JsonSerConfig {}
#[async_trait]
impl FromPath for JsonSerConfig {
async fn from_path
(_path: P) -> anyhow::Result
where
P: AsRef + Send,
{
Ok(JsonSerConfig {})
}
}
#[async_trait]
impl ConfigInto for JsonSerConfig {}
pub struct JsonSer {}
#[async_trait]
impl FromConfig for JsonSer {
async fn from_config(_config: JsonSerConfig) -> anyhow::Result {
Ok(JsonSer {})
}
}
impl JsonSer {
fn serialize(t: &T) -> anyhow::Result> {
match serde_json::to_vec(t) {
Ok(r) => Ok(r),
Err(err) => Err(err.into()),
}
}
}
#[async_trait]
impl Map, JsonSerConfig> for JsonSer
where
T: Serialize + Send + Sync + 'static,
{
async fn map(&mut self, t: T) -> anyhow::Result> {
JsonSer::serialize(&t)
}
}
#[derive(Deserialize)]
pub struct JsonDeserConfig {}
#[async_trait]
impl FromPath for JsonDeserConfig {
async fn from_path(_path: P) -> anyhow::Result
where
P: AsRef + Send,
{
Ok(JsonDeserConfig {})
}
}
#[async_trait]
impl ConfigInto for JsonDeserConfig {}
pub struct JsonDeser {}
#[async_trait]
impl FromConfig for JsonDeser {
async fn from_config(_config: JsonDeserConfig) -> anyhow::Result {
Ok(JsonDeser {})
}
}
impl JsonDeser {
fn deserialize(bytes: &[u8]) -> anyhow::Result {
let t: T = serde_json::from_slice::(bytes)?;
Ok(t)
}
}
#[async_trait]
impl Map, T, JsonDeserConfig> for JsonDeser
where
T: DeserializeOwned + Sync,
{
async fn map(&mut self, bytes: Vec) -> anyhow::Result {
JsonDeser::deserialize(bytes.as_slice())
}
}
#[derive(Deserialize)]
pub struct JsonRecordSerConfig {}
#[async_trait]
impl FromPath for JsonRecordSerConfig {
async fn from_path(_path: P) -> anyhow::Result
where
P: AsRef + Send,
{
Ok(JsonRecordSerConfig {})
}
}
impl ConfigInto for JsonRecordSerConfig {}
#[async_trait]
impl FromConfig for JsonRecordSer {
async fn from_config(_config: JsonRecordSerConfig) -> anyhow::Result {
Ok(JsonRecordSer {})
}
}
pub struct JsonRecordSer {}
impl JsonRecordSer {
fn serialize(record: &R) -> anyhow::Result>>
where
R: GroupAs + Serialize,
{
let bytes = serde_json::to_vec(record)?;
let key = record.group();
Ok(Pair::new(key, bytes))
}
}
#[async_trait]
impl Map>, JsonRecordSerConfig> for JsonRecordSer
where
R: GroupAs + Serialize + Send + 'static,
{
async fn map(&mut self, data: R) -> anyhow::Result>> {
Self::serialize(&data)
}
}