extern crate rs_jsonrpc_core; extern crate rs_jsonrpc_pubsub; #[macro_use] extern crate rs_jsonrpc_macros; extern crate rs_jsonrpc_tcp_server; use std::thread; use std::sync::{atomic, Arc, RwLock}; use std::collections::HashMap; use rs_jsonrpc_core::{Metadata, Error, ErrorCode, Result}; use rs_jsonrpc_core::futures::Future; use rs_jsonrpc_pubsub::{Session, PubSubMetadata, PubSubHandler, SubscriptionId}; use rs_jsonrpc_macros::pubsub; #[derive(Clone, Default)] struct Meta { session: Option>, } impl Metadata for Meta {} impl PubSubMetadata for Meta { fn session(&self) -> Option> { self.session.clone() } } build_rpc_trait! { pub trait Rpc { type Metadata; /// Adds two numbers and returns a result #[rpc(name = "add")] fn add(&self, u64, u64) -> Result; #[pubsub(name = "hello")] { /// Hello subscription #[rpc(name = "hello_subscribe", alias = ["hello_sub", ])] fn subscribe(&self, Self::Metadata, pubsub::Subscriber, u64); /// Unsubscribe from hello subscription. #[rpc(name = "hello_unsubscribe")] fn unsubscribe(&self, SubscriptionId) -> Result; } } } #[derive(Default)] struct RpcImpl { uid: atomic::AtomicUsize, active: Arc>>>, } impl Rpc for RpcImpl { type Metadata = Meta; fn add(&self, a: u64, b: u64) -> Result { Ok(a + b) } fn subscribe(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber, param: u64) { if param != 10 { subscriber.reject(Error { code: ErrorCode::InvalidParams, message: "Rejecting subscription - invalid parameters provided.".into(), data: None, }).unwrap(); return; } let id = self.uid.fetch_add(1, atomic::Ordering::SeqCst); let sub_id = SubscriptionId::Number(id as u64); let sink = subscriber.assign_id(sub_id.clone()).unwrap(); self.active.write().unwrap().insert(sub_id, sink); } fn unsubscribe(&self, id: SubscriptionId) -> Result { let removed = self.active.write().unwrap().remove(&id); if removed.is_some() { Ok(true) } else { Err(Error { code: ErrorCode::InvalidParams, message: "Invalid subscription.".into(), data: None, }) } } } fn main() { let mut io = PubSubHandler::default(); let rpc = RpcImpl::default(); let active_subscriptions = rpc.active.clone(); thread::spawn(move || { loop { { let subscribers = active_subscriptions.read().unwrap(); for sink in subscribers.values() { let _ = sink.notify(Ok("Hello World!".into())).wait(); } } thread::sleep(::std::time::Duration::from_secs(1)); } }); io.extend_with(rpc.to_delegate()); let server = rs_jsonrpc_tcp_server::ServerBuilder::new(io) .session_meta_extractor(|context: &rs_jsonrpc_tcp_server::RequestContext| { Meta { session: Some(Arc::new(Session::new(context.sender.clone()))), } }) .start(&"0.0.0.0:3030".parse().unwrap()) .expect("Server must start with no issues"); server.wait() }