//! Defines the [`Communicator`] trait and related types. use std::convert::TryFrom; use std::convert::TryInto; use std::future::Future; use std::sync::Arc; use crate::invocation; use crate::invocation::Invocation; use crate::log_entry::LogEntry; use crate::AcceptError; use crate::CommitError; use crate::Conflict; use crate::CoordNum; use crate::NodeInfo; use crate::PrepareError; use crate::Promise; use crate::RoundNum; /// Shorthand to extract `Abstain` type out of `C`. pub type AbstainOf = ::Abstain; /// Shorthand to extract `CoordNum` type out of `C`. pub type CoordNumOf = ::CoordNum; /// Shorthand to extract `Error` type out of `C`. pub type ErrorOf = ::Error; /// Shorthand to extract `LogEntry` type out of `C`. pub type LogEntryOf = ::LogEntry; /// Shorthand to extract log entry `Id` type out of `C`. pub type LogEntryIdOf = as LogEntry>::Id; /// Shorthand to extract `Nay` type out of `C`. pub type NayOf = ::Nay; /// Shorthand to extract `Node` type (`impl NodeInfo`) out of `C`. pub type NodeOf = ::Node; /// Shorthand to extract node (`impl NodeInfo`) `Id` type out of `C`. pub type NodeIdOf = as NodeInfo>::Id; /// Shorthand to extract `RoundNum` type out of `C`. pub type RoundNumOf = ::RoundNum; /// Shorthand to extract `Yea` type out of `C`. pub type YeaOf = ::Yea; /// Invokes `Acceptance` type constructor so as to be compatible with `C`. pub type AcceptanceFor = Acceptance, LogEntryOf, YeaOf, NayOf>; /// Invokes `Conflict` type constructor so as to be compatible with `C`. pub type ConflictFor = Conflict, LogEntryOf>; /// Invokes `Promise` type constructor so as to be compatible with `C`. pub type PromiseFor = Promise, CoordNumOf, LogEntryOf>; /// Invokes `Vote` type constructor so as to be compatible with `C`. pub type VoteFor = Vote, CoordNumOf, LogEntryOf, AbstainOf>; /// Defines how [`Node`][crate::Node]s call others' /// [`RequestHandler`][crate::RequestHandler]s. /// /// The [simplest possible /// implementation][crate::prototyping::DirectCommunicator] directly calls /// `RequestHandler` methods, requiring that all nodes live in the same process. /// This is useful for prototyping and testing. Most other use cases require a /// different, custom implementation. /// /// # Soundness /// /// Implementations must be secure, i.e. prevent forgery and replay attacks. /// This implicitly means that a restarted node will not see messages intended /// for its previous run, i.e. delayed messages sent over a connectionless /// protocol. Failure to shield a node from such messages may cause it to come /// out of passive participation mode early and lead to inconsistency. pub trait Communicator: Send + Sized + 'static { /// NodeInfo type Node: NodeInfo; /// The round number type. type RoundNum: RoundNum; /// The coordination number type. type CoordNum: CoordNum; /// The log entry type. type LogEntry: LogEntry; /// The communication error type. type Error: std::fmt::Debug + Send + Sync + 'static; /// Type of future returned from `send_prepare`. type SendPrepare: Future, Self::Error>> + Send; /// Information sent along with abstentions. type Abstain: std::fmt::Debug + Send + Sync + 'static; /// Type of future returned from `send_proposal`. type SendProposal: Future, Self::Error>> + Send; /// Information sent along with yea votes. type Yea: std::fmt::Debug + Send + Sync + 'static; /// Information sent along with nay votes. type Nay: std::fmt::Debug + Send + Sync + 'static; /// Type of future returned from `send_commit`. type SendCommit: Future> + Send; /// Type of future returned from `send_commit_by_id`. type SendCommitById: Future> + Send; /// Send a prepare message to all `receivers`. /// /// Implementations should attempt to call each receivers' /// [`RequestHandler::handle_prepare`][crate::RequestHandler:: /// handle_prepare]. The return value must contain exactly one entry per /// receiver with a future of `handle_prepare`'s result. fn send_prepare<'a>( &mut self, receivers: &'a [Self::Node], round_num: Self::RoundNum, coord_num: Self::CoordNum, ) -> Vec<(&'a Self::Node, Self::SendPrepare)>; /// Send a proposal message to all `receivers`. /// /// Implementations should attempt to call each receivers' /// [`RequestHandler::handle_proposal`][crate::RequestHandler:: /// handle_proposal]. The return value must contain exactly one entry per /// receiver with a future of `handle_proposal`'s result. fn send_proposal<'a>( &mut self, receivers: &'a [Self::Node], round_num: Self::RoundNum, coord_num: Self::CoordNum, log_entry: Arc, ) -> Vec<(&'a Self::Node, Self::SendProposal)>; /// Send a commit message to all `receivers`. /// /// Implementations should attempt to call each receivers' /// [`RequestHandler::handle_commit`][crate::RequestHandler:: /// handle_commit]. The return value must contain exactly one entry per /// receiver with a future of `handle_commit`'s result. fn send_commit<'a>( &mut self, receivers: &'a [Self::Node], round_num: Self::RoundNum, coord_num: Self::CoordNum, log_entry: Arc, ) -> Vec<(&'a Self::Node, Self::SendCommit)>; /// Send a commit-by-id message to all `receivers`. /// /// Implementations should attempt to call each receivers' /// [`RequestHandler::handle_commit_by_id`][crate::RequestHandler:: /// handle_commit_by_id]. The return value must contain exactly one entry /// per receiver with a future of `handle_commit_by_id`'s result. fn send_commit_by_id<'a>( &mut self, receivers: &'a [Self::Node], round_num: Self::RoundNum, coord_num: Self::CoordNum, log_entry_id: ::Id, ) -> Vec<(&'a Self::Node, Self::SendCommitById)>; } /// A vote cast in a leader election. #[derive(Debug)] pub enum Vote { /// The node voted for the candidate. Given(Promise), /// The node couldn't vote for the candidate. Conflicted(Conflict), /// The node abstained, refusing to vote at all. Abstained(A), } impl TryFrom, PrepareError>> for Vote< invocation::RoundNumOf, invocation::CoordNumOf, invocation::LogEntryOf, invocation::AbstainOf, > { type Error = PrepareError; fn try_from( result: Result, PrepareError>, ) -> Result { result .map(Vote::Given) .or_else(|err| err.try_into().map(Vote::Conflicted)) } } impl TryFrom> for Conflict, invocation::LogEntryOf> { type Error = PrepareError; fn try_from(error: PrepareError) -> Result { match error { PrepareError::Supplanted(coord_num) => Ok(Conflict::Supplanted { coord_num }), PrepareError::Converged(coord_num, log_entry) => Ok(Conflict::Converged { coord_num, log_entry, }), _ => Err(error), } } } impl From, Conflict>> for Vote { fn from(result: Result, Conflict>) -> Self { match result { Ok(promise) => Vote::Given(promise), Err(rejection) => Vote::Conflicted(rejection), } } } /// A vote cast on a proposal. #[derive(Debug)] pub enum Acceptance { /// The node voted for the proposal. Given(Y), /// The node couldn't vote for the proposal. Conflicted(Conflict), /// The node voted against the proposal. Refused(X), } impl TryFrom, AcceptError>> for Acceptance< invocation::CoordNumOf, invocation::LogEntryOf, invocation::YeaOf, invocation::NayOf, > { type Error = AcceptError; fn try_from(result: Result, AcceptError>) -> Result { result .map(Acceptance::Given) .or_else(|err| err.try_into().map(Acceptance::Conflicted)) .or_else(|err| { if let AcceptError::Rejected(nay) = err { Ok(Acceptance::Refused(nay)) } else { Err(err) } }) } } impl TryFrom> for Conflict, invocation::LogEntryOf> { type Error = AcceptError; fn try_from(error: AcceptError) -> Result { match error { AcceptError::Supplanted(coord_num) => Ok(Conflict::Supplanted { coord_num }), AcceptError::Converged(coord_num, log_entry) => Ok(Conflict::Converged { coord_num, log_entry, }), _ => Err(error), } } } impl From>> for Acceptance { fn from(result: Result>) -> Self { result .map(Acceptance::Given) .unwrap_or_else(Acceptance::Conflicted) } } /// Node successfully committed the log entry. pub struct Committed; impl From<()> for Committed { fn from(_: ()) -> Self { Self } } impl TryFrom>> for Committed { type Error = CommitError; fn try_from(result: Result<(), CommitError>) -> Result { result.map(|_| Committed) } }