// Copyright 2020-2022 Farcaster Devs & LNP/BP Standards Association // // Use of this source code is governed by an MIT-style // license that can be found in the LICENSE file or at // https://opensource.org/licenses/MIT. use crate::bus::ctl::CtlMsg; use crate::bus::info::InfoMsg; use crate::bus::BusMsg; use crate::bus::{Failure, Progress, ServiceBus}; use std::fmt::{self, Display, Formatter}; use std::str::FromStr; use bitcoin::hashes::hex::{self, ToHex}; use colored::Colorize; use farcaster_core::role::{SwapRole, TradeRole}; use farcaster_core::Uuid; use internet2::addr::NodeId; use internet2::{ addr::{NodeAddr, ServiceAddr}, zeromq, zeromq::ZmqSocketType, }; use lazy_static::lazy_static; use microservices::esb; #[cfg(feature = "node")] use microservices::node::TryService; use strict_encoding::{strict_deserialize, strict_serialize}; use strict_encoding::{StrictDecode, StrictEncode}; use farcaster_core::{ blockchain::{Blockchain, Network}, swap::SwapId, }; use crate::opts::Opts; use crate::Error; lazy_static! { pub static ref ZMQ_CONTEXT: zmq::Context = zmq::Context::new(); } #[derive( Wrapper, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, From, Default, StrictEncode, StrictDecode, )] #[cfg_attr( feature = "serde", derive(Serialize, Deserialize), serde(crate = "serde_crate") )] pub struct ClientName([u8; 32]); impl Display for ClientName { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { if f.alternate() { write!( f, "{}..{}", self.0[..4].to_hex(), self.0[(self.0.len() - 4)..].to_hex() ) } else { f.write_str(&String::from_utf8_lossy(&self.0)) } } } impl FromStr for ClientName { type Err = hex::Error; fn from_str(s: &str) -> Result { let mut me = Self::default(); if s.len() > 32 { me.0.copy_from_slice(&s.as_bytes()[0..32]); } else { let mut me = Self::default(); me.0[0..s.len()].copy_from_slice(s.as_bytes()); } Ok(me) } } #[derive(Debug, Clone, Hash)] pub struct ServiceConfig { /// ZMQ socket for peer-to-peer network message bus pub msg_endpoint: ServiceAddr, /// ZMQ socket for internal service control bus pub ctl_endpoint: ServiceAddr, /// ZMQ socket for internal info service bus pub info_endpoint: ServiceAddr, /// ZMQ socket for syncer events bus pub sync_endpoint: ServiceAddr, } #[cfg(feature = "shell")] impl From for ServiceConfig { fn from(opts: Opts) -> Self { ServiceConfig { msg_endpoint: opts.msg_socket, ctl_endpoint: opts.ctl_socket, info_endpoint: opts.info_socket, sync_endpoint: opts.sync_socket, } } } /// Identifiers of daemons participating in Farcaster Node #[derive(Clone, PartialEq, Eq, Hash, Debug, Display, From, StrictEncode, StrictDecode)] #[cfg_attr( feature = "serde", derive(Serialize, Deserialize), serde(crate = "serde_crate") )] pub enum ServiceId { #[display("loopback")] Loopback, #[display("farcasterd")] Farcasterd, #[display("peerd<{0} {1}>")] Peer(u128, NodeAddr), #[display("swap<{0}>")] #[from] Swap(SwapId), #[display("client<{0}>")] Client(u64), #[display("{0} ({1}) syncer")] Syncer(Blockchain, Network), #[display("walletd")] Wallet, #[display("grpcd")] Grpcd, #[display("grpcd_client<{0}>")] GrpcdClient(u64), #[display("databased")] Database, #[display("other<{0}>")] Other(ClientName), } impl ServiceId { pub fn router() -> ServiceId { ServiceId::Farcasterd } pub fn client() -> ServiceId { use bitcoin::secp256k1::rand; ServiceId::Client(rand::random()) } pub fn node_id(&self) -> Option { if let ServiceId::Peer(_, addr) = self { Some(addr.id) } else { None } } pub fn node_addr(&self) -> Option { if let ServiceId::Peer(_, addr) = self { Some(*addr) } else { None } } pub fn dummy_peer_service_id(node_addr: NodeAddr) -> ServiceId { ServiceId::Peer(0, node_addr) } } impl esb::ServiceAddress for ServiceId {} impl From for Vec { fn from(daemon_id: ServiceId) -> Self { strict_serialize(&daemon_id).expect("Memory-based encoding does not fail") } } impl From> for ServiceId { fn from(vec: Vec) -> Self { strict_deserialize(&vec).unwrap_or_else(|_| { ServiceId::Other( ClientName::from_str(&String::from_utf8_lossy(&vec)) .expect("ClientName conversion never fails"), ) }) } } pub struct Service where Runtime: esb::Handler, esb::Error: From, { esb: esb::Controller, broker: bool, } impl Service where Runtime: esb::Handler, esb::Error: From, { #[cfg(feature = "node")] pub fn run(config: ServiceConfig, runtime: Runtime, broker: bool) -> Result<(), Error> { let service = Self::with(config, runtime, broker)?; service.run_loop()?; unreachable!() } fn with( config: ServiceConfig, runtime: Runtime, broker: bool, ) -> Result> { let router = if !broker { Some(ServiceId::router()) } else { None }; let api_type = if broker { ZmqSocketType::RouterBind } else { ZmqSocketType::RouterConnect }; let services = map! { ServiceBus::Msg => esb::BusConfig::with_addr( config.msg_endpoint, api_type, router.clone() ), ServiceBus::Ctl => esb::BusConfig::with_addr( config.ctl_endpoint, api_type, router.clone() ), ServiceBus::Info => esb::BusConfig::with_addr( config.info_endpoint, api_type, router.clone() ), ServiceBus::Sync => esb::BusConfig::with_addr( config.sync_endpoint, api_type, router ) }; let esb = esb::Controller::with(services, runtime)?; Ok(Self { esb, broker }) } pub fn broker(config: ServiceConfig, runtime: Runtime) -> Result> { Self::with(config, runtime, true) } #[allow(clippy::self_named_constructors)] pub fn service(config: ServiceConfig, runtime: Runtime) -> Result> { Self::with(config, runtime, false) } pub fn is_broker(&self) -> bool { self.broker } pub fn add_bridge_service_bus( &mut self, socket: zmq::Socket, ) -> Result<(), esb::Error> { self.esb.add_service_bus( ServiceBus::Bridge, esb::BusConfig { // apparently this type is eventually ignored api_type: ZmqSocketType::Push, carrier: zeromq::Carrier::Socket(socket), router: None, queued: true, topic: None, }, ) } #[cfg(feature = "node")] pub fn run_loop(mut self) -> Result<(), Error> { let identity = self.esb.handler().identity(); if !self.is_broker() { std::thread::sleep(core::time::Duration::from_secs(1)); self.esb.send_to( ServiceBus::Ctl, ServiceId::Farcasterd, BusMsg::Ctl(CtlMsg::Hello), )?; } else if identity != ServiceId::Farcasterd { warn!( "Not saying hello to Farcasterd: service {} is broker", identity ); } debug!( "New service {} with PID {} started", identity, std::process::id() ); self.esb.run_or_panic(&identity.to_string()); unreachable!() } } pub type Endpoints = esb::EndpointList; pub trait TryToServiceId { fn try_to_service_id(&self) -> Option; } impl TryToServiceId for ServiceId { fn try_to_service_id(&self) -> Option { Some(self.clone()) } } impl TryToServiceId for &Option { fn try_to_service_id(&self) -> Option { (*self).clone() } } impl TryToServiceId for Option { fn try_to_service_id(&self) -> Option { self.clone() } } pub trait Reporter where Self: esb::Handler, esb::Error: From, { fn report_to(&self) -> Option; fn report_success_message( &mut self, endpoints: &mut Endpoints, msg: Option, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, BusMsg::Ctl(CtlMsg::Success(msg.map(|m| m.to_string()).into())), )?; } Ok(()) } fn report_progress( &mut self, endpoints: &mut Endpoints, progress: Progress, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, BusMsg::Ctl(CtlMsg::Progress(progress)), )?; } Ok(()) } fn report_progress_message( &mut self, endpoints: &mut Endpoints, msg: impl ToString, ) -> Result<(), Error> { if let Some(dest) = self.report_to() { endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, BusMsg::Ctl(CtlMsg::Progress(Progress::Message(msg.to_string()))), )?; } Ok(()) } fn report_failure(&mut self, endpoints: &mut Endpoints, failure: Failure) -> Error { if let Some(dest) = self.report_to() { // Even if we fail, we still have to terminate :) let _ = endpoints.send_to( ServiceBus::Ctl, self.identity(), dest, BusMsg::Ctl(CtlMsg::Failure(failure.clone())), ); } Error::Terminate(failure.to_string()) } } pub trait CtlServer where Self: esb::Handler, esb::Error: From, { fn send_ctl( &mut self, endpoints: &mut Endpoints, dest: impl TryToServiceId, request: BusMsg, ) -> Result<(), Error> { if let Some(dest) = dest.try_to_service_id() { endpoints.send_to(ServiceBus::Ctl, self.identity(), dest, request)?; } Ok(()) } fn send_client_ctl( &mut self, endpoints: &mut Endpoints, dest: ServiceId, request: CtlMsg, ) -> Result<(), Error> { let bus = ServiceBus::Ctl; if let ServiceId::GrpcdClient(_) = dest { endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Ctl(request))?; } else { endpoints.send_to(bus, self.identity(), dest, BusMsg::Ctl(request))?; } Ok(()) } fn send_client_info( &mut self, endpoints: &mut Endpoints, dest: ServiceId, request: InfoMsg, ) -> Result<(), Error> { let bus = ServiceBus::Info; if let ServiceId::GrpcdClient(_) = dest { endpoints.send_to(bus, dest, ServiceId::Grpcd, BusMsg::Info(request))?; } else { endpoints.send_to(bus, self.identity(), dest, BusMsg::Info(request))?; } Ok(()) } } pub type SwapDetails = (Option, Option, Option); pub trait SwapLogging { fn swap_details(&self) -> SwapDetails; fn log_info(&self, msg: impl std::fmt::Display) { info!("{} | {}", self.log_prefix(), msg); } fn log_error(&self, msg: impl std::fmt::Display) { error!("{} | {}", self.log_prefix(), msg); } fn log_debug(&self, msg: impl std::fmt::Display) { debug!("{} | {}", self.log_prefix(), msg); } fn log_trace(&self, msg: impl std::fmt::Display) { trace!("{} | {}", self.log_prefix(), msg); } fn log_warn(&self, msg: impl std::fmt::Display) { warn!("{} | {}", self.log_prefix(), msg); } fn log_prefix(&self) -> colored::ColoredString { match self.swap_details() { (Some(swap_id), Some(swap_role), Some(trade_role)) => { format!("{} as {} {}", swap_id, swap_role, trade_role).bright_blue_italic() } (Some(swap_id), None, Some(trade_role)) => { format!("{} as {}", swap_id, trade_role).bright_blue_italic() } (Some(swap_id), Some(swap_role), None) => { format!("{} as {}", swap_id, swap_role).bright_blue_italic() } (None, Some(swap_role), Some(trade_role)) => { format!("… as {} {}", swap_role, trade_role).bright_blue_italic() } (None, None, Some(trade_role)) => format!("… as {}", trade_role).bright_blue_italic(), (None, Some(swap_role), None) => format!("… as {}", swap_role).bright_blue_italic(), _ => "…".bright_blue_italic(), } } } pub trait LogStyle: ToString { fn bright_blue_bold(&self) -> colored::ColoredString { self.to_string().bold().bright_blue() } fn bright_blue_italic(&self) -> colored::ColoredString { self.to_string().italic().bright_blue() } fn green_bold(&self) -> colored::ColoredString { self.to_string().bold().green() } fn red_bold(&self) -> colored::ColoredString { self.to_string().bold().red() } fn bright_green_bold(&self) -> colored::ColoredString { self.to_string().bold().bright_green() } fn bright_green_italic(&self) -> colored::ColoredString { self.to_string().italic().bright_green() } fn bright_yellow_italic(&self) -> colored::ColoredString { self.to_string().italic().bright_yellow() } fn bright_yellow_bold(&self) -> colored::ColoredString { self.to_string().bold().bright_yellow() } fn bright_white_italic(&self) -> colored::ColoredString { self.to_string().italic().bright_white() } fn bright_white_bold(&self) -> colored::ColoredString { self.to_string().bold().bright_white() } // Typed log styles // This is used to standardize color and fonts across the codebase // ---------------- fn swap_id(&self) -> colored::ColoredString { self.to_string().italic().bright_blue() } fn label(&self) -> colored::ColoredString { self.to_string().bold().bright_white() } fn addr(&self) -> colored::ColoredString { self.to_string().bold().bright_yellow() } fn tx_hash(&self) -> colored::ColoredString { self.to_string().italic().bright_yellow() } fn err(&self) -> colored::ColoredString { self.to_string().bold().bright_red() } fn err_details(&self) -> colored::ColoredString { self.to_string().bold().red() } } impl LogStyle for T where T: ToString {}