use actix::prelude::*; use futures::prelude::*; use serde::{Deserialize, Serialize}; use std::error::Error; use std::time::Duration; use std::{env, fs::OpenOptions, path::PathBuf}; use structopt::StructOpt; use ya_service_bus::{actix_rpc, Handle, RpcEnvelope, RpcMessage, RpcStreamMessage}; #[derive(Serialize, Deserialize)] struct Ping(String); impl RpcMessage for Ping { const ID: &'static str = "ping"; type Item = String; type Error = (); } #[derive(Serialize, Deserialize)] struct StreamPing(String); impl RpcStreamMessage for StreamPing { const ID: &'static str = "ping"; type Item = String; type Error = (); } const BUS_ID: &str = "/local/exeunit"; #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] enum Command { Deploy {}, Start { #[serde(default)] args: Vec, }, Run { entry_point: String, #[serde(default)] args: Vec, }, Stop {}, Transfer { from: String, to: String, }, } #[derive(Serialize, Deserialize, Debug)] struct Execute(Vec); impl RpcMessage for Execute { const ID: &'static str = "execute"; type Item = String; type Error = String; } #[derive(Default)] struct ExeUnit(Option); impl Actor for ExeUnit { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { self.0 = Some(actix_rpc::bind::( BUS_ID, ctx.address().recipient(), )) } } impl Handler> for ExeUnit { type Result = Result; fn handle(&mut self, msg: RpcEnvelope, _ctx: &mut Self::Context) -> Self::Result { eprintln!("got {:?}", msg.as_ref()); Ok(format!("{:?}", msg.into_inner())) } } #[derive(StructOpt)] enum Args { /// Starts server that waits for commands on gsb://local/exe-unit Server {}, /// Sends script to gsb://local/exe-unit service Client { script: PathBuf, }, Local { script: PathBuf, }, Ping { dst: String, msg: String, }, StreamPing { dst: String, msg: String, }, } fn run_script(script: PathBuf) -> impl Future>> { async move { let commands: Vec = serde_json::from_reader(OpenOptions::new().read(true).open(script)?)?; let result = actix_rpc::service(BUS_ID).send(Execute(commands)).await?; result.map_err(From::from) } } fn main() -> Result<(), Box> { env::set_var("RUST_LOG", env::var("RUST_LOG").unwrap_or("debug".into())); env_logger::init(); let sys = System::new(); let args = Args::from_args(); match args { Args::Server { .. } => { let _ = ExeUnit::default().start(); sys.run()?; eprintln!("done"); } Args::Client { script } => { let result = sys.block_on(run_script(script))?; eprintln!("got result: {:?}", result); } Args::Ping { dst, msg } => { let result = sys.block_on(actix_rpc::service(&dst).send(Ping(msg)))?; eprintln!("got result: {:?}", result); } Args::StreamPing { dst, msg } => { let result = sys.block_on( actix_rpc::service(&dst) .call_stream(StreamPing(msg)) .for_each(|item| future::ready(eprintln!("got={:?}", item))), ); eprintln!("got result: {:?}", result); } Args::Local { script } => { let _ = ExeUnit::default().start(); let result = sys.block_on(async { tokio::time::sleep(Duration::from_millis(500)).await; run_script(script).await })?; eprintln!("got result: {:?}", result); } } Ok(()) }