#![allow(missing_docs)] use std::task::Poll; use futures::future::BoxFuture; use futures::future::FutureExt; use num_traits::One; use crate::append::AppendArgs; use crate::applicable::ApplicableTo; use crate::decoration::Decoration; use crate::error::Disoriented; use crate::error::PollError; use crate::error::ShutDownOr; use crate::node::AppendResultFor; use crate::node::DelegatingNodeImpl; use crate::node::EventFor; use crate::node::InvocationOf; use crate::node::NodeIdOf; use crate::node::NodeImpl; use crate::node::NodeOf; use crate::node::Participation; use crate::node::RoundNumOf; use crate::node::SnapshotFor; use crate::node::StateOf; use crate::node_builder::ExtensibleNodeBuilder; use crate::retry::RetryPolicy; use crate::Node; /// Ensure leadership configuration. pub trait Config { /// The node type that is decorated. type Node: Node; /// Initializes this configuration. #[allow(unused_variables)] fn init(&mut self, node: &Self::Node) {} /// Updates the configuration with the given event. #[allow(unused_variables)] fn update(&mut self, event: &EventFor) {} /// Additional nodes to poll from. fn additional_nodes(&self) -> Vec> { Vec::new() } } pub struct StaticConfig(std::marker::PhantomData); impl Config for StaticConfig { type Node = N; } impl Default for StaticConfig { fn default() -> Self { Self(Default::default()) } } pub trait CatchUpBuilderExt { type Node: Node + 'static; type DecoratedBuilder + 'static>; fn catch_up(self, config: C) -> Self::DecoratedBuilder where C: Config + 'static; } impl CatchUpBuilderExt for B where B: ExtensibleNodeBuilder, B::Node: NodeImpl + 'static, { type Node = B::Node; type DecoratedBuilder + 'static> = B::DecoratedBuilder>; fn catch_up(self, config: C) -> Self::DecoratedBuilder where C: Config + 'static, { self.decorated_with(config) } } pub struct CatchUp where N: Node, C: Config, { decorated: N, config: C, // TODO allow for batched catch-up poll: Option>>>, } impl CatchUp where N: NodeImpl + 'static, C: Config, { fn initiate_poll(&mut self, round_num: RoundNumOf) { self.poll = Some( self.poll(round_num, self.config.additional_nodes()) .then(move |r| async move { match r { Ok(true) => Some(round_num + One::one()), // TODO back off in some fashion Ok(false) => { futures_timer::Delay::new(instant::Duration::from_secs(1)).await; Some(round_num) } // TODO back off properly Err(PollError::Decoration(_)) | Err(PollError::UselessResponses { .. }) => { futures_timer::Delay::new(instant::Duration::from_secs(2)).await; Some(round_num) } Err(PollError::Disoriented) | Err(PollError::LocallyConverged) | Err(PollError::NotRetained) | Err(PollError::ShutDown) => None, } }) .boxed(), ); } } impl Decoration for CatchUp where N: NodeImpl + 'static, C: Config + 'static, { type Arguments = C; type Decorated = N; fn wrap( decorated: Self::Decorated, mut arguments: Self::Arguments, ) -> Result> { arguments.init(&decorated); Ok(Self { decorated, config: arguments, poll: None, }) } fn peek_into(decorated: &Self) -> &Self::Decorated { &decorated.decorated } fn unwrap(decorated: Self) -> Self::Decorated { decorated.decorated } } impl Node for CatchUp where N: NodeImpl + 'static, C: Config, { type Invocation = InvocationOf; type Shutdown = ::Shutdown; fn id(&self) -> NodeIdOf { self.decorated.id() } fn status(&self) -> crate::NodeStatus { self.decorated.status() } fn participation(&self) -> Participation> { self.decorated.participation() } fn poll_events(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { let event = self.decorated.poll_events(cx); if let Poll::Ready(event) = &event { self.config.update(event); match event { crate::Event::Init { round, state: Some(_), .. } | crate::Event::Install { round, state: Some(_), .. } => { let next_round = *round + One::one(); self.initiate_poll(next_round); } _ => {} } } if let Some(p) = self.poll.as_mut() { match p.poll_unpin(cx) { Poll::Ready(Some(n)) => self.initiate_poll(n), Poll::Ready(None) => self.poll = None, Poll::Pending => {} } } event } fn handle(&self) -> crate::node::HandleFor { self.decorated.handle() } fn prepare_snapshot(&self) -> BoxFuture<'static, SnapshotFor> { self.decorated.prepare_snapshot() } fn affirm_snapshot( &self, snapshot: SnapshotFor, ) -> BoxFuture<'static, Result<(), crate::error::AffirmSnapshotError>> { self.decorated.affirm_snapshot(snapshot) } fn install_snapshot( &self, snapshot: SnapshotFor, ) -> BoxFuture<'static, Result<(), crate::error::InstallSnapshotError>> { self.decorated.install_snapshot(snapshot) } fn read_stale(&self, f: F) -> BoxFuture<'_, Result> where F: FnOnce(&StateOf) -> T + Send + 'static, T: Send + 'static, { self.decorated.read_stale(f) } fn read_stale_infallibly(&self, f: F) -> BoxFuture<'_, T> where F: FnOnce(Option<&StateOf>) -> T + Send + 'static, T: Send + 'static, { self.decorated.read_stale_infallibly(f) } fn read_stale_scoped<'read, F, T>(&self, f: F) -> BoxFuture<'read, Result> where F: FnOnce(&StateOf) -> T + Send + 'read, T: Send + 'static, { self.decorated.read_stale_scoped(f) } fn read_stale_scoped_infallibly<'read, F, T>(&self, f: F) -> BoxFuture<'read, T> where F: FnOnce(Option<&StateOf>) -> T + Send + 'read, T: Send + 'static, { self.decorated.read_stale_scoped_infallibly(f) } fn append( &mut self, applicable: A, args: P, ) -> futures::future::BoxFuture<'static, AppendResultFor> where A: ApplicableTo> + 'static, P: Into>, R: RetryPolicy, R::StaticError: From>, { self.decorated.append(applicable, args) } fn shut_down(self) -> Self::Shutdown { self.decorated.shut_down() } } impl DelegatingNodeImpl for CatchUp where N: NodeImpl + 'static, C: Config, { type Delegate = N; fn delegate(&mut self) -> &mut Self::Delegate { &mut self.decorated } }