#![feature(prelude_import)] #[prelude_import] use std::prelude::rust_2021::*; #[macro_use] extern crate std; use std::hash::Hash; use std::sync::Arc; use dashmap::DashMap; use serde::de::DeserializeOwned; use serde::Serialize; use canary::providers::ServiceAddr; use canary::routes::{Route, GLOBAL_ROUTE}; use canary::{Addr, Channel, Result}; use srpc::IntoClient; const ID: &'static str = "sail"; pub struct InnerSailDB { map: DashMap, } impl ::srpc::canary::routes::RegisterEndpoint for InnerSailDB { const ENDPOINT: &'static str = "inner_sail_db"; } pub struct InnerSailDBPeer( pub ::srpc::canary::Channel, ::core::marker::PhantomData<(K, V)>, ); impl From<::srpc::canary::Channel> for InnerSailDBPeer { fn from(c: ::srpc::canary::Channel) -> Self { InnerSailDBPeer(c, ::core::marker::PhantomData::default()) } } impl ::srpc::Peer for InnerSailDB { type Struct = InnerSailDBPeer; } impl InnerSailDB { pub fn new() -> Self { InnerSailDB { map: DashMap::new(), } } } const _: () = { impl< K: Hash + Eq + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, V: ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > InnerSailDB { pub async fn get(&self, mut chan: Channel) -> Result { let key: K = chan.receive().await?; let val = match self.map.get(&key) { Some(val) => { let val = val.value(); Some(val) } None => None, }; chan.send(val).await?; Ok(chan) } pub async fn insert(&self, mut chan: Channel) -> Result { let (key, value) = chan.receive().await?; let value = self.map.insert(key, value); chan.send(value).await?; Ok(chan) } pub async fn remove(&self, mut chan: Channel) -> Result { let key: K = chan.receive().await?; let value = self.map.remove(&key).and_then(|s| Some(s.1)); chan.send(value).await?; Ok(chan) } } #[allow(non_camel_case_types)] #[repr(u8)] enum __srpc_action { get, insert, remove, } impl serde::Serialize for __srpc_action { #[allow(clippy::use_self)] fn serialize(&self, serializer: S) -> core::result::Result where S: serde::Serializer, { let value: u8 = match *self { __srpc_action::get => __srpc_action::get as u8, __srpc_action::insert => __srpc_action::insert as u8, __srpc_action::remove => __srpc_action::remove as u8, }; serde::Serialize::serialize(&value, serializer) } } impl<'de> serde::Deserialize<'de> for __srpc_action { #[allow(clippy::use_self)] fn deserialize(deserializer: D) -> core::result::Result where D: serde::Deserializer<'de>, { struct discriminant; #[allow(non_upper_case_globals)] impl discriminant { const get: u8 = __srpc_action::get as u8; const insert: u8 = __srpc_action::insert as u8; const remove: u8 = __srpc_action::remove as u8; } match ::deserialize(deserializer)? { discriminant::get => core::result::Result::Ok(__srpc_action::get), discriminant::insert => core::result::Result::Ok(__srpc_action::insert), discriminant::remove => core::result::Result::Ok(__srpc_action::remove), other => core::result::Result::Err(serde::de::Error::custom( ::core::fmt::Arguments::new_v1( &["invalid value: ", ", expected one of: ", ", ", ", "], &match ( &other, &discriminant::get, &discriminant::insert, &discriminant::remove, ) { _args => [ ::core::fmt::ArgumentV1::new(_args.0, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.1, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.2, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.3, ::core::fmt::Display::fmt), ], }, ), )), } } } impl< K: Hash + Eq + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, V: Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > ::srpc::canary::service::Service for InnerSailDB { const ENDPOINT: &'static str = "inner_sail_db"; type Pipeline = (); type Meta = ::std::sync::Arc>; fn service( __srpc_inner_meta: ::std::sync::Arc>, ) -> Box { ::canary::service::run_metadata( __srpc_inner_meta, |__srpc_inner_meta: ::std::sync::Arc>, mut __srpc_inner_channel: ::srpc::canary::Channel| async move { loop { match __srpc_inner_channel.receive::<__srpc_action>().await? { __srpc_action::get => { match __srpc_inner_meta.get(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } __srpc_action::insert => { match __srpc_inner_meta.insert(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } __srpc_action::remove => { match __srpc_inner_meta.remove(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } } } }, ) } } impl< K: Hash + Eq + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, V: Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > ::srpc::canary::service::StaticService for InnerSailDB { type Meta = ::std::sync::Arc>; type Chan = ::srpc::canary::Channel; fn introduce( __srpc_inner_meta: ::std::sync::Arc>, mut __srpc_inner_channel: ::srpc::canary::Channel, ) -> ::srpc::canary::runtime::JoinHandle<::srpc::canary::Result<()>> { ::srpc::canary::runtime::spawn(async move { loop { match __srpc_inner_channel.receive::<__srpc_action>().await? { __srpc_action::get => { match __srpc_inner_meta.get(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } __srpc_action::insert => { match __srpc_inner_meta.insert(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } __srpc_action::remove => { match __srpc_inner_meta.remove(__srpc_inner_channel).await { Ok(chan) => __srpc_inner_channel = chan, Err(e) => return Err(e), } } } } }) } } impl< K: Hash + Eq + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, V: Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > InnerSailDBPeer { pub async fn get(mut self) -> ::srpc::canary::Result<::srpc::canary::Channel> { self.0.send(__srpc_action::get).await?; Ok(self.0) } pub async fn insert(mut self) -> ::srpc::canary::Result<::srpc::canary::Channel> { self.0.send(__srpc_action::insert).await?; Ok(self.0) } pub async fn remove(mut self) -> ::srpc::canary::Result<::srpc::canary::Channel> { self.0.send(__srpc_action::remove).await?; Ok(self.0) } } }; pub struct Sail { db: Option>, } impl Sail where K: Hash + Eq + Serialize + DeserializeOwned + Sync + Send + 'static, V: Serialize + DeserializeOwned + Sync + Send + 'static, { pub fn bind() -> Result<()> where K: Send + Sync + 'static, V: Send + Sync + 'static, { Self::bind_at(ID) } /// binds in global route pub fn bind_at(at: &str) -> Result<()> where K: Send + Sync + 'static, V: Send + Sync + 'static, { Self::bind_in(&GLOBAL_ROUTE, at) } pub fn bind_in(route: &Route, at: &str) -> Result<()> where K: Send + Sync + 'static, V: Send + Sync + 'static, { route.add_service_at::>(at, Arc::new(InnerSailDB::new())) } pub async fn new(addr: Addr) -> Result { let addr = addr.service(ID); Self::new_at(addr).await } pub async fn new_at(addr: ServiceAddr) -> Result { let db = addr.connect().await?.client::>(); Ok(Sail { db: Some(db) }) } pub async fn get(&mut self, k: &K) -> Result> { let db = self.db.take().unwrap(); let mut chan: Channel = db.get().await?; chan.send(k).await?; let val: Option = chan.receive().await?; self.db = Some(chan.client::>()); Ok(val) } pub async fn insert(&mut self, k: &K, v: &V) -> Result> { let db = self.db.take().unwrap(); let mut chan = db.insert().await?; chan.send((k, v)).await?; let val = chan.receive().await?; self.db = Some(chan.client::>()); Ok(val) } pub async fn remove(&mut self, k: &K) -> Result> { let db = self.db.take().unwrap(); let mut chan = db.remove().await?; chan.send(k).await?; let val = chan.receive().await?; self.db = Some(chan.client::>()); Ok(val) } } struct DistributedList { list: Vec, } #[automatically_derived] #[allow(unused_qualifications)] impl ::core::default::Default for DistributedList { #[inline] fn default() -> DistributedList { DistributedList { list: ::core::default::Default::default(), } } } impl ::srpc::canary::routes::RegisterEndpoint for DistributedList { const ENDPOINT: &'static str = "distributed_list"; } struct DistributedListPeer(pub ::srpc::canary::Channel, ::core::marker::PhantomData<(T)>); impl From<::srpc::canary::Channel> for DistributedListPeer { fn from(c: ::srpc::canary::Channel) -> Self { DistributedListPeer(c, ::core::marker::PhantomData::default()) } } impl ::srpc::Peer for DistributedList { type Struct = DistributedListPeer; } const _: () = { impl DistributedList { async fn push(&mut self, value: T) { self.list.push(value); } async fn get(&self, index: usize) -> Option { self.list.get(index).and_then(|val| Some(val.clone())) } async fn remove(&mut self, index: usize) -> T { self.list.remove(index) } } #[allow(non_camel_case_types)] #[repr(u8)] enum __srpc_action { get, push, remove, } impl serde::Serialize for __srpc_action { #[allow(clippy::use_self)] fn serialize(&self, serializer: S) -> core::result::Result where S: serde::Serializer, { let value: u8 = match *self { __srpc_action::get => __srpc_action::get as u8, __srpc_action::push => __srpc_action::push as u8, __srpc_action::remove => __srpc_action::remove as u8, }; serde::Serialize::serialize(&value, serializer) } } impl<'de> serde::Deserialize<'de> for __srpc_action { #[allow(clippy::use_self)] fn deserialize(deserializer: D) -> core::result::Result where D: serde::Deserializer<'de>, { struct discriminant; #[allow(non_upper_case_globals)] impl discriminant { const get: u8 = __srpc_action::get as u8; const push: u8 = __srpc_action::push as u8; const remove: u8 = __srpc_action::remove as u8; } match ::deserialize(deserializer)? { discriminant::get => core::result::Result::Ok(__srpc_action::get), discriminant::push => core::result::Result::Ok(__srpc_action::push), discriminant::remove => core::result::Result::Ok(__srpc_action::remove), other => core::result::Result::Err(serde::de::Error::custom( ::core::fmt::Arguments::new_v1( &["invalid value: ", ", expected one of: ", ", ", ", "], &match ( &other, &discriminant::get, &discriminant::push, &discriminant::remove, ) { _args => [ ::core::fmt::ArgumentV1::new(_args.0, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.1, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.2, ::core::fmt::Display::fmt), ::core::fmt::ArgumentV1::new(_args.3, ::core::fmt::Display::fmt), ], }, ), )), } } } impl< T: Clone + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > ::srpc::canary::service::Service for DistributedList { const ENDPOINT: &'static str = "distributed_list"; type Pipeline = (); type Meta = ::std::sync::Arc<::srpc::RwLock>>; fn service( __srpc_inner_meta: ::std::sync::Arc<::srpc::RwLock>>, ) -> Box { ::canary::service::run_metadata( __srpc_inner_meta, |__srpc_inner_meta: ::std::sync::Arc<::srpc::RwLock>>, mut __srpc_inner_channel: ::srpc::canary::Channel| async move { loop { match __srpc_inner_channel.receive::<__srpc_action>().await? { __srpc_action::push => { #[allow(unused_parens)] let (value): (T) = __srpc_inner_channel.receive().await?; __srpc_inner_meta.write().await.push(value).await; } __srpc_action::get => { #[allow(unused_parens)] let (index): (usize) = __srpc_inner_channel.receive().await?; let res = __srpc_inner_meta.read().await.get(index).await; __srpc_inner_channel.send(res).await?; } __srpc_action::remove => { #[allow(unused_parens)] let (index): (usize) = __srpc_inner_channel.receive().await?; let res = __srpc_inner_meta.write().await.remove(index).await; __srpc_inner_channel.send(res).await?; } } } }, ) } } impl< T: Clone + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > ::srpc::canary::service::StaticService for DistributedList { type Meta = ::std::sync::Arc<::srpc::RwLock>>; type Chan = ::srpc::canary::Channel; fn introduce( __srpc_inner_meta: ::std::sync::Arc<::srpc::RwLock>>, mut __srpc_inner_channel: ::srpc::canary::Channel, ) -> ::srpc::canary::runtime::JoinHandle<::srpc::canary::Result<()>> { ::srpc::canary::runtime::spawn(async move { loop { match __srpc_inner_channel.receive::<__srpc_action>().await? { __srpc_action::push => { #[allow(unused_parens)] let (value): (T) = __srpc_inner_channel.receive().await?; __srpc_inner_meta.write().await.push(value).await; } __srpc_action::get => { #[allow(unused_parens)] let (index): (usize) = __srpc_inner_channel.receive().await?; let res = __srpc_inner_meta.read().await.get(index).await; __srpc_inner_channel.send(res).await?; } __srpc_action::remove => { #[allow(unused_parens)] let (index): (usize) = __srpc_inner_channel.receive().await?; let res = __srpc_inner_meta.write().await.remove(index).await; __srpc_inner_channel.send(res).await?; } } } }) } } impl< T: Clone + Send + Sync + 'static + ::srpc::__private::Serialize + ::srpc::__private::DeserializeOwned, > DistributedListPeer { pub async fn push(&mut self, value: impl AsRef) -> ::srpc::canary::Result<()> { self.0.send(__srpc_action::push).await?; #[allow(unused_parens)] self.0.send((value.as_ref())).await?; Ok(()) } pub async fn get(&mut self, index: impl AsRef) -> ::srpc::canary::Result> { self.0.send(__srpc_action::get).await?; #[allow(unused_parens)] self.0.send((index.as_ref())).await?; self.0.receive().await } pub async fn remove(&mut self, index: impl AsRef) -> ::srpc::canary::Result { self.0.send(__srpc_action::remove).await?; #[allow(unused_parens)] self.0.send((index.as_ref())).await?; self.0.receive().await } } };