use core::time; use std::{convert::TryFrom, io::Error, str::FromStr, thread}; use actix::{ io::{SinkWrite, WriteHandler}, Actor, ActorContext, Arbiter, AsyncContext, Context, Handler, Message as ActorMessage, StreamHandler, System, }; use actix_codec::Framed; use actix_web::{ client::{Client, WsProtocolError}, web::Bytes, }; use awc::{ ws::{Codec, Frame, Message}, BoxedSocket, }; use futures::{stream::SplitSink, StreamExt}; use infotainer::{websocket::{ClientCommand, ServerMessage}}; use itertools::Itertools; use uuid::Uuid; static CLI_COMMANDS: &[&str] = &["PublishText", "Subscribe", "Unsubscribe"]; struct Connection(SinkWrite, Message>>); impl Actor for Connection { type Context = Context; fn started(&mut self, ctx: &mut Context) { self.hb(ctx) } fn stopped(&mut self, _: &mut Context) { System::current().stop(); } } impl StreamHandler> for Connection { fn handle(&mut self, msg: Result, _: &mut Context) { match msg { Ok(frame) => match frame { Frame::Binary(data) => { if let Ok(iss) = serde_cbor::from_slice::(&data) { match iss { ServerMessage::Issue(i) => { let cmd = ClientCommand::GetLogEntries { log_id: i.0, entries: vec![i.1], }; self.0.write(Message::Binary(Bytes::from( serde_cbor::to_vec(&cmd).unwrap(), ))); } ServerMessage::LogEntry(e) => { for p in e { let data: String = String::from_utf8(p.data).unwrap(); println!( "Received publication {} for Subscription {}:\n{}", p.publication_id, p.subscription_id, data ) } } ServerMessage::LogIndex(i) => println!("{:?}", i), } } else { println!("Unable to handle received message"); } } _ => (), }, Err(e) => println!("{:?}", e), } } fn started(&mut self, _: &mut Context) { println!("Connected"); } fn finished(&mut self, ctx: &mut Context) { println!("Disconnected"); ctx.stop(); } } impl WriteHandler for Connection {} impl Handler for Connection { type Result = (); fn handle(&mut self, msg: CliCommand, _: &mut Self::Context) -> Self::Result { self.0.write(Message::Binary(Bytes::from( serde_cbor::to_vec(&ClientCommand::from(msg.into())).unwrap(), ))); } } impl Connection { fn hb(&self, ctx: &mut Context) { ctx.run_interval(time::Duration::new(5, 0), |act, _| { act.0.write(Message::Ping(Bytes::new())); }); } } #[derive(Debug, ActorMessage)] #[rtype("()")] enum CliCommand { PublishText(Uuid, String), Subscribe(Uuid), Unsubscribe(Uuid), } impl TryFrom for CliCommand { type Error = Box; fn try_from(cmdline: String) -> Result { let mut cli_input = cmdline.split(" "); let cmd = cli_input.next().ok_or(Error::new( std::io::ErrorKind::InvalidInput, "Missing command", ))?; if !CLI_COMMANDS.contains(&cmd) {} let id = Uuid::from_str(cli_input.next().ok_or(Error::new( std::io::ErrorKind::InvalidInput, "Missing log id parameter", ))?)?; match cmd { "PublishText" => Ok(CliCommand::PublishText(id, cli_input.join(" "))), "Subscribe" => Ok(CliCommand::Subscribe(id)), "Unsubscribe" => Ok(CliCommand::Unsubscribe(id)), _ => Err(Box::new(Error::new( std::io::ErrorKind::InvalidInput, "Invalid command", ))), } } } impl Into for CliCommand { fn into(self) -> ClientCommand { match self { CliCommand::PublishText(subscription_id, submission) => { ClientCommand::SubmitPublication { subscription_id, submission: submission.into(), } } CliCommand::Subscribe(subscription_id) => ClientCommand::Subscribe { subscription_id }, CliCommand::Unsubscribe(subscription_id) => { ClientCommand::Unsubscribe { subscription_id } } } } } fn main() -> std::io::Result<()> { env_logger::init(); let client_id = Uuid::new_v4(); let sys = System::new("infotainer-client-example"); Arbiter::spawn(async move { let (response, framed) = Client::default() .ws(format!("ws://127.0.0.1:1312/ws/{}", client_id)) .connect() .await .unwrap(); println!("Response: {:?}", response); let (sink, stream) = framed.split(); let conn = Connection::create(|ctx| { Connection::add_stream(stream, ctx); Connection(SinkWrite::new(sink, ctx)) }); thread::spawn(move || loop { let mut cmd = String::default(); if let Err(e) = std::io::stdin().read_line(&mut cmd) { println!("Could not read from commandline: {:?}", e); return; } match CliCommand::try_from(cmd.strip_suffix("\n").unwrap().to_owned()) { Ok(c) => conn.do_send(c), Err(e) => println!("Error: {:?}", e), } }); }); sys.run() }