mod test_service; use log::{debug}; use runng_thrift::*; #[cfg(test)] mod tests { use super::*; use runng::{ *, msg::NngMsg, protocol::Subscribe, }; use thrift::{ protocol::{ TMultiplexedOutputProtocol, }, server::{ TMultiplexedProcessor, } }; #[test] fn it_works() -> NngReturn { let url = "inproc://test"; let factory = runng::Latest::default(); let publisher = factory.publisher_open()?.listen(url)?; let subscriber = factory.subscriber_open()?.dial(url)?; let topic: Vec = vec![0]; subscriber.subscribe(&topic); let mut msg = NngMsg::new()?; msg.append_u32(0)?; publisher.send(msg)?; subscriber.recv()?; Ok(()) } use std::{ thread, sync::Arc, }; use thrift::{ server::{TProcessor}, transport::{TIoChannel}, } ; #[derive(Debug)] pub struct TServer where PRC: TProcessor + Send + Sync,// + 'static, { processor: Arc } impl TServer where PRC: TProcessor + Send + Sync, { pub fn new(processor: PRC) -> TServer { TServer { processor: Arc::new(processor) } } } use crate::test_service::{ TestServiceSyncHandler, TestServiceSyncProcessor, TTestServiceSyncClient, }; struct Handler; impl TestServiceSyncHandler for Handler { fn handle_test(&self) -> thrift::Result { debug!("HANDLED!"); Ok(true) } } #[test] fn basic_thrift_works() -> NngReturn { let url = "inproc://test2"; let factory = runng::Latest::default(); let replier = factory.replier_open()?.listen(url)?; thread::spawn(move || { let mut channel = TNngChannel::new(replier.clone_socket()).unwrap(); let (readable, writable) = channel.split().unwrap(); let mut in_proto = TNngInputProtocol::new(readable); let mut out_proto = TNngOutputProtocol::new(writable); let handler = Handler{}; let processor = TestServiceSyncProcessor::new(handler); processor.process(&mut in_proto, &mut out_proto).unwrap(); debug!("Server done!"); }); let requester = factory.requester_open()?.dial(url)?; let mut channel = TNngChannel::new(requester.clone_socket()).unwrap(); let (readable, writable) = channel.split().unwrap(); let in_proto = TNngInputProtocol::new(readable); let out_proto = TNngOutputProtocol::new(writable); let mut client = test_service::TestServiceSyncClient::new(in_proto, out_proto); client.test().unwrap(); Ok(()) } //#[test] fn thrift_works() -> NngReturn { let url = "inproc://test3"; let serviceName = "blah"; let factory = runng::Latest::default(); let replier = factory.replier_open()?.listen(url)?; let mut channel = TNngChannel::new(replier.clone_socket())?; let (readable, writable) = channel.split().unwrap(); let in_proto = TNngInputProtocol::new(readable); let out_proto = TNngOutputProtocol::new(writable); let mut muxer = TMultiplexedProcessor::new(); let handler = Handler{}; let processor = TestServiceSyncProcessor::new(handler); muxer.register(serviceName, Box::new(processor), false); thread::spawn(move || { }); let requester = factory.requester_open()?.dial(url)?; let mut channel = TNngChannel::new(requester.clone_socket()).unwrap(); let (readable, writable) = channel.split().unwrap(); let in_proto = TNngInputProtocol::new(readable); let out_proto = TNngOutputProtocol::new(writable); let out_proto = TMultiplexedOutputProtocol::new(serviceName, out_proto); use crate::test_service::TTestServiceSyncClient; let mut client = test_service::TestServiceSyncClient::new(in_proto, out_proto); client.test().unwrap(); Ok(()) } }