// Copyright (C) Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Utilities for handling distribution of backed candidates along the grid (outside the group to //! the rest of the network). //! //! The grid uses the gossip topology defined in [`polkadot_node_network_protocol::grid_topology`]. //! It defines how messages and statements are forwarded between validators. //! //! # Protocol //! //! - Once the candidate is backed, produce a 'backed candidate packet' `(CommittedCandidateReceipt, //! Statements)`. //! - Members of a backing group produce an announcement of a fully-backed candidate (aka "full //! manifest") when they are finished. //! - `BackedCandidateManifest` //! - Manifests are sent along the grid topology to peers who have the relay-parent in their //! implicit view. //! - Only sent by 1st-hop nodes after downloading the backed candidate packet. //! - The grid topology is a 2-dimensional grid that provides either a 1 or 2-hop path from any //! originator to any recipient - 1st-hop nodes are those which share either a row or column //! with the originator, and 2nd-hop nodes are those which share a column or row with that //! 1st-hop node. //! - Note that for the purposes of statement distribution, we actually take the union of the //! routing paths from each validator in a group to the local node to determine the sending //! and receiving paths. //! - Ignored when received out-of-topology //! - On every local view change, members of the backing group rebroadcast the manifest for all //! candidates under every new relay-parent across the grid. //! - Nodes should send a `BackedCandidateAcknowledgement(CandidateHash, StatementFilter)` //! notification to any peer which has sent a manifest, and the candidate has been acquired by //! other means. //! - Request/response for the candidate + votes. //! - Ignore if they are inconsistent with the manifest. //! - A malicious backing group is capable of producing an unbounded number of backed candidates. //! - We request the candidate only if the candidate is a hypothetical member in any of our //! fragment chains, and: //! - All members of the group attempt to circulate all statements (in compact form) from the rest //! of the group on candidates that have already been backed. //! - They do this via the grid topology. //! - They add the statements to their backed candidate packet for future requestors, and also: //! - send the statement to any peer, which: //! - we advertised the backed candidate to (sent manifest), and: //! - has previously & successfully requested the backed candidate packet, or: //! - which has sent a `BackedCandidateAcknowledgement` //! - 1st-hop nodes do the same thing use polkadot_node_network_protocol::{grid_topology::SessionGridTopology, v2::StatementFilter}; use polkadot_primitives::{CandidateHash, CompactStatement, GroupIndex, Hash, ValidatorIndex}; use std::collections::{ hash_map::{Entry, HashMap}, HashSet, }; use bitvec::{order::Lsb0, slice::BitSlice}; use super::{groups::Groups, LOG_TARGET}; /// Our local view of a subset of the grid topology organized around a specific validator /// group. /// /// This tracks which authorities we expect to communicate with concerning /// candidates from the group. This includes both the authorities we are /// expected to send to as well as the authorities we expect to receive from. /// /// In the case that this group is the group that we are locally assigned to, /// the 'receiving' side will be empty. #[derive(Debug, PartialEq)] struct GroupSubView { // validators we are 'sending' to. sending: HashSet, // validators we are 'receiving' from. receiving: HashSet, } /// Our local view of the topology for a session, as it pertains to backed /// candidate distribution. #[derive(Debug)] pub struct SessionTopologyView { group_views: HashMap, } impl SessionTopologyView { /// Returns an iterator over all validator indices from the group who are allowed to /// send us manifests of the given kind. pub fn iter_sending_for_group( &self, group: GroupIndex, kind: ManifestKind, ) -> impl Iterator + '_ { self.group_views.get(&group).into_iter().flat_map(move |sub| match kind { ManifestKind::Full => sub.receiving.iter().cloned(), ManifestKind::Acknowledgement => sub.sending.iter().cloned(), }) } } /// Build a view of the topology for the session. /// For groups that we are part of: we receive from nobody and send to our X/Y peers. /// For groups that we are not part of: we receive from any validator in the group we share a slice /// with and send to the corresponding X/Y slice in the other dimension. /// For any validators we don't share a slice with, we receive from the nodes /// which share a slice with them. pub fn build_session_topology<'a>( groups: impl IntoIterator>, topology: &SessionGridTopology, our_index: Option, ) -> SessionTopologyView { let mut view = SessionTopologyView { group_views: HashMap::new() }; let our_index = match our_index { None => return view, Some(i) => i, }; let our_neighbors = match topology.compute_grid_neighbors_for(our_index) { None => { gum::warn!(target: LOG_TARGET, ?our_index, "our index unrecognized in topology?"); return view }, Some(n) => n, }; for (i, group) in groups.into_iter().enumerate() { let mut sub_view = GroupSubView { sending: HashSet::new(), receiving: HashSet::new() }; if group.contains(&our_index) { sub_view.sending.extend(our_neighbors.validator_indices_x.iter().cloned()); sub_view.sending.extend(our_neighbors.validator_indices_y.iter().cloned()); // remove all other same-group validators from this set, they are // in the cluster. // TODO [now]: test this behavior. for v in group { sub_view.sending.remove(v); } } else { for &group_val in group { // If the validator shares a slice with us, we expect to // receive from them and send to our neighbors in the other // dimension. if our_neighbors.validator_indices_x.contains(&group_val) { sub_view.receiving.insert(group_val); sub_view.sending.extend( our_neighbors .validator_indices_y .iter() .filter(|v| !group.contains(v)) .cloned(), ); continue } if our_neighbors.validator_indices_y.contains(&group_val) { sub_view.receiving.insert(group_val); sub_view.sending.extend( our_neighbors .validator_indices_x .iter() .filter(|v| !group.contains(v)) .cloned(), ); continue } // If they don't share a slice with us, we don't send to anybody // but receive from any peers sharing a dimension with both of us let their_neighbors = match topology.compute_grid_neighbors_for(group_val) { None => { gum::warn!( target: LOG_TARGET, index = ?group_val, "validator index unrecognized in topology?" ); continue }, Some(n) => n, }; // their X, our Y for potential_link in &their_neighbors.validator_indices_x { if our_neighbors.validator_indices_y.contains(potential_link) { sub_view.receiving.insert(*potential_link); break // one max } } // their Y, our X for potential_link in &their_neighbors.validator_indices_y { if our_neighbors.validator_indices_x.contains(potential_link) { sub_view.receiving.insert(*potential_link); break // one max } } } } view.group_views.insert(GroupIndex(i as _), sub_view); } view } /// The kind of backed candidate manifest we should send to a remote peer. #[derive(Debug, Clone, Copy, PartialEq)] pub enum ManifestKind { /// Full manifests contain information about the candidate and should be sent /// to peers which aren't guaranteed to have the candidate already. Full, /// Acknowledgement manifests omit information which is implicit in the candidate /// itself, and should be sent to peers which are guaranteed to have the candidate /// already. Acknowledgement, } /// A tracker of knowledge from authorities within the grid for a particular /// relay-parent. #[derive(Default)] pub struct GridTracker { received: HashMap, confirmed_backed: HashMap, unconfirmed: HashMap>, pending_manifests: HashMap>, // maps target to (originator, statement) pairs. pending_statements: HashMap>, } impl GridTracker { /// Attempt to import a manifest advertised by a remote peer. /// /// This checks whether the peer is allowed to send us manifests /// about this group at this relay-parent. This also does sanity /// checks on the format of the manifest and the amount of votes /// it contains. It assumes that the votes from disabled validators /// are already filtered out. /// It has effects on the stored state only when successful. /// /// This returns a `bool` on success, which if true indicates that an acknowledgement is /// to be sent in response to the received manifest. This only occurs when the /// candidate is already known to be confirmed and backed. pub fn import_manifest( &mut self, session_topology: &SessionTopologyView, groups: &Groups, candidate_hash: CandidateHash, seconding_limit: usize, manifest: ManifestSummary, kind: ManifestKind, sender: ValidatorIndex, ) -> Result { let claimed_group_index = manifest.claimed_group_index; let group_topology = match session_topology.group_views.get(&manifest.claimed_group_index) { None => return Err(ManifestImportError::Disallowed), Some(g) => g, }; let receiving_from = group_topology.receiving.contains(&sender); let sending_to = group_topology.sending.contains(&sender); let manifest_allowed = match kind { // Peers can send manifests _if_: // * They are in the receiving set for the group AND the manifest is full OR // * They are in the sending set for the group AND we have sent them a manifest AND // the received manifest is partial. ManifestKind::Full => receiving_from, ManifestKind::Acknowledgement => sending_to && self.confirmed_backed .get(&candidate_hash) .map_or(false, |c| c.has_sent_manifest_to(sender)), }; if !manifest_allowed { return Err(ManifestImportError::Disallowed) } let (group_size, backing_threshold) = match groups.get_size_and_backing_threshold(manifest.claimed_group_index) { Some(x) => x, None => return Err(ManifestImportError::Malformed), }; let remote_knowledge = manifest.statement_knowledge.clone(); if !remote_knowledge.has_len(group_size) { return Err(ManifestImportError::Malformed) } if !remote_knowledge.has_seconded() { return Err(ManifestImportError::Malformed) } // ensure votes are sufficient to back. let votes = remote_knowledge.backing_validators(); if votes < backing_threshold { return Err(ManifestImportError::Insufficient) } self.received.entry(sender).or_default().import_received( group_size, seconding_limit, candidate_hash, manifest, )?; let mut ack = false; if let Some(confirmed) = self.confirmed_backed.get_mut(&candidate_hash) { if receiving_from && !confirmed.has_sent_manifest_to(sender) { // due to checks above, the manifest `kind` is guaranteed to be `Full` self.pending_manifests .entry(sender) .or_default() .insert(candidate_hash, ManifestKind::Acknowledgement); ack = true; } // add all statements in local_knowledge & !remote_knowledge // to `pending_statements` for this validator. confirmed.manifest_received_from(sender, remote_knowledge); if let Some(pending_statements) = confirmed.pending_statements(sender) { self.pending_statements.entry(sender).or_default().extend( decompose_statement_filter( groups, claimed_group_index, candidate_hash, &pending_statements, ), ); } } else { // `received` prevents conflicting manifests so this is max 1 per validator. self.unconfirmed .entry(candidate_hash) .or_default() .push((sender, claimed_group_index)) } Ok(ack) } /// Add a new backed candidate to the tracker. This yields /// a list of validators which we should either advertise to /// or signal that we know the candidate, along with the corresponding /// type of manifest we should send. pub fn add_backed_candidate( &mut self, session_topology: &SessionTopologyView, candidate_hash: CandidateHash, group_index: GroupIndex, local_knowledge: StatementFilter, ) -> Vec<(ValidatorIndex, ManifestKind)> { let c = match self.confirmed_backed.entry(candidate_hash) { Entry::Occupied(_) => return Vec::new(), Entry::Vacant(v) => v.insert(KnownBackedCandidate { group_index, mutual_knowledge: HashMap::new(), local_knowledge, }), }; // Populate the entry with previously unconfirmed manifests. for (v, claimed_group_index) in self.unconfirmed.remove(&candidate_hash).into_iter().flatten() { if claimed_group_index != group_index { // This is misbehavior, but is handled more comprehensively elsewhere continue } let statement_filter = self .received .get(&v) .and_then(|r| r.candidate_statement_filter(&candidate_hash)) .expect("unconfirmed is only populated by validators who have sent manifest; qed"); // No need to send direct statements, because our local knowledge is `None` c.manifest_received_from(v, statement_filter); } let group_topology = match session_topology.group_views.get(&group_index) { None => return Vec::new(), Some(g) => g, }; // advertise onwards and accept received advertisements let sending_group_manifests = group_topology.sending.iter().map(|v| (*v, ManifestKind::Full)); let receiving_group_manifests = group_topology.receiving.iter().filter_map(|v| { if c.has_received_manifest_from(*v) { Some((*v, ManifestKind::Acknowledgement)) } else { None } }); // Note that order is important: if a validator is part of both the sending // and receiving groups, we may overwrite a `Full` manifest with a `Acknowledgement` // one. for (v, manifest_mode) in sending_group_manifests.chain(receiving_group_manifests) { gum::trace!( target: LOG_TARGET, validator_index = ?v, ?manifest_mode, "Preparing to send manifest/acknowledgement" ); self.pending_manifests .entry(v) .or_default() .insert(candidate_hash, manifest_mode); } self.pending_manifests .iter() .filter_map(|(v, x)| x.get(&candidate_hash).map(|k| (*v, *k))) .collect() } /// Note that a backed candidate has been advertised to a /// given validator. pub fn manifest_sent_to( &mut self, groups: &Groups, validator_index: ValidatorIndex, candidate_hash: CandidateHash, local_knowledge: StatementFilter, ) { if let Some(c) = self.confirmed_backed.get_mut(&candidate_hash) { c.manifest_sent_to(validator_index, local_knowledge); if let Some(pending_statements) = c.pending_statements(validator_index) { self.pending_statements.entry(validator_index).or_default().extend( decompose_statement_filter( groups, c.group_index, candidate_hash, &pending_statements, ), ); } } if let Some(x) = self.pending_manifests.get_mut(&validator_index) { x.remove(&candidate_hash); } } /// Returns a vector of all candidates pending manifests for the specific validator, and /// the type of manifest we should send. pub fn pending_manifests_for( &self, validator_index: ValidatorIndex, ) -> Vec<(CandidateHash, ManifestKind)> { self.pending_manifests .get(&validator_index) .into_iter() .flat_map(|pending| pending.iter().map(|(c, m)| (*c, *m))) .collect() } /// Returns a statement filter indicating statements that a given peer /// is awaiting concerning the given candidate, constrained by the statements /// we have ourselves. pub fn pending_statements_for( &self, validator_index: ValidatorIndex, candidate_hash: CandidateHash, ) -> Option { self.confirmed_backed .get(&candidate_hash) .and_then(|x| x.pending_statements(validator_index)) } /// Returns a vector of all pending statements to the validator, sorted with /// `Seconded` statements at the front. /// /// Statements are in the form `(Originator, Statement Kind)`. pub fn all_pending_statements_for( &self, validator_index: ValidatorIndex, ) -> Vec<(ValidatorIndex, CompactStatement)> { let mut v = self .pending_statements .get(&validator_index) .map(|x| x.iter().cloned().collect()) .unwrap_or(Vec::new()); v.sort_by_key(|(_, s)| match s { CompactStatement::Seconded(_) => 0u32, CompactStatement::Valid(_) => 1u32, }); v } /// Whether a validator can request a manifest from us. pub fn can_request(&self, validator: ValidatorIndex, candidate_hash: CandidateHash) -> bool { self.confirmed_backed.get(&candidate_hash).map_or(false, |c| { c.has_sent_manifest_to(validator) && !c.has_received_manifest_from(validator) }) } /// Determine the validators which can send a statement to us by direct broadcast. /// /// Returns a list of tuples representing each potential sender(ValidatorIndex) /// and if the sender should already know about the statement, because we just /// sent it to it. pub fn direct_statement_providers( &self, groups: &Groups, originator: ValidatorIndex, statement: &CompactStatement, ) -> Vec<(ValidatorIndex, bool)> { let (g, c_h, kind, in_group) = match extract_statement_and_group_info(groups, originator, statement) { None => return Vec::new(), Some(x) => x, }; self.confirmed_backed .get(&c_h) .map(|k| k.direct_statement_senders(g, in_group, kind)) .unwrap_or_default() } /// Determine the validators which can receive a statement from us by direct /// broadcast. pub fn direct_statement_targets( &self, groups: &Groups, originator: ValidatorIndex, statement: &CompactStatement, ) -> Vec { let (g, c_h, kind, in_group) = match extract_statement_and_group_info(groups, originator, statement) { None => return Vec::new(), Some(x) => x, }; self.confirmed_backed .get(&c_h) .map(|k| k.direct_statement_recipients(g, in_group, kind)) .unwrap_or_default() } /// Note that we have learned about a statement. This will update /// `pending_statements_for` for any relevant validators if actually /// fresh. pub fn learned_fresh_statement( &mut self, groups: &Groups, session_topology: &SessionTopologyView, originator: ValidatorIndex, statement: &CompactStatement, ) { let (g, c_h, kind, in_group) = match extract_statement_and_group_info(groups, originator, statement) { None => return, Some(x) => x, }; let known = match self.confirmed_backed.get_mut(&c_h) { None => return, Some(x) => x, }; if !known.note_fresh_statement(in_group, kind) { return } // Add to `pending_statements` for all validators we communicate with // who have exchanged manifests. let all_group_validators = session_topology .group_views .get(&g) .into_iter() .flat_map(|g| g.sending.iter().chain(g.receiving.iter())); for v in all_group_validators { if known.is_pending_statement(*v, in_group, kind) { self.pending_statements .entry(*v) .or_default() .insert((originator, statement.clone())); } } } /// Note that a direct statement about a given candidate was sent to or /// received from the given validator. pub fn sent_or_received_direct_statement( &mut self, groups: &Groups, originator: ValidatorIndex, counterparty: ValidatorIndex, statement: &CompactStatement, received: bool, ) { if let Some((_, c_h, kind, in_group)) = extract_statement_and_group_info(groups, originator, statement) { if let Some(known) = self.confirmed_backed.get_mut(&c_h) { known.sent_or_received_direct_statement(counterparty, in_group, kind, received); if let Some(pending) = self.pending_statements.get_mut(&counterparty) { pending.remove(&(originator, statement.clone())); } } } } /// Get the advertised statement filter of a validator for a candidate. pub fn advertised_statements( &self, validator: ValidatorIndex, candidate_hash: &CandidateHash, ) -> Option { self.received.get(&validator)?.candidate_statement_filter(candidate_hash) } #[cfg(test)] fn is_manifest_pending_for( &self, validator: ValidatorIndex, candidate_hash: &CandidateHash, ) -> Option { self.pending_manifests .get(&validator) .and_then(|m| m.get(candidate_hash)) .map(|x| *x) } } fn extract_statement_and_group_info( groups: &Groups, originator: ValidatorIndex, statement: &CompactStatement, ) -> Option<(GroupIndex, CandidateHash, StatementKind, usize)> { let (statement_kind, candidate_hash) = match statement { CompactStatement::Seconded(h) => (StatementKind::Seconded, h), CompactStatement::Valid(h) => (StatementKind::Valid, h), }; let group = match groups.by_validator_index(originator) { None => return None, Some(g) => g, }; let index_in_group = groups.get(group)?.iter().position(|v| v == &originator)?; Some((group, *candidate_hash, statement_kind, index_in_group)) } fn decompose_statement_filter<'a>( groups: &'a Groups, group_index: GroupIndex, candidate_hash: CandidateHash, statement_filter: &'a StatementFilter, ) -> impl Iterator + 'a { groups.get(group_index).into_iter().flat_map(move |g| { let s = statement_filter .seconded_in_group .iter_ones() .map(|i| g[i]) .map(move |i| (i, CompactStatement::Seconded(candidate_hash))); let v = statement_filter .validated_in_group .iter_ones() .map(|i| g[i]) .map(move |i| (i, CompactStatement::Valid(candidate_hash))); s.chain(v) }) } /// A summary of a manifest being sent by a counterparty. #[derive(Debug, Clone)] pub struct ManifestSummary { /// The claimed parent head data hash of the candidate. pub claimed_parent_hash: Hash, /// The claimed group index assigned to the candidate. pub claimed_group_index: GroupIndex, /// A statement filter sent alongisde the candidate, communicating /// knowledge. pub statement_knowledge: StatementFilter, } /// Errors in importing a manifest. #[derive(Debug, Clone)] pub enum ManifestImportError { /// The manifest conflicts with another, previously sent manifest. Conflicting, /// The manifest has overflowed beyond the limits of what the /// counterparty was allowed to send us. Overflow, /// The manifest claims insufficient attestations to achieve the backing /// threshold. Insufficient, /// The manifest is malformed. Malformed, /// The manifest was not allowed to be sent. Disallowed, } /// The knowledge we are aware of counterparties having of manifests. #[derive(Default)] struct ReceivedManifests { received: HashMap, // group -> seconded counts. seconded_counts: HashMap>, } impl ReceivedManifests { fn candidate_statement_filter( &self, candidate_hash: &CandidateHash, ) -> Option { self.received.get(candidate_hash).map(|m| m.statement_knowledge.clone()) } /// Attempt to import a received manifest from a counterparty. /// /// This will reject manifests which are either duplicate, conflicting, /// or imply an irrational amount of `Seconded` statements. /// /// This assumes that the manifest has already been checked for /// validity - i.e. that the bitvecs match the claimed group in size /// and that the manifest includes at least one `Seconded` /// attestation and includes enough attestations for the candidate /// to be backed. /// /// This also should only be invoked when we are intended to track /// the knowledge of this peer as determined by the [`SessionTopology`]. fn import_received( &mut self, group_size: usize, seconding_limit: usize, candidate_hash: CandidateHash, manifest_summary: ManifestSummary, ) -> Result<(), ManifestImportError> { match self.received.entry(candidate_hash) { Entry::Occupied(mut e) => { // occupied entry. // filter out clearly conflicting data. { let prev = e.get(); if prev.claimed_group_index != manifest_summary.claimed_group_index { return Err(ManifestImportError::Conflicting) } if prev.claimed_parent_hash != manifest_summary.claimed_parent_hash { return Err(ManifestImportError::Conflicting) } if !manifest_summary .statement_knowledge .seconded_in_group .contains(&prev.statement_knowledge.seconded_in_group) { return Err(ManifestImportError::Conflicting) } if !manifest_summary .statement_knowledge .validated_in_group .contains(&prev.statement_knowledge.validated_in_group) { return Err(ManifestImportError::Conflicting) } let mut fresh_seconded = manifest_summary.statement_knowledge.seconded_in_group.clone(); fresh_seconded |= &prev.statement_knowledge.seconded_in_group; let within_limits = updating_ensure_within_seconding_limit( &mut self.seconded_counts, manifest_summary.claimed_group_index, group_size, seconding_limit, &fresh_seconded, ); if !within_limits { return Err(ManifestImportError::Overflow) } } // All checks passed. Overwrite: guaranteed to be // superset. *e.get_mut() = manifest_summary; Ok(()) }, Entry::Vacant(e) => { let within_limits = updating_ensure_within_seconding_limit( &mut self.seconded_counts, manifest_summary.claimed_group_index, group_size, seconding_limit, &manifest_summary.statement_knowledge.seconded_in_group, ); if within_limits { e.insert(manifest_summary); Ok(()) } else { Err(ManifestImportError::Overflow) } }, } } } // updates validator-seconded records but only if the new statements // are OK. returns `true` if alright and `false` otherwise. // // The seconding limit is a per-validator limit. It ensures an upper bound on the total number of // candidates entering the system. fn updating_ensure_within_seconding_limit( seconded_counts: &mut HashMap>, group_index: GroupIndex, group_size: usize, seconding_limit: usize, new_seconded: &BitSlice, ) -> bool { if seconding_limit == 0 { return false } // due to the check above, if this was non-existent this function will // always return `true`. let counts = seconded_counts.entry(group_index).or_insert_with(|| vec![0; group_size]); for i in new_seconded.iter_ones() { if counts[i] == seconding_limit { return false } } for i in new_seconded.iter_ones() { counts[i] += 1; } true } #[derive(Debug, Clone, Copy)] enum StatementKind { Seconded, Valid, } trait FilterQuery { fn contains(&self, index: usize, statement_kind: StatementKind) -> bool; fn set(&mut self, index: usize, statement_kind: StatementKind); } impl FilterQuery for StatementFilter { fn contains(&self, index: usize, statement_kind: StatementKind) -> bool { match statement_kind { StatementKind::Seconded => self.seconded_in_group.get(index).map_or(false, |x| *x), StatementKind::Valid => self.validated_in_group.get(index).map_or(false, |x| *x), } } fn set(&mut self, index: usize, statement_kind: StatementKind) { let b = match statement_kind { StatementKind::Seconded => self.seconded_in_group.get_mut(index), StatementKind::Valid => self.validated_in_group.get_mut(index), }; if let Some(mut b) = b { *b = true; } } } /// Knowledge that we have about a remote peer concerning a candidate, and that they have about us /// concerning the candidate. #[derive(Debug, Clone)] struct MutualKnowledge { /// Knowledge the remote peer has about the candidate, as far as we're aware. /// `Some` only if they have advertised, acknowledged, or requested the candidate. remote_knowledge: Option, /// Knowledge we have indicated to the remote peer about the candidate. /// `Some` only if we have advertised, acknowledged, or requested the candidate /// from them. local_knowledge: Option, /// Knowledge peer circulated to us, this is different from `local_knowledge` and /// `remote_knowledge`, through the fact that includes only statements that we received from /// peer while the other two, after manifest exchange part will include both what we sent to /// the peer and what we received from peer, see `sent_or_received_direct_statement` for more /// details. received_knowledge: Option, } // A utility struct for keeping track of metadata about candidates // we have confirmed as having been backed. #[derive(Debug, Clone)] struct KnownBackedCandidate { group_index: GroupIndex, local_knowledge: StatementFilter, mutual_knowledge: HashMap, } impl KnownBackedCandidate { fn has_received_manifest_from(&self, validator: ValidatorIndex) -> bool { self.mutual_knowledge .get(&validator) .map_or(false, |k| k.remote_knowledge.is_some()) } fn has_sent_manifest_to(&self, validator: ValidatorIndex) -> bool { self.mutual_knowledge .get(&validator) .map_or(false, |k| k.local_knowledge.is_some()) } fn manifest_sent_to(&mut self, validator: ValidatorIndex, local_knowledge: StatementFilter) { let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None, received_knowledge: None, }); k.received_knowledge = Some(StatementFilter::blank(local_knowledge.seconded_in_group.len())); k.local_knowledge = Some(local_knowledge); } fn manifest_received_from( &mut self, validator: ValidatorIndex, remote_knowledge: StatementFilter, ) { let k = self.mutual_knowledge.entry(validator).or_insert_with(|| MutualKnowledge { remote_knowledge: None, local_knowledge: None, received_knowledge: None, }); k.remote_knowledge = Some(remote_knowledge); } /// Returns a list of tuples representing each potential sender(ValidatorIndex) /// and if the sender should already know about the statement, because we just /// sent it to it. fn direct_statement_senders( &self, group_index: GroupIndex, originator_index_in_group: usize, statement_kind: StatementKind, ) -> Vec<(ValidatorIndex, bool)> { if group_index != self.group_index { return Vec::new() } self.mutual_knowledge .iter() .filter(|(_, k)| k.remote_knowledge.is_some()) .filter(|(_, k)| { k.received_knowledge .as_ref() .map_or(false, |r| !r.contains(originator_index_in_group, statement_kind)) }) .map(|(v, k)| { ( *v, k.local_knowledge .as_ref() .map_or(false, |r| r.contains(originator_index_in_group, statement_kind)), ) }) .collect() } fn direct_statement_recipients( &self, group_index: GroupIndex, originator_index_in_group: usize, statement_kind: StatementKind, ) -> Vec { if group_index != self.group_index { return Vec::new() } self.mutual_knowledge .iter() .filter(|(_, k)| k.local_knowledge.is_some()) .filter(|(_, k)| { k.remote_knowledge .as_ref() .map_or(false, |r| !r.contains(originator_index_in_group, statement_kind)) }) .map(|(v, _)| *v) .collect() } fn note_fresh_statement( &mut self, statement_index_in_group: usize, statement_kind: StatementKind, ) -> bool { let really_fresh = !self.local_knowledge.contains(statement_index_in_group, statement_kind); self.local_knowledge.set(statement_index_in_group, statement_kind); really_fresh } fn sent_or_received_direct_statement( &mut self, validator: ValidatorIndex, statement_index_in_group: usize, statement_kind: StatementKind, received: bool, ) { if let Some(k) = self.mutual_knowledge.get_mut(&validator) { if let (Some(r), Some(l)) = (k.remote_knowledge.as_mut(), k.local_knowledge.as_mut()) { r.set(statement_index_in_group, statement_kind); l.set(statement_index_in_group, statement_kind); } if received { k.received_knowledge .as_mut() .map(|knowledge| knowledge.set(statement_index_in_group, statement_kind)); } } } fn is_pending_statement( &self, validator: ValidatorIndex, statement_index_in_group: usize, statement_kind: StatementKind, ) -> bool { // existence of both remote & local knowledge indicate we have exchanged // manifests. // then, everything that is not in the remote knowledge is pending self.mutual_knowledge .get(&validator) .filter(|k| k.local_knowledge.is_some()) .and_then(|k| k.remote_knowledge.as_ref()) .map(|k| !k.contains(statement_index_in_group, statement_kind)) .unwrap_or(false) } fn pending_statements(&self, validator: ValidatorIndex) -> Option { // existence of both remote & local knowledge indicate we have exchanged // manifests. // then, everything that is not in the remote knowledge is pending, and we // further limit this by what is in the local knowledge itself. we use the // full local knowledge, as the local knowledge stored here may be outdated. let full_local = &self.local_knowledge; self.mutual_knowledge .get(&validator) .filter(|k| k.local_knowledge.is_some()) .and_then(|k| k.remote_knowledge.as_ref()) .map(|remote| StatementFilter { seconded_in_group: full_local.seconded_in_group.clone() & !remote.seconded_in_group.clone(), validated_in_group: full_local.validated_in_group.clone() & !remote.validated_in_group.clone(), }) } } #[cfg(test)] mod tests { use super::*; use assert_matches::assert_matches; use polkadot_node_network_protocol::grid_topology::TopologyPeerInfo; use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_core::crypto::Pair as PairT; fn dummy_groups(group_size: usize) -> Groups { let groups = vec![(0..(group_size as u32)).map(ValidatorIndex).collect()].into(); Groups::new(groups, 2) } #[test] fn topology_empty_for_no_index() { let base_topology = SessionGridTopology::new( vec![0, 1, 2], vec![ TopologyPeerInfo { peer_ids: Vec::new(), validator_index: ValidatorIndex(0), discovery_id: AuthorityDiscoveryPair::generate().0.public(), }, TopologyPeerInfo { peer_ids: Vec::new(), validator_index: ValidatorIndex(1), discovery_id: AuthorityDiscoveryPair::generate().0.public(), }, TopologyPeerInfo { peer_ids: Vec::new(), validator_index: ValidatorIndex(2), discovery_id: AuthorityDiscoveryPair::generate().0.public(), }, ], ); let t = build_session_topology( &[vec![ValidatorIndex(0)], vec![ValidatorIndex(1)], vec![ValidatorIndex(2)]], &base_topology, None, ); assert!(t.group_views.is_empty()); } #[test] fn topology_setup() { let base_topology = SessionGridTopology::new( (0..9).collect(), (0..9) .map(|i| TopologyPeerInfo { peer_ids: Vec::new(), validator_index: ValidatorIndex(i), discovery_id: AuthorityDiscoveryPair::generate().0.public(), }) .collect(), ); let t = build_session_topology( &[ vec![ValidatorIndex(0), ValidatorIndex(3), ValidatorIndex(6)], vec![ValidatorIndex(4), ValidatorIndex(2), ValidatorIndex(7)], vec![ValidatorIndex(8), ValidatorIndex(5), ValidatorIndex(1)], ], &base_topology, Some(ValidatorIndex(0)), ); assert_eq!(t.group_views.len(), 3); // 0 1 2 // 3 4 5 // 6 7 8 // our group: we send to all row/column neighbors which are not in our // group and receive nothing. assert_eq!( t.group_views.get(&GroupIndex(0)).unwrap().sending, vec![1, 2].into_iter().map(ValidatorIndex).collect::>(), ); assert_eq!(t.group_views.get(&GroupIndex(0)).unwrap().receiving, HashSet::new(),); // we share a row with '2' and have indirect connections to '4' and '7'. assert_eq!( t.group_views.get(&GroupIndex(1)).unwrap().sending, vec![3, 6].into_iter().map(ValidatorIndex).collect::>(), ); assert_eq!( t.group_views.get(&GroupIndex(1)).unwrap().receiving, vec![1, 2, 3, 6].into_iter().map(ValidatorIndex).collect::>(), ); // we share a row with '1' and have indirect connections to '5' and '8'. assert_eq!( t.group_views.get(&GroupIndex(2)).unwrap().sending, vec![3, 6].into_iter().map(ValidatorIndex).collect::>(), ); assert_eq!( t.group_views.get(&GroupIndex(2)).unwrap().receiving, vec![1, 2, 3, 6].into_iter().map(ValidatorIndex).collect::>(), ); } #[test] fn knowledge_rejects_conflicting_manifest() { let mut knowledge = ReceivedManifests::default(); let expected_manifest_summary = ManifestSummary { claimed_parent_hash: Hash::repeat_byte(2), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], }, }; knowledge .import_received( 3, 2, CandidateHash(Hash::repeat_byte(1)), expected_manifest_summary.clone(), ) .unwrap(); // conflicting group let mut s = expected_manifest_summary.clone(); s.claimed_group_index = GroupIndex(1); assert_matches!( knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,), Err(ManifestImportError::Conflicting) ); // conflicting parent hash let mut s = expected_manifest_summary.clone(); s.claimed_parent_hash = Hash::repeat_byte(3); assert_matches!( knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,), Err(ManifestImportError::Conflicting) ); // conflicting seconded statements bitfield let mut s = expected_manifest_summary.clone(); s.statement_knowledge.seconded_in_group = bitvec::bitvec![u8, Lsb0; 0, 1, 0]; assert_matches!( knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,), Err(ManifestImportError::Conflicting) ); // conflicting valid statements bitfield let mut s = expected_manifest_summary.clone(); s.statement_knowledge.validated_in_group = bitvec::bitvec![u8, Lsb0; 0, 1, 0]; assert_matches!( knowledge.import_received(3, 2, CandidateHash(Hash::repeat_byte(1)), s,), Err(ManifestImportError::Conflicting) ); } // Make sure we don't import manifests that would put a validator in a group over the limit of // candidates they are allowed to second (aka seconding limit). #[test] fn reject_overflowing_manifests() { let mut knowledge = ReceivedManifests::default(); knowledge .import_received( 3, 2, CandidateHash(Hash::repeat_byte(1)), ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0xA), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], }, }, ) .unwrap(); knowledge .import_received( 3, 2, CandidateHash(Hash::repeat_byte(2)), ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0xB), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], }, }, ) .unwrap(); // Reject a seconding validator that is already at the seconding limit. Seconding counts for // the validators should not be applied. assert_matches!( knowledge.import_received( 3, 2, CandidateHash(Hash::repeat_byte(3)), ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0xC), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], } }, ), Err(ManifestImportError::Overflow) ); // Don't reject validators that have seconded less than the limit so far. knowledge .import_received( 3, 2, CandidateHash(Hash::repeat_byte(3)), ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0xC), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 1], }, }, ) .unwrap(); } #[test] fn reject_disallowed_manifest() { let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: vec![ValidatorIndex(0)].into_iter().collect(), }, )] .into_iter() .collect(), }; let groups = dummy_groups(3); let candidate_hash = CandidateHash(Hash::repeat_byte(42)); assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),); // Known group, disallowed receiving validator. assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], } }, ManifestKind::Full, ValidatorIndex(1), ), Err(ManifestImportError::Disallowed) ); // Unknown group assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(1), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Disallowed) ); } #[test] fn reject_malformed_wrong_group_size() { let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: vec![ValidatorIndex(0)].into_iter().collect(), }, )] .into_iter() .collect(), }; let groups = dummy_groups(3); let candidate_hash = CandidateHash(Hash::repeat_byte(42)); assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),); assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) ); assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1, 0], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) ); } #[test] fn reject_malformed_no_seconders() { let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: vec![ValidatorIndex(0)].into_iter().collect(), }, )] .into_iter() .collect(), }; let groups = dummy_groups(3); let candidate_hash = CandidateHash(Hash::repeat_byte(42)); assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),); assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 1, 1], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Malformed) ); } #[test] fn reject_insufficient_below_threshold() { let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([ValidatorIndex(0)]), }, )] .into_iter() .collect(), }; let groups = dummy_groups(3); let candidate_hash = CandidateHash(Hash::repeat_byte(42)); assert_eq!(groups.get_size_and_backing_threshold(GroupIndex(0)), Some((3, 2)),); // only one vote assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Insufficient) ); // seconding + validating still not enough to reach '2' threshold assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], } }, ManifestKind::Full, ValidatorIndex(0), ), Err(ManifestImportError::Insufficient) ); // finally good. assert_matches!( tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: GroupIndex(0), statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], } }, ManifestKind::Full, ValidatorIndex(0), ), Ok(false) ); } // Test that when we add a candidate as backed and advertise it to the sending group, they can // provide an acknowledgement manifest in response. #[test] fn senders_can_provide_manifests_in_acknowledgement() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::from([validator_index]), receiving: HashSet::from([ValidatorIndex(1)]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Add the candidate as backed. let receivers = tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Validator 0 is in the sending group. Advertise onward to it. // // Validator 1 is in the receiving group, but we have not received from it, so we're not // expected to send it an acknowledgement. assert_eq!(receivers, vec![(validator_index, ManifestKind::Full)]); // Note the manifest as 'sent' to validator 0. tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge); // Import manifest of kind `Acknowledgement` from validator 0. let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Acknowledgement, validator_index, ); assert_matches!(ack, Ok(false)); } // Check that pending communication is set correctly when receiving a manifest on a confirmed // candidate. // // It should also overwrite any existing `Full` ManifestKind. #[test] fn pending_communication_receiving_manifest_on_confirmed_candidate() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::from([validator_index]), receiving: HashSet::from([ValidatorIndex(1)]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Manifest should not be pending yet. let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, None); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Manifest should be pending as `Full`. let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, Some(ManifestKind::Full)); // Note the manifest as 'sent' to validator 0. tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge); // Import manifest. // // Should overwrite existing `Full` manifest. let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Acknowledgement, validator_index, ); assert_matches!(ack, Ok(false)); let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, None); } // Check that pending communication is cleared correctly in `manifest_sent_to` // // Also test a scenario where manifest import returns `Ok(true)` (should acknowledge). #[test] fn pending_communication_is_cleared() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([validator_index]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Manifest should not be pending yet. let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, None); // Import manifest. The candidate is confirmed backed and we are expected to receive from // validator 0, so send it an acknowledgement. let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Full, validator_index, ); assert_matches!(ack, Ok(true)); // Acknowledgement manifest should be pending. let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, Some(ManifestKind::Acknowledgement)); // Note the candidate as advertised. tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge); // Pending manifest should be cleared. let pending_manifest = tracker.is_manifest_pending_for(validator_index, &candidate_hash); assert_eq!(pending_manifest, None); } /// A manifest exchange means that both `manifest_sent_to` and `manifest_received_from` have /// been invoked. /// /// In practice, it means that one of three things have happened: /// /// - They announced, we acknowledged /// /// - We announced, they acknowledged /// /// - We announced, they announced (not sure if this can actually happen; it would happen if 2 /// nodes had each other in their sending set and they sent manifests at the same time. The /// code accounts for this anyway) #[test] fn pending_statements_are_updated_after_manifest_exchange() { let send_to = ValidatorIndex(0); let receive_from = ValidatorIndex(1); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::from([send_to]), receiving: HashSet::from([receive_from]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Confirm the candidate. let receivers = tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); assert_eq!(receivers, vec![(send_to, ManifestKind::Full)]); // Learn a statement from a different validator. tracker.learned_fresh_statement( &groups, &session_topology, ValidatorIndex(2), &CompactStatement::Seconded(candidate_hash), ); // Test receiving followed by sending an ack. { // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(receive_from, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(receive_from), vec![]); let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Full, receive_from, ); assert_matches!(ack, Ok(true)); // Send ack now. tracker.manifest_sent_to( &groups, receive_from, candidate_hash, local_knowledge.clone(), ); // There should be pending statements now. assert_eq!( tracker.pending_statements_for(receive_from, candidate_hash), Some(StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }) ); assert_eq!( tracker.all_pending_statements_for(receive_from), vec![(ValidatorIndex(2), CompactStatement::Seconded(candidate_hash))] ); } // Test sending followed by receiving an ack. { // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(send_to, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(send_to), vec![]); tracker.manifest_sent_to(&groups, send_to, candidate_hash, local_knowledge.clone()); let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], }, }, ManifestKind::Acknowledgement, send_to, ); assert_matches!(ack, Ok(false)); // There should be pending statements now. assert_eq!( tracker.pending_statements_for(send_to, candidate_hash), Some(StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }) ); assert_eq!( tracker.all_pending_statements_for(send_to), vec![(ValidatorIndex(2), CompactStatement::Seconded(candidate_hash))] ); } } #[test] fn invalid_fresh_statement_import() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([validator_index]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); // Try to import fresh statement. Candidate not backed. let statement = CompactStatement::Seconded(candidate_hash); tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement); assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Try to import fresh statement. Unknown group for validator index. let statement = CompactStatement::Seconded(candidate_hash); tracker.learned_fresh_statement(&groups, &session_topology, ValidatorIndex(1), &statement); assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); } #[test] fn pending_statements_updated_when_importing_fresh_statement() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([validator_index]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Import fresh statement. let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Full, validator_index, ); assert_matches!(ack, Ok(true)); tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge); let statement = CompactStatement::Seconded(candidate_hash); tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement); // There should be pending statements now. let statements = StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }; assert_eq!( tracker.pending_statements_for(validator_index, candidate_hash), Some(statements.clone()) ); assert_eq!( tracker.all_pending_statements_for(validator_index), vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))] ); // After successful import, try importing again. Nothing should change. tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement); assert_eq!( tracker.pending_statements_for(validator_index, candidate_hash), Some(statements) ); assert_eq!( tracker.all_pending_statements_for(validator_index), vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))] ); } // After learning fresh statements, we should not generate pending statements for knowledge that // the validator already has. #[test] fn pending_statements_respect_remote_knowledge() { let validator_index = ValidatorIndex(0); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([validator_index]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Import fresh statement. let ack = tracker.import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }, }, ManifestKind::Full, validator_index, ); assert_matches!(ack, Ok(true)); tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge); tracker.learned_fresh_statement( &groups, &session_topology, validator_index, &CompactStatement::Seconded(candidate_hash), ); tracker.learned_fresh_statement( &groups, &session_topology, validator_index, &CompactStatement::Valid(candidate_hash), ); // The pending statements should respect the remote knowledge (meaning the Seconded // statement is ignored, but not the Valid statement). let statements = StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0], }; assert_eq!( tracker.pending_statements_for(validator_index, candidate_hash), Some(statements.clone()) ); assert_eq!( tracker.all_pending_statements_for(validator_index), vec![(ValidatorIndex(0), CompactStatement::Valid(candidate_hash))] ); } #[test] fn pending_statements_cleared_when_sending() { let validator_index = ValidatorIndex(0); let counterparty = ValidatorIndex(1); let mut tracker = GridTracker::default(); let session_topology = SessionTopologyView { group_views: vec![( GroupIndex(0), GroupSubView { sending: HashSet::new(), receiving: HashSet::from([validator_index, counterparty]), }, )] .into_iter() .collect(), }; let candidate_hash = CandidateHash(Hash::repeat_byte(42)); let group_index = GroupIndex(0); let group_size = 3; let local_knowledge = StatementFilter::blank(group_size); let groups = dummy_groups(group_size); // Should start with no pending statements. assert_eq!(tracker.pending_statements_for(validator_index, candidate_hash), None); assert_eq!(tracker.all_pending_statements_for(validator_index), vec![]); // Add the candidate as backed. tracker.add_backed_candidate( &session_topology, candidate_hash, group_index, local_knowledge.clone(), ); // Import statement for originator. tracker .import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Full, validator_index, ) .ok() .unwrap(); tracker.manifest_sent_to(&groups, validator_index, candidate_hash, local_knowledge.clone()); let statement = CompactStatement::Seconded(candidate_hash); tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement); // Import statement for counterparty. tracker .import_manifest( &session_topology, &groups, candidate_hash, 3, ManifestSummary { claimed_parent_hash: Hash::repeat_byte(0), claimed_group_index: group_index, statement_knowledge: StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 0, 1, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 1], }, }, ManifestKind::Full, counterparty, ) .ok() .unwrap(); tracker.manifest_sent_to(&groups, counterparty, candidate_hash, local_knowledge); let statement = CompactStatement::Seconded(candidate_hash); tracker.learned_fresh_statement(&groups, &session_topology, counterparty, &statement); // There should be pending statements now. let statements = StatementFilter { seconded_in_group: bitvec::bitvec![u8, Lsb0; 1, 0, 0], validated_in_group: bitvec::bitvec![u8, Lsb0; 0, 0, 0], }; assert_eq!( tracker.pending_statements_for(validator_index, candidate_hash), Some(statements.clone()) ); assert_eq!( tracker.all_pending_statements_for(validator_index), vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))] ); assert_eq!( tracker.pending_statements_for(counterparty, candidate_hash), Some(statements.clone()) ); assert_eq!( tracker.all_pending_statements_for(counterparty), vec![(ValidatorIndex(0), CompactStatement::Seconded(candidate_hash))] ); tracker.learned_fresh_statement(&groups, &session_topology, validator_index, &statement); tracker.sent_or_received_direct_statement( &groups, validator_index, counterparty, &statement, false, ); // There should be no pending statements now (for the counterparty). assert_eq!( tracker.pending_statements_for(counterparty, candidate_hash), Some(StatementFilter::blank(group_size)) ); assert_eq!(tracker.all_pending_statements_for(counterparty), vec![]); } #[test] fn session_grid_topology_consistent() { let n_validators = 300; let group_size = 5; let validator_indices = (0..n_validators).map(|i| ValidatorIndex(i as u32)).collect::>(); let groups = validator_indices.chunks(group_size).map(|x| x.to_vec()).collect::>(); let topology = SessionGridTopology::new( (0..n_validators).collect::>(), (0..n_validators) .map(|i| TopologyPeerInfo { peer_ids: Vec::new(), validator_index: ValidatorIndex(i as u32), discovery_id: AuthorityDiscoveryPair::generate().0.public(), }) .collect(), ); let computed_topologies = validator_indices .iter() .cloned() .map(|v| build_session_topology(groups.iter(), &topology, Some(v))) .collect::>(); let pairwise_check_topologies = |i, j| { let v_i = ValidatorIndex(i); let v_j = ValidatorIndex(j); for group in (0..groups.len()).map(|i| GroupIndex(i as u32)) { let g_i = computed_topologies[i as usize].group_views.get(&group).unwrap(); let g_j = computed_topologies[j as usize].group_views.get(&group).unwrap(); if g_i.sending.contains(&v_j) { assert!( g_j.receiving.contains(&v_i), "{:?}: {:?}, sending but not receiving", group, &(i, j) ); } if g_j.sending.contains(&v_i) { assert!( g_i.receiving.contains(&v_j), "{:?}: {:?}, sending but not receiving", group, &(j, i) ); } if g_i.receiving.contains(&v_j) { assert!(g_j.sending.contains(&v_i), "{:?}, receiving but not sending", &(i, j)); } if g_j.receiving.contains(&v_i) { assert!(g_i.sending.contains(&v_j), "{:?}, receiving but not sending", &(j, i)); } } }; for i in 0..n_validators { for j in (i + 1)..n_validators { pairwise_check_topologies(i as u32, j as u32); } } } }