//! This module contains the implementation of the gossiping protocol for an individual topic use std::{ collections::VecDeque, time::{Duration, Instant}, }; use bytes::Bytes; use derive_more::From; use rand::Rng; use rand_core::SeedableRng; use serde::{Deserialize, Serialize}; use super::{ hyparview::{self, InEvent as SwarmIn}, plumtree::{self, GossipEvent, InEvent as GossipIn, Scope}, state::MessageKind, PeerData, PeerIdentity, }; /// The default maximum size in bytes for a gossip message. /// This is a sane but arbitrary default and can be changed in the [`Config`]. pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096; /// Input event to the topic state handler. #[derive(Clone, Debug)] pub enum InEvent { /// Message received from the network. RecvMessage(PI, Message), /// Execute a command from the application. Command(Command), /// Trigger a previously scheduled timer. TimerExpired(Timer), /// Peer disconnected on the network level. PeerDisconnected(PI), /// Update the opaque peer data about yourself. UpdatePeerData(PeerData), } /// An output event from the state handler. #[derive(Debug, PartialEq, Eq)] pub enum OutEvent { /// Send a message on the network SendMessage(PI, Message), /// Emit an event to the application. EmitEvent(Event), /// Schedule a timer. The runtime is responsible for sending an [InEvent::TimerExpired] /// after the duration. ScheduleTimer(Duration, Timer), /// Close the connection to a peer on the network level. DisconnectPeer(PI), /// Emitted when new [`PeerData`] was received for a peer. PeerData(PI, PeerData), } impl From> for OutEvent { fn from(event: hyparview::OutEvent) -> Self { use hyparview::OutEvent::*; match event { SendMessage(to, message) => Self::SendMessage(to, message.into()), ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()), DisconnectPeer(peer) => Self::DisconnectPeer(peer), EmitEvent(event) => Self::EmitEvent(event.into()), PeerData(peer, data) => Self::PeerData(peer, data), } } } impl From> for OutEvent { fn from(event: plumtree::OutEvent) -> Self { use plumtree::OutEvent::*; match event { SendMessage(to, message) => Self::SendMessage(to, message.into()), ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()), EmitEvent(event) => Self::EmitEvent(event.into()), } } } /// A trait for a concrete type to push `OutEvent`s to. /// /// The implementation is generic over this trait, which allows the upper layer to supply a /// container of their choice for `OutEvent`s emitted from the protocol state. pub trait IO { /// Push an event in the IO container fn push(&mut self, event: impl Into>); /// Push all events from an iterator into the IO container fn push_from_iter(&mut self, iter: impl IntoIterator>>) { for event in iter.into_iter() { self.push(event); } } } /// A protocol message for a particular topic #[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub enum Message { /// A message of the swarm membership layer Swarm(hyparview::Message), /// A message of the gossip broadcast layer Gossip(plumtree::Message), } impl Message { /// Get the kind of this message pub fn kind(&self) -> MessageKind { match self { Message::Swarm(_) => MessageKind::Control, Message::Gossip(message) => match message { plumtree::Message::Gossip(_) => MessageKind::Data, _ => MessageKind::Control, }, } } } /// An event to be emitted to the application for a particular topic. #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] pub enum Event { /// We have a new, direct neighbor in the swarm membership layer for this topic NeighborUp(PI), /// We dropped direct neighbor in the swarm membership layer for this topic NeighborDown(PI), /// A gossip message was received for this topic Received(GossipEvent), } impl From> for Event { fn from(value: hyparview::Event) -> Self { match value { hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer), hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer), } } } impl From> for Event { fn from(value: plumtree::Event) -> Self { match value { plumtree::Event::Received(event) => Self::Received(event), } } } /// A timer to be registered for a particular topic. /// /// This should be treated as an opaque value by the implementer and, once emitted, simply returned /// to the protocol through [`InEvent::TimerExpired`]. #[derive(Clone, From, Debug, PartialEq, Eq)] pub enum Timer { /// A timer for the swarm layer Swarm(hyparview::Timer), /// A timer for the gossip layer Gossip(plumtree::Timer), } /// A command to the protocol state for a particular topic. #[derive(Clone, derive_more::Debug)] pub enum Command { /// Join this topic and connect to peers. /// /// If the list of peers is empty, will prepare the state and accept incoming join requests, /// but only become operational after the first join request by another peer. Join(Vec), /// Broadcast a message for this topic. Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope), /// Leave this topic and drop all state. Quit, } impl IO for VecDeque> { fn push(&mut self, event: impl Into>) { self.push_back(event.into()) } } /// Protocol configuration #[derive(Clone, Debug)] pub struct Config { /// Configuration for the swarm membership layer pub membership: hyparview::Config, /// Configuration for the gossip broadcast layer pub broadcast: plumtree::Config, /// Max message size in bytes. /// /// This size should be the same across a network to ensure all nodes can transmit and read large messages. /// /// At minimum, this size should be large enough to send gossip control messages. This can vary, depending on the size of the [`PeerIdentity`] you use and the size of the [`PeerData`] you transmit in your messages. /// /// The default is [`DEFAULT_MAX_MESSAGE_SIZE`]. pub max_message_size: usize, } impl Default for Config { fn default() -> Self { Self { membership: Default::default(), broadcast: Default::default(), max_message_size: DEFAULT_MAX_MESSAGE_SIZE, } } } /// The topic state maintains the swarm membership and broadcast tree for a particular topic. #[derive(Debug)] pub struct State { me: PI, pub(crate) swarm: hyparview::State, pub(crate) gossip: plumtree::State, outbox: VecDeque>, stats: Stats, } impl State { /// Initialize the local state with the default random number generator. pub fn new(me: PI, me_data: Option, config: Config) -> Self { Self::with_rng(me, me_data, config, rand::rngs::StdRng::from_entropy()) } } impl State { /// The address of your local endpoint. pub fn endpoint(&self) -> &PI { &self.me } } impl State { /// Initialize the local state with a custom random number generator. pub fn with_rng(me: PI, me_data: Option, config: Config, rng: R) -> Self { Self { swarm: hyparview::State::new(me, me_data, config.membership, rng), gossip: plumtree::State::new(me, config.broadcast), me, outbox: VecDeque::new(), stats: Stats::default(), } } /// Handle an incoming event. /// /// Returns an iterator of outgoing events that must be processed by the application. pub fn handle( &mut self, event: InEvent, now: Instant, ) -> impl Iterator> + '_ { let io = &mut self.outbox; // Process the event, store out events in outbox. match event { InEvent::Command(command) => match command { Command::Join(peers) => { for peer in peers { self.swarm.handle(SwarmIn::RequestJoin(peer), now, io); } } Command::Broadcast(data, scope) => { self.gossip .handle(GossipIn::Broadcast(data, scope), now, io) } Command::Quit => self.swarm.handle(SwarmIn::Quit, now, io), }, InEvent::RecvMessage(from, message) => { self.stats.messages_received += 1; match message { Message::Swarm(message) => { self.swarm .handle(SwarmIn::RecvMessage(from, message), now, io) } Message::Gossip(message) => { self.gossip .handle(GossipIn::RecvMessage(from, message), now, io) } } } InEvent::TimerExpired(timer) => match timer { Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), now, io), Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io), }, InEvent::PeerDisconnected(peer) => { self.swarm.handle(SwarmIn::PeerDisconnected(peer), now, io); self.gossip.handle(GossipIn::NeighborDown(peer), now, io); } InEvent::UpdatePeerData(data) => { self.swarm.handle(SwarmIn::UpdatePeerData(data), now, io) } } // Forward NeighborUp and NeighborDown events from hyparview to plumtree let mut io = VecDeque::new(); for event in self.outbox.iter() { match event { OutEvent::EmitEvent(Event::NeighborUp(peer)) => { self.gossip .handle(GossipIn::NeighborUp(*peer), now, &mut io) } OutEvent::EmitEvent(Event::NeighborDown(peer)) => { self.gossip .handle(GossipIn::NeighborDown(*peer), now, &mut io) } _ => {} } } // Note that this is a no-op because plumtree::handle(NeighborUp | NeighborDown) // above does not emit any OutEvents. self.outbox.extend(io.drain(..)); // Update sent message counter self.stats.messages_sent += self .outbox .iter() .filter(|event| matches!(event, OutEvent::SendMessage(_, _))) .count(); self.outbox.drain(..) } /// Get stats on how many messages were sent and received /// /// TODO: Remove/replace with metrics? pub fn stats(&self) -> &Stats { &self.stats } /// Get statistics for the gossip broadcast state /// /// TODO: Remove/replace with metrics? pub fn gossip_stats(&self) -> &plumtree::Stats { self.gossip.stats() } /// Check if this topic has any active (connected) peers. pub fn has_active_peers(&self) -> bool { !self.swarm.active_view.is_empty() } } /// Statistics for the protocol state of a topic #[derive(Clone, Debug, Default)] pub struct Stats { /// Number of messages sent pub messages_sent: usize, /// Number of messages received pub messages_received: usize, }