use actix::prelude::*; use actix::{Actor, ActorContext, AsyncContext, Addr, Handler, StreamHandler}; use actix_http::ws::Codec; use actix_web::{web, get, middleware, App, Error, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_web_actors::ws; use serde::Deserialize; use lazy_static::lazy_static; use std::sync::{Arc, RwLock}; use std::collections::{HashMap, HashSet}; use simple_logger::SimpleLogger; lazy_static! { // static ref RESPONSE_BUILDERS : HashMap = Arc::new(Mutex::new(HashMap::::new())); // static ref RESPONSE_BUILDERS : dyn Send + Arc>>> = Arc::new(Mutex::new(HashMap::>::new())); // static ref RESPONSE_BUILDERS : Arc>> = Arc::new(Mutex::new(HashMap::::new())); static ref RESPONSE_BUILDERS : Arc>> = Arc::new(RwLock::new(HashMap::::new())); static ref COUNTER : RwLock = RwLock::new(Counter(0)); static ref ROOM : Arc>> = Arc::new(RwLock::new(HashMap::::new())); static ref ADDR : Arc>>>> = Arc::new(RwLock::new(HashMap::>>::new())); static ref ADDR2 : Arc>>> = Arc::new(RwLock::new(HashMap::>::new())); static ref APIRPC : Arc>> = Arc::new(RwLock::new(HashMap::::new())); // static ref AGENTS: Arc>>>> = Arc::new(RwLock::new(HashMap::>>::new())); static ref AGENTS: Arc>>> = Arc::new(RwLock::new(HashMap::>::new())); static ref PENDING_CLIENTS: Arc>>> = Arc::new(RwLock::new(HashMap::>::new())); static ref CLIENT_TERMINAL_PAIRS: Arc, Addr>>> = Arc::new(RwLock::new(HashMap::, Addr>::new())); } const VERSION: &str = env!("CARGO_PKG_VERSION"); struct Agent { // seq: u32, // agent_id: String, agent_id: Option, } #[derive(Message)] #[rtype(result = "()")] struct NewTerminal; impl Handler for Agent { type Result = (); fn handle(&mut self, _msg: NewTerminal, ctx: &mut Self::Context) { // self.handle_msg(msg.0, ctx); println!("received NewTerminal"); ctx.binary("TERMINAL\n"); } } impl Agent { fn is_initialized(&mut self) -> bool { // self.seq == 0 self.agent_id.is_some() } fn on_init(&mut self, ctx: &mut ::Context) { let mut agents = AGENTS.write().unwrap(); let id = self.agent_id.as_ref().unwrap().clone(); // let set = agents.entry(id).or_insert(HashSet::>::new()); // set.insert(ctx.address()); // agents[id] = ctx.address(); *agents.entry(id).or_insert(ctx.address()) = ctx.address(); } fn on_stop(&mut self, _ctx: &mut ::Context) { let mut agents = AGENTS.write().unwrap(); let id = self.agent_id.as_ref().unwrap().clone(); // let set = agents.entry(id).or_insert(HashSet::>::new()); // set.remove(&ctx.address()); agents.remove(&id); } fn init(&mut self, msg: ws::Message, ctx: &mut ::Context) { let mut agent_info = "".to_string(); match msg { ws::Message::Text(m) => { println!("agent_info text: {:?}", m); agent_info = m.to_string(); // ctx.text(m); }, ws::Message::Binary(m) => { println!("agent_info binary: {:?}", m); agent_info = String::from_utf8_lossy(&m).to_string(); // ctx.binary(m); }, _ => (), }; println!("agent_info: {}", agent_info); // let deserialized: Result = serde_json::from_str::(&agent_info); let deserialized = serde_json::from_str::(&agent_info); if let Ok(id) = deserialized { if let Some(id) = id.id { println!("id: {}", id); self.agent_id = Some(id); self.on_init(ctx); } } } fn handle_cmd(&mut self, msg: ws::Message, _ctx: &mut ::Context) { let mut cmd = "".to_string(); match msg { ws::Message::Text(m) => { println!("agent_info text: {:?}", m); cmd = m.to_string(); // ctx.text(m); }, ws::Message::Binary(m) => { println!("agent_info binary: {:?}", m); cmd = String::from_utf8_lossy(&m).to_string(); // ctx.binary(m); }, _ => (), }; println!("cmd: {}", cmd); } } impl Actor for Agent { type Context = ws::WebsocketContext; fn stopped(&mut self, ctx: &mut Self::Context) { self.on_stop(ctx) } } impl StreamHandler> for Agent { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { if self.is_initialized() { // terminal output self.handle_cmd(msg.unwrap(), ctx) } else { // msg contains json line formatted agent info, hopefully self.init(msg.unwrap(), ctx); // ctx.binary("TERMINAL\n"); } } } #[get("/api/rpc")] async fn api_agent(req: HttpRequest, stream: web::Payload, id: web::Query) -> Result { // let id = id.id.clone(); // let id = "todo" // let seq = agent_count(id); let handler = Agent{ // seq: 0, // agent_id: id, agent_id: id.id.clone(), }; ws::start(handler, &req, stream) } struct Terminal { agent_id: String, client_addr: Option>, } impl Handler for Terminal { type Result = (); fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { self.handle_msg(msg.0, ctx); } } impl Actor for Terminal { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { let mut pending_clients = PENDING_CLIENTS.write().unwrap(); let id = self.agent_id.clone(); let client_addr = pending_clients[&id].clone(); self.client_addr = Some(client_addr.clone()); pending_clients.remove(&id); let mut client_terminal_pairs = CLIENT_TERMINAL_PAIRS.write().unwrap(); *client_terminal_pairs.entry(client_addr).or_insert(ctx.address()) = ctx.address(); println!("agent started"); } fn stopped(&mut self, _ctx: &mut Self::Context) { // close client too } } impl Terminal { fn forward(&mut self, msg: ws::Message, _ctx: &mut ::Context) { // println!("agent forward"); if let Some(client_addr) = self.client_addr.clone() { client_addr.do_send(WsMessage(msg)) } } fn handle_msg(&mut self, msg: ws::Message, ctx: &mut ::Context) { let mut message = "".to_string(); match msg { ws::Message::Text(m) => { message = m.to_string(); ctx.text(m); }, ws::Message::Binary(m) => { message = String::from_utf8_lossy(&m).to_string(); ctx.binary(m); }, _ => (), }; println!("terminal::handle_msg({}): {}", message.len(), message); } } impl StreamHandler> for Terminal { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(m) => { self.forward(m, ctx); } _ => { }, } } } #[get("/api/terminal")] async fn api_terminal(req: HttpRequest, stream: web::Payload, id: web::Query) -> Result { // let id = id.id.clone(); // let id = "todo" // let seq = agent_count(id); let agent_id = id.id.clone().unwrap(); let handler = Terminal{ agent_id: agent_id.clone(), client_addr: None, }; println!("/api/terminal?id={}", agent_id); ws::start(handler, &req, stream) } struct Client { agent_id: String, agent_addr: Addr, } impl Handler for Client { type Result = (); fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { self.handle_msg(msg.0, ctx); } } impl Actor for Client { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { let mut pending_clients = PENDING_CLIENTS.write().unwrap(); let id = self.agent_id.clone(); *pending_clients.entry(id).or_insert(ctx.address()) = ctx.address(); self.agent_addr.do_send(NewTerminal); println!("client started"); } fn stopped(&mut self, _ctx: &mut Self::Context) { } } impl Client { fn forward(&mut self, msg: ws::Message, ctx: &mut ::Context) { // println!("client forward"); loop { let client_terminal_pairs = CLIENT_TERMINAL_PAIRS.read().unwrap(); if client_terminal_pairs.get(&ctx.address()).is_some() { break } else { std::thread::sleep(std::time::Duration::from_millis(10)); } } let client_terminal_pairs = CLIENT_TERMINAL_PAIRS.read().unwrap(); let terminal_addr = client_terminal_pairs.get(&ctx.address()).unwrap(); terminal_addr.do_send(WsMessage(msg)); } fn handle_msg(&mut self, msg: ws::Message, ctx: &mut ::Context) { let mut message = "".to_string(); match msg { ws::Message::Text(m) => { message = m.to_string(); ctx.text(m); }, ws::Message::Binary(m) => { message = String::from_utf8_lossy(&m).to_string(); println!("bin({}): {:?}", m.len(), m); ctx.binary(m); }, _ => (), }; println!("client::handle_msg({}): {}", message.len(), message); } } impl StreamHandler> for Client { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(m) => { self.forward(m, ctx); } _ => { }, } } } #[get("/api/agent/{id}/terminal")] async fn api_client(req: HttpRequest, stream: web::Payload, id: web::Path) -> impl Responder { println!("path: {}\n", req.path()); let agent_id = id.id.clone().unwrap(); let agents = AGENTS.read().unwrap(); let agent_addr = agents.get(&agent_id); if agent_addr.is_none() { return Ok(HttpResponse::NotFound().body("not found")) } let handler = Client{ agent_id: agent_id, agent_addr: agent_addr.unwrap().clone(), }; ws::start(handler, &req, stream) } struct Counter(u128); impl Iterator for Counter { type Item = u128; fn next(&mut self) -> Option { self.0 += 1; Some(self.0) } } #[derive(Clone,Debug,PartialEq,Eq,Hash)] struct GlobalEcho{ id: u128, name: String, addr: Option>, } impl Actor for GlobalEcho { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { self.addr = Some(ctx.address()); println!("addr: {:?}", ctx.address()); let name = self.name.as_str(); let mut room = ROOM.write().unwrap(); let counter = room.entry(name.to_string()).or_insert(0); *counter += 1; println!("room {} has {} members", name, room[name]); let mut addr = ADDR.write().unwrap(); let set = addr.entry(name.to_string()).or_insert(HashSet::>::new()); set.insert(ctx.address()); /* let mut addr2 = ADDR2.write().unwrap(); let map = addr2.entry(name.to_string()).or_insert(HashMap::::new()); map[&self.id] = self; */ } fn stopped(&mut self, ctx: &mut Self::Context) { let name = self.name.as_str(); let mut room = ROOM.write().unwrap(); let counter = room.entry(name.to_string()).or_insert(1); *counter -= 1; println!("room {} has {} members", name, room[name]); let mut addr = ADDR.write().unwrap(); let set = addr.entry(name.to_string()).or_insert(HashSet::>::new()); set.remove(&ctx.address()); } } #[derive(Message)] #[rtype(result = "()")] struct WsMessage (ws::Message); impl Handler for GlobalEcho { type Result = (); fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) { self.handle_msg(msg.0, ctx); } } impl StreamHandler> for GlobalEcho { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(m) => self.handle_msg(m, ctx), _ => (), } } } impl GlobalEcho { fn handle_msg(&mut self, msg: ws::Message, ctx: &mut ::Context) { match msg { ws::Message::Ping(m) => { ctx.pong(&m); }, ws::Message::Text(m) => { println!("text: {}", m); if m == "addr\n" { println!("{:?}", ctx.address()); } if m == "id\n" { println!("{}", self.id); } if m == "name\n" { println!("{}", self.name); } if m == "room\n" { let mut addr = ADDR.write().unwrap(); let set = addr.entry(self.name.to_string()).or_insert(HashSet::>::new()); let addrs : Vec<&Addr<_>> = set.iter().collect(); println!("len: {}", set.len()); for (i, a) in addrs.iter().enumerate() { let is_self = ctx.address() == **a; println!("{} {} {:?}", i, is_self, a); if !is_self { a.do_send(WsMessage(ws::Message::Text("hello".into()))); // println!("{:?}", resp); } } } ctx.text(m); }, ws::Message::Binary(m) => { ctx.binary(m); }, ws::Message::Pong(_m) => (), ws::Message::Close(_m) => { ctx.stop() }, ws::Message::Continuation(_m) => (), ws::Message::Nop => (), } } } #[derive(Deserialize)] struct ID { id: Option, } #[get("/ws/rpc")] async fn ws_rpc(req: HttpRequest, stream: web::Payload, id: web::Query) -> Result { let mut res = ws::handshake(&req)?; let idx = { let mut c = COUNTER.write().unwrap(); c.next().unwrap() }; let mut idn = String::new(); if let Some(x) = &id.id { idn.push_str(x); } println!("idx: {}", idx); println!("id: {}", idn); Ok(res.streaming(ws::WebsocketContext::with_codec( GlobalEcho{id: idx, name: idn, addr: None}, stream, Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit ))) } struct Echo; impl Actor for Echo { type Context = ws::WebsocketContext; } impl StreamHandler> for Echo { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Binary(bin)) => ctx.binary(bin), _ => { println!("disconnected") }, } } } /// Entry point for connecting nodes #[get("/api/terminal/debug")] async fn api_terminal_debug( req: HttpRequest, stream: web::Payload, ) -> Result { let mut res = ws::handshake(&req)?; println!("path: {}\n", req.path()); Ok(res.streaming(ws::WebsocketContext::with_codec( Echo{}, stream, Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit ))) } /// Entry point for connecting nodes #[get("/ws")] async fn ws_echo( req: HttpRequest, stream: web::Payload, ) -> Result { let mut res = ws::handshake(&req)?; Ok(res.streaming(ws::WebsocketContext::with_codec( Echo{}, stream, Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit ))) } struct Sink; impl Actor for Sink { type Context = ws::WebsocketContext; } impl StreamHandler> for Sink { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => { ctx.pong(&msg) }, Ok(ws::Message::Text(text)) => { if text == "q" { ctx.stop() } else { println!("text: {}", text) } }, Ok(ws::Message::Binary(bin)) => { let data = String::from_utf8_lossy(&bin); println!("{}", data) }, _ => (), } } } /// Entry point for connecting nodes /// /// substrate --dev --ws-external --telemetry-url 'ws://127.0.0.1:8000/submit 0' /// #[get("/submit")] async fn ws_submit( req: HttpRequest, stream: web::Payload, ) -> Result { let mut res = ws::handshake(&req)?; Ok(res.streaming(ws::WebsocketContext::with_codec( Sink{}, stream, Codec::new().max_size(10 * 1024 * 1024), // 10mb frame limit ))) } /// Entry point for version info #[get("/version")] async fn version() -> impl Responder { let body = format!("{}", VERSION); body } /// Entry point for health check monitoring bots #[get("/health")] async fn health() -> impl Responder { let body = format!("Connected chains: {}", 0); body } /// Entry point for health check monitoring bots #[get("/echo/{name}")] async fn echo_name(path: web::Path) -> impl Responder { format!("name: {}", path.into_inner()) } #[derive(Deserialize)] struct Name { name: String, } /// Entry point for health check monitoring bots #[get("/name/{name}")] async fn struct_name(name: web::Path) -> impl Responder { format!("name: {}", name.name) } #[derive(Deserialize)] struct OptionalName { name: Option, } /// Entry point for health check monitoring bots #[get("/name")] async fn struct_name_query(name: web::Query) -> impl Responder { format!("name: {:?}", name.name) } /// Entry point for health check monitoring bots #[get("/echo/{first}/{last}")] async fn echo_first_last(path: web::Path<(String, String)>) -> impl Responder { let (first, last) = path.into_inner(); format!("first: {}, last: {}", first, last) } #[derive(Deserialize)] struct FullName { first: String, last: String, } /// Entry point for health check monitoring bots #[get("/fullname/{first}/{last}")] async fn struct_fullname(fullname: web::Path) -> impl Responder { format!("first: {}, last: {}", fullname.first, fullname.last) } #[derive(Deserialize)] struct OptionalFullName { first: Option, last: Option, } /// Entry point for health check monitoring bots #[get("/fullname")] async fn struct_fullname_query(fullname: web::Query) -> impl Responder { format!("first: {:?}, last: {:?}", fullname.first, fullname.last) } #[derive(Deserialize)] struct HybridQuery { id: Option, } /// Entry point for health check monitoring bots #[get("/hybrid/{name}")] async fn hybrid_name(name: web::Path, id: web::Query) -> impl Responder { format!("name: {}, id: {:?}", name.name, id.id) } /// Entry point for health check monitoring bots #[get("/qmap")] async fn query_map(qmap: web::Query>) -> impl Responder { format!("{:?}", qmap) } /// Entry point for health check monitoring bots #[get("/qvec")] async fn query_vec(qvec: web::Query>) -> impl Responder { format!("{:?}", qvec) } /// Entry point for index #[get("/*")] async fn index(req: HttpRequest) -> impl Responder { println!("path: {}\n", req.path()); let mut body = "".to_string(); body.push_str(&format!("path: {}\n", req.path())); body.push_str(&format!("query_string: {}\n", req.query_string())); body.push_str(&format!("headers: {:?}\n", req.headers())); body } /// Telemetry entry point. Listening by default on 127.0.0.1:8000. /// This can be changed using the `PORT` and `BIND` ENV variables. #[actix_web::main] async fn main() -> std::io::Result<()> { SimpleLogger::new() .with_level(log::LevelFilter::Info) .init() .expect("Must be able to start a logger"); log::info!("Starting telemetry version: {}", env!("CARGO_PKG_VERSION")); HttpServer::new(move || { App::new() .wrap(middleware::NormalizePath::default()) .service(health) .service(version) .service(ws_echo) .service(ws_submit) .service(echo_name) .service(echo_first_last) .service(struct_name) .service(struct_name_query) .service(struct_fullname) .service(struct_fullname_query) .service(hybrid_name) .service(query_map) .service(query_vec) .service(ws_rpc) .service(api_client) .service(api_terminal) .service(api_agent) .service(index) }) .bind("127.0.0.1:8000")? .run() .await }