pub(crate) mod messages; mod event; mod disconnect; mod discover; mod heartbeat; mod info; mod ping; mod pong; mod request; mod response; use std::collections::HashMap; use std::sync::Arc; use act_zero::runtimes::tokio::spawn_actor; use act_zero::*; use async_trait::async_trait; use log::{debug, error}; use serde_json::Value; use thiserror::Error; use tokio::sync::oneshot::Sender; use crate::{ broker::ServiceBroker, config, config::{Channel, Config, Transporter}, nats, }; use self::{ disconnect::Disconnect, discover::{Discover, DiscoverTargeted}, event::Event, heartbeat::Heartbeat, info::{Info, InfoTargeted}, messages::outgoing::DisconnectMessage, ping::{Ping, PingTargeted}, pong::Pong, request::Request, response::Response, }; #[derive(Error, Debug)] pub(crate) enum Error { #[error("Unable to start listeners actor")] UnableToStartListeners, #[error(transparent)] Nats(#[from] nats::Error), #[error(transparent)] Deserialize(#[from] config::DeserializeError), #[error(transparent)] Serialize(#[from] config::SerializeError), } #[async_trait] impl Actor for ChannelSupervisor { async fn started(&mut self, pid: Addr) -> ActorResult<()> { self.pid = pid.downgrade(); Produces::ok(()) } async fn error(&mut self, error: ActorError) -> bool { error!("ChannelSupervisor Actor Error: {:?}", error); // do not stop on actor error false } } pub(crate) struct ChannelSupervisor { broker: Addr, conn: nats::Conn, config: Arc, pid: WeakAddr, channels: HashMap, // channels event: Addr, request: Addr, response: Addr, discover: Addr, discover_targeted: Addr, info: Addr, info_targeted: Addr, heartbeat: Addr, ping: Addr, ping_targeted: Addr, pong: Addr, disconnect: Addr, } impl ChannelSupervisor { async fn new(broker: Addr, config: Arc) -> Self { let channels = Channel::build_hashmap(&config); let conn = match &config.transporter { Transporter::Nats(nats_address) => nats::Conn::new(nats_address) .await .expect("NATS should connect"), }; Self { broker, conn, config, channels, pid: WeakAddr::detached(), event: Addr::detached(), request: Addr::detached(), response: Addr::detached(), discover: Addr::detached(), discover_targeted: Addr::detached(), info: Addr::detached(), info_targeted: Addr::detached(), heartbeat: Addr::detached(), ping: Addr::detached(), ping_targeted: Addr::detached(), pong: Addr::detached(), disconnect: Addr::detached(), } } async fn start_listeners(&mut self) -> ActorResult<()> { let broker_pid = self.broker.clone().downgrade(); self.heartbeat = spawn_actor( Heartbeat::new( self.pid.clone(), broker_pid.clone(), &self.config, &self.conn, ) .await, ); self.ping = spawn_actor(Ping::new(self.pid.clone(), &self.config, &self.conn).await); self.ping_targeted = spawn_actor(PingTargeted::new(self.pid.clone(), &self.config, &self.conn).await); self.pong = spawn_actor(Pong::new(self.pid.clone(), &self.config, &self.conn).await); self.disconnect = spawn_actor(Disconnect::new(broker_pid.clone(), &self.config, &self.conn).await); self.discover = spawn_actor( Discover::new( broker_pid.clone(), self.pid.clone(), &self.config, &self.conn, ) .await, ); self.discover_targeted = spawn_actor(DiscoverTargeted::new(broker_pid.clone(), &self.config, &self.conn).await); self.info = spawn_actor(Info::new(broker_pid.clone(), &self.config, &self.conn).await); self.info_targeted = spawn_actor(InfoTargeted::new(broker_pid.clone(), &self.config, &self.conn).await); self.event = spawn_actor(Event::new(broker_pid.clone(), &self.config, &self.conn).await); self.request = spawn_actor(Request::new(broker_pid, &self.config, &self.conn).await); self.response = spawn_actor(Response::new(&self.config, &self.conn).await); Produces::ok(()) } pub(crate) async fn broadcast_discover(&self) { send!(self.discover.broadcast()); } pub(crate) async fn publish_to_channel( &self, channel: T, message: Vec, ) -> ActorResult<()> where T: AsRef, { let res = self.conn.send(channel.as_ref(), message).await; if let Err(err) = res { error!("Unable to send message: {}", err) } Produces::ok(()) } pub(crate) async fn start_response_waiter( &self, node_name: String, request_id: String, tx: Sender, ) -> ActorResult<()> { call!(self.response.start_response_waiter( self.config.request_timeout, node_name, request_id, tx )) .await?; Produces::ok(()) } async fn publish(&self, channel: Channel, message: Vec) -> ActorResult<()> { let channel = self .channels .get(&channel) .expect("should always find channel"); let _ = self.publish_to_channel(channel.as_str(), message).await; debug!("Message published to channel: {}", channel); Produces::ok(()) } async fn send_disconnect(&self) -> ActorResult<()> { let msg = DisconnectMessage::new(&self.config.node_id); let _ = self .publish(Channel::Disconnect, self.config.serializer.serialize(msg)?) .await; debug!("Disconnect message sent"); Produces::ok(()) } } pub(crate) async fn start_supervisor( broker: Addr, config: Arc, ) -> Result, Error> { let channel_supervisor = spawn_actor(ChannelSupervisor::new(broker, config).await); call!(channel_supervisor.start_listeners()) .await .map_err(|_| Error::UnableToStartListeners)?; Ok(channel_supervisor) } pub(crate) async fn listen_for_disconnect(supervisor: Addr) { // detects SIGTERM and sends disconnect package let _ = ctrlc::set_handler(move || { send!(supervisor.send_disconnect()); println!("Exiting molecular...."); std::thread::sleep(std::time::Duration::from_millis(100)); std::process::exit(1); }); }