#![allow(clippy::arithmetic_side_effects)] use { bincode::serialized_size, log::*, miraland_gossip::{ cluster_info, cluster_info_metrics::GossipStats, crds::GossipRoute, crds_gossip::*, crds_gossip_error::CrdsGossipError, crds_gossip_pull::{CrdsTimeouts, ProcessPullStats, CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS}, crds_gossip_push::CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS, crds_value::{CrdsData, CrdsValue, CrdsValueLabel}, legacy_contact_info::LegacyContactInfo as ContactInfo, ping_pong::PingCache, }, miraland_rayon_threadlimit::get_thread_count, miraland_streamer::socket::SocketAddrSpace, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, serial_test::serial, miraland_sdk::{ hash::hash, pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, }, std::{ collections::{HashMap, HashSet}, ops::Deref, sync::{Arc, Mutex}, time::{Duration, Instant}, }, }; #[derive(Clone)] struct Node { keypair: Arc, contact_info: ContactInfo, gossip: Arc, ping_cache: Arc>, stake: u64, } impl Node { fn new(keypair: Arc, contact_info: ContactInfo, gossip: Arc) -> Self { Self::staked(keypair, contact_info, gossip, 0) } fn staked( keypair: Arc, contact_info: ContactInfo, gossip: Arc, stake: u64, ) -> Self { let ping_cache = Arc::new(new_ping_cache()); Node { keypair, contact_info, gossip, ping_cache, stake, } } } struct Network { nodes: HashMap, stake_pruned: u64, connections_pruned: HashSet<(Pubkey, Pubkey)>, } impl Network { fn new(nodes: HashMap) -> Self { Network { nodes, connections_pruned: HashSet::new(), stake_pruned: 0, } } } impl Deref for Network { type Target = HashMap; fn deref(&self) -> &Self::Target { &self.nodes } } fn stakes(network: &Network) -> HashMap { let mut stakes = HashMap::new(); for (key, Node { stake, .. }) in network.iter() { stakes.insert(*key, *stake); } stakes } fn star_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let mut network: HashMap<_, _> = (1..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let node = CrdsGossip::default(); { let mut node_crds = node.crds.write().unwrap(); node_crds .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); node_crds .insert(entry.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); } let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); let node = CrdsGossip::default(); let id = entry.label().pubkey(); node.crds .write() .unwrap() .insert(entry, timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); network.insert(id, node); Network::new(network) } fn rstar_network_create(num: usize) -> Network { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let entry = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let origin = CrdsGossip::default(); let id = entry.label().pubkey(); origin .crds .write() .unwrap() .insert(entry, timestamp(), GossipRoute::LocalMessage) .unwrap(); let mut network: HashMap<_, _> = (1..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let node = CrdsGossip::default(); node.crds .write() .unwrap() .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); origin .crds .write() .unwrap() .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); let node = Node::new(node_keypair, contact_info, Arc::new(origin)); network.insert(id, node); Network::new(network) } fn ring_network_create(num: usize) -> Network { let mut network: HashMap<_, _> = (0..num) .map(|_| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let node = CrdsGossip::default(); node.crds .write() .unwrap() .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::new(node_keypair, contact_info, Arc::new(node)); (new.label().pubkey(), node) }) .collect(); let keys: Vec = network.keys().cloned().collect(); for k in 0..keys.len() { let start_info = { let start = &network[&keys[k]]; let start_id = keys[k]; let label = CrdsValueLabel::LegacyContactInfo(start_id); let gossip_crds = start.gossip.crds.read().unwrap(); gossip_crds.get::<&CrdsValue>(&label).unwrap().clone() }; let end = network.get_mut(&keys[(k + 1) % keys.len()]).unwrap(); let mut end_crds = end.gossip.crds.write().unwrap(); end_crds .insert(start_info, timestamp(), GossipRoute::LocalMessage) .unwrap(); } Network::new(network) } fn connected_staked_network_create(stakes: &[u64]) -> Network { let num = stakes.len(); let mut network: HashMap<_, _> = (0..num) .map(|n| { let node_keypair = Arc::new(Keypair::new()); let contact_info = ContactInfo::new_localhost(&node_keypair.pubkey(), 0); let new = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(contact_info.clone())); let node = CrdsGossip::default(); node.crds .write() .unwrap() .insert(new.clone(), timestamp(), GossipRoute::LocalMessage) .unwrap(); let node = Node::staked(node_keypair, contact_info, Arc::new(node), stakes[n]); (new.label().pubkey(), node) }) .collect(); let keys: Vec = network.keys().cloned().collect(); let start_entries: Vec<_> = keys .iter() .map(|k| { let start = &network[k]; let start_label = CrdsValueLabel::LegacyContactInfo(*k); let gossip_crds = start.gossip.crds.read().unwrap(); gossip_crds.get::<&CrdsValue>(&start_label).unwrap().clone() }) .collect(); for (end_pubkey, end) in network.iter_mut() { let mut end_crds = end.gossip.crds.write().unwrap(); for k in 0..keys.len() { if keys[k] != *end_pubkey { let start_info = start_entries[k].clone(); end_crds .insert(start_info, timestamp(), GossipRoute::LocalMessage) .unwrap(); } } } Network::new(network) } fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) { let num = network.len(); let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9); trace!( "network_simulator_pull_{}: converged: {} total_bytes: {}", num, converged, bytes_tx ); assert!(converged >= 0.9); } fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) { let num = network.len(); // run for a small amount of time let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, 10, 1.0); trace!("network_simulator_push_{}: converged: {}", num, converged); // make sure there is someone in the active set let network_values: Vec = network.values().cloned().collect(); network_values.par_iter().for_each(|node| { node.gossip.refresh_push_active_set( &node.keypair, 0, // shred version &HashMap::new(), // stakes None, // gossip validators &node.ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, ); }); let mut total_bytes = bytes_tx; let mut ts = timestamp(); for _ in 1..num { let start = ((ts + 99) / 100) as usize; let end = start + 10; let now = (start * 100) as u64; ts += 1000; // push a message to the network network_values.par_iter().for_each(|node| { let node_pubkey = node.keypair.pubkey(); let mut m = { let node_crds = node.gossip.crds.read().unwrap(); node_crds.get::<&ContactInfo>(node_pubkey).cloned().unwrap() }; m.set_wallclock(now); node.gossip.process_push_message( vec![( Pubkey::default(), vec![CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(m))], )], now, ); }); // push for a bit let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end); total_bytes += bytes_tx; trace!( "network_simulator_push_{}: queue_size: {} bytes: {}", num, queue_size, bytes_tx ); // pull for a bit let (converged, bytes_tx) = network_run_pull(thread_pool, network, start, end, 1.0); total_bytes += bytes_tx; trace!( "network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}", num, converged, bytes_tx, total_bytes ); if converged > max_convergance { break; } } } fn network_run_push( thread_pool: &ThreadPool, network: &mut Network, start: usize, end: usize, ) -> (usize, usize) { let mut bytes: usize = 0; let mut num_msgs: usize = 0; let mut total: usize = 0; let num = network.len(); let mut prunes: usize = 0; let mut delivered: usize = 0; let mut stake_pruned: u64 = 0; let network_values: Vec = network.values().cloned().collect(); let stakes = stakes(network); for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = network_values .par_iter() .map(|node| { let node_pubkey = node.keypair.pubkey(); let timeouts = node.gossip.make_timeouts( node_pubkey, &stakes, Duration::from_millis(node.gossip.pull.crds_timeout), ); node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts); ( node_pubkey, node.gossip.new_push_messages(&node_pubkey, now, &stakes).0, ) }) .collect(); let transfered: Vec<_> = requests .into_par_iter() .map(|(from, push_messages)| { let mut bytes: usize = 0; let mut delivered: usize = 0; let mut num_msgs: usize = 0; let mut pruned: HashSet<(Pubkey, Pubkey)> = HashSet::new(); for (to, msgs) in push_messages { bytes += serialized_size(&msgs).unwrap() as usize; num_msgs += 1; let origins: HashSet<_> = network .get(&to) .unwrap() .gossip .process_push_message(vec![(from, msgs.clone())], now) .into_iter() .collect(); let prunes_map = network .get(&to) .map(|node| { let node_pubkey = node.keypair.pubkey(); node.gossip .prune_received_cache(&node_pubkey, origins, &stakes) }) .unwrap(); for (from, prune_set) in prunes_map { let prune_keys: Vec<_> = prune_set.into_iter().collect(); for prune_key in &prune_keys { pruned.insert((from, *prune_key)); } bytes += serialized_size(&prune_keys).unwrap() as usize; delivered += 1; network .get(&from) .map(|node| { let node_pubkey = node.keypair.pubkey(); let destination = node_pubkey; let now = timestamp(); node.gossip .process_prune_msg( &node_pubkey, &to, &destination, &prune_keys, now, now, &stakes, ) .unwrap() }) .unwrap(); } } (bytes, delivered, num_msgs, pruned) }) .collect(); for (b, d, m, p) in transfered { bytes += b; delivered += d; num_msgs += m; for (from, to) in p { let from_stake = stakes.get(&from).unwrap(); if network.connections_pruned.insert((from, to)) { prunes += 1; stake_pruned += *from_stake; } } } if now % CRDS_GOSSIP_PUSH_MSG_TIMEOUT_MS == 0 && now > 0 { network_values.par_iter().for_each(|node| { node.gossip.refresh_push_active_set( &node.keypair, 0, // shred version &HashMap::new(), // stakes None, // gossip validators &node.ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, ); }); } total = network_values .par_iter() .map(|node| node.gossip.push.num_pending(&node.gossip.crds)) .sum(); trace!( "network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}", num, now, total, bytes, num_msgs, prunes, stake_pruned, delivered, ); } network.stake_pruned += stake_pruned; (total, bytes) } fn network_run_pull( thread_pool: &ThreadPool, network: &Network, start: usize, end: usize, max_convergance: f64, ) -> (f64, usize) { let mut bytes: usize = 0; let mut msgs: usize = 0; let mut overhead: usize = 0; let mut convergance = 0f64; let num = network.len(); let network_values: Vec = network.values().cloned().collect(); let stakes = stakes(network); for node in &network_values { let mut ping_cache = node.ping_cache.lock().unwrap(); for other in &network_values { if node.keypair.pubkey() != other.keypair.pubkey() { ping_cache.mock_pong( other.keypair.pubkey(), other.contact_info.gossip().unwrap(), Instant::now(), ); } } } for t in start..end { let now = t as u64 * 100; let requests: Vec<_> = { network_values .par_iter() .flat_map_iter(|from| { let mut pings = Vec::new(); let requests = from .gossip .new_pull_request( thread_pool, from.keypair.deref(), 0, // shred version. now, None, &HashMap::new(), cluster_info::MAX_BLOOM_SIZE, from.ping_cache.deref(), &mut pings, &SocketAddrSpace::Unspecified, ) .unwrap_or_default(); let from_pubkey = from.keypair.pubkey(); let label = CrdsValueLabel::LegacyContactInfo(from_pubkey); let gossip_crds = from.gossip.crds.read().unwrap(); let self_info = gossip_crds.get::<&CrdsValue>(&label).unwrap().clone(); requests .into_iter() .map(move |(peer, filters)| (*peer.pubkey(), filters, self_info.clone())) }) .collect() }; let transfered: Vec<_> = requests .into_iter() .map(|(to, filters, caller_info)| { let mut bytes: usize = 0; let mut msgs: usize = 0; let mut overhead: usize = 0; let from = caller_info.label().pubkey(); bytes += filters.iter().map(|f| f.filter.keys.len()).sum::(); bytes += filters .iter() .map(|f| f.filter.bits.len() as usize / 8) .sum::(); bytes += serialized_size(&caller_info).unwrap() as usize; let filters: Vec<_> = filters .into_iter() .map(|f| (caller_info.clone(), f)) .collect(); let rsp: Vec<_> = network .get(&to) .map(|node| { let rsp = node .gossip .generate_pull_responses( thread_pool, &filters, usize::MAX, // output_size_limit now, &GossipStats::default(), ) .into_iter() .flatten() .collect(); node.gossip.process_pull_requests( filters.into_iter().map(|(caller, _)| caller), now, ); rsp }) .unwrap(); bytes += serialized_size(&rsp).unwrap() as usize; msgs += rsp.len(); if let Some(node) = network.get(&from) { let mut stats = ProcessPullStats::default(); let timeouts = CrdsTimeouts::new( node.keypair.pubkey(), CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, // default_timeout Duration::from_secs(48 * 3600), // epoch_duration &stakes, ); let (vers, vers_expired_timeout, failed_inserts) = node .gossip .filter_pull_responses(&timeouts, rsp, now, &mut stats); node.gossip.process_pull_responses( vers, vers_expired_timeout, failed_inserts, now, &mut stats, ); overhead += stats.failed_insert; overhead += stats.failed_timeout; } (bytes, msgs, overhead) }) .collect(); for (b, m, o) in transfered { bytes += b; msgs += m; overhead += o; } let total: usize = network_values .par_iter() .map(|v| v.gossip.crds.read().unwrap().len()) .sum(); convergance = total as f64 / ((num * num) as f64); if convergance > max_convergance { break; } trace!( "network_run_pull_{}: now: {} connections: {} convergance: {} bytes: {} msgs: {} overhead: {}", num, now, total, convergance, bytes, msgs, overhead ); } (convergance, bytes) } fn build_gossip_thread_pool() -> ThreadPool { ThreadPoolBuilder::new() .num_threads(get_thread_count().min(2)) .thread_name(|i| format!("gossipTest{i:02}")) .build() .unwrap() } fn new_ping_cache() -> Mutex { let ping_cache = PingCache::new( Duration::from_secs(20 * 60), // ttl Duration::from_secs(20 * 60) / 64, // rate_limit_delay 2048, // capacity ); Mutex::new(ping_cache) } #[test] #[serial] fn test_star_network_pull_50() { let network = star_network_create(50); let thread_pool = build_gossip_thread_pool(); network_simulator_pull_only(&thread_pool, &network); } #[test] #[serial] fn test_star_network_pull_100() { let network = star_network_create(100); let thread_pool = build_gossip_thread_pool(); network_simulator_pull_only(&thread_pool, &network); } #[test] #[serial] fn test_star_network_push_star_200() { let mut network = star_network_create(200); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } #[ignore] #[test] fn test_star_network_push_rstar_200() { let mut network = rstar_network_create(200); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } #[test] #[serial] fn test_star_network_push_ring_200() { let mut network = ring_network_create(200); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } // With the new pruning logic, this test is no longer valid and can be deleted. // Ignoring it for now until the pruning code is stable. #[test] #[ignore] #[serial] fn test_connected_staked_network() { miraland_logger::setup(); let thread_pool = build_gossip_thread_pool(); let stakes = [ [1000; 2].to_vec(), [100; 3].to_vec(), [10; 5].to_vec(), [1; 15].to_vec(), ] .concat(); let mut network = connected_staked_network_create(&stakes); network_simulator(&thread_pool, &mut network, 1.0); let stake_sum: u64 = stakes.iter().sum(); let avg_stake: u64 = stake_sum / stakes.len() as u64; let avg_stake_pruned = network.stake_pruned / network.connections_pruned.len() as u64; trace!( "connected staked networks, connections_pruned: {}, avg_stake: {}, avg_stake_pruned: {}", network.connections_pruned.len(), avg_stake, avg_stake_pruned ); assert!( avg_stake_pruned < avg_stake, "network should prune lower stakes more often" ) } #[test] #[ignore] fn test_star_network_large_pull() { miraland_logger::setup(); let network = star_network_create(2000); let thread_pool = build_gossip_thread_pool(); network_simulator_pull_only(&thread_pool, &network); } #[test] #[ignore] fn test_rstar_network_large_push() { miraland_logger::setup(); let mut network = rstar_network_create(4000); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } #[test] #[ignore] fn test_ring_network_large_push() { miraland_logger::setup(); let mut network = ring_network_create(4001); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } #[test] #[ignore] fn test_star_network_large_push() { miraland_logger::setup(); let mut network = star_network_create(4002); let thread_pool = build_gossip_thread_pool(); network_simulator(&thread_pool, &mut network, 0.9); } #[test] fn test_prune_errors() { let crds_gossip = CrdsGossip::default(); let keypair = Keypair::new(); let id = keypair.pubkey(); let ci = ContactInfo::new_localhost(&Pubkey::from([1; 32]), 0); let prune_pubkey = Pubkey::from([2; 32]); crds_gossip .crds .write() .unwrap() .insert( CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ci.clone())), 0, GossipRoute::LocalMessage, ) .unwrap(); let ping_cache = new_ping_cache(); crds_gossip.refresh_push_active_set( &keypair, 0, // shred version &HashMap::new(), // stakes None, // gossip validators &ping_cache, &mut Vec::new(), // pings &SocketAddrSpace::Unspecified, ); let now = timestamp(); let stakes = HashMap::::default(); //incorrect dest let mut res = crds_gossip.process_prune_msg( &id, // self_pubkey ci.pubkey(), // peer &Pubkey::from(hash(&[1; 32]).to_bytes()), // destination &[prune_pubkey], // origins now, now, &stakes, ); assert_eq!(res.err(), Some(CrdsGossipError::BadPruneDestination)); //correct dest res = crds_gossip.process_prune_msg( &id, // self_pubkey ci.pubkey(), // peer &id, // destination &[prune_pubkey], // origins now, now, &stakes, ); res.unwrap(); //test timeout let timeout = now + crds_gossip.push.prune_timeout * 2; res = crds_gossip.process_prune_msg( &id, // self_pubkey ci.pubkey(), // peer &id, // destination &[prune_pubkey], // origins now, timeout, &stakes, ); assert_eq!(res.err(), Some(CrdsGossipError::PruneMessageTimeout)); }