// Copyright 2020 Joyent, Inc. use std::io::{Error, ErrorKind}; use std::net::{Shutdown, SocketAddr, TcpStream}; use std::process; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use serde_json::Value; use slog::{debug, error, info, o, Drain, Logger}; use tokio::net::TcpListener; use tokio::prelude::*; use fast_rpc::client; use fast_rpc::protocol::{FastMessage, FastMessageId}; use fast_rpc::server; fn echo_handler( msg: &FastMessage, mut response: Vec, log: &Logger, ) -> Result, Error> { debug!(log, "handling echo function request"); response.push(FastMessage::data(msg.id, msg.data.clone())); Ok(response) } fn msg_handler( msg: &FastMessage, log: &Logger, ) -> Result, Error> { let response: Vec = vec![]; match msg.data.m.name.as_str() { "echo" => echo_handler(msg, response, &log), _ => Err(Error::new( ErrorKind::Other, format!("Unsupported function: {}", msg.data.m.name), )), } } fn run_server(barrier: Arc) { let plain = slog_term::PlainSyncDecorator::new(std::io::stdout()); let root_log = Logger::root( Mutex::new(slog_term::FullFormat::new(plain).build()).fuse(), o!("build-id" => "0.1.0"), ); let addr_str = "127.0.0.1:56652".to_string(); match addr_str.parse::() { Ok(addr) => { let listener = TcpListener::bind(&addr).expect("failed to bind"); info!(root_log, "listening for fast requests"; "address" => addr); barrier.wait(); tokio::run({ let process_log = root_log.clone(); let err_log = root_log.clone(); listener .incoming() .map_err(move |e| error!(&err_log, "failed to accept socket"; "err" => %e)) .for_each(move |socket| { let task = server::make_task(socket, msg_handler, Some(&process_log)); tokio::spawn(task); Ok(()) }) }) } Err(e) => { eprintln!("error parsing address: {}", e); } } } fn assert_handler(expected_data_size: usize) -> impl Fn(&FastMessage) { move |msg| { let data: Vec = serde_json::from_value(msg.data.d.clone()).unwrap(); assert_eq!(data.len(), 1); assert_eq!(data[0].len(), expected_data_size); } } fn response_handler( data_size: usize, ) -> impl Fn(&FastMessage) -> Result<(), Error> { let handler = assert_handler(data_size); move |msg| { handler(msg); Ok(()) } } #[test] fn client_server_comms() { let barrier = Arc::new(Barrier::new(2)); let barrier_clone = barrier.clone(); let _h_server = thread::spawn(move || run_server(barrier_clone)); barrier.clone().wait(); let addr_str = "127.0.0.1:56652".to_string(); let addr = addr_str.parse::().unwrap(); let mut stream = TcpStream::connect(&addr).unwrap_or_else(|e| { eprintln!("Failed to connect to server: {}", e); process::exit(1) }); (1..100).for_each(|x| { let data_size = x * 1000; let method = String::from("echo"); let args_str = ["[\"", &"a".repeat(data_size), "\"]"].concat(); let args: Value = serde_json::from_str(&args_str).unwrap(); let handler = response_handler(data_size); let mut msg_id = FastMessageId::new(); let result = client::send(method, args, &mut msg_id, &mut stream) .and_then(|_bytes_written| client::receive(&mut stream, handler)); assert!(result.is_ok()); }); let shutdown_result = stream.shutdown(Shutdown::Both); assert!(shutdown_result.is_ok()); }