#![allow(clippy::arithmetic_side_effects)] #[macro_use] extern crate log; use { miraland_gossip::{ cluster_info::{ClusterInfo, Node}, contact_info::{LegacyContactInfo as ContactInfo, Protocol}, crds::Cursor, gossip_service::GossipService, }, miraland_perf::packet::Packet, miraland_streamer::{ sendmmsg::{multi_target_send, SendPktsError}, socket::SocketAddrSpace, }, rayon::iter::*, miraland_runtime::bank_forks::BankForks, miraland_sdk::{ hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, transaction::Transaction, }, miraland_vote_program::{vote_instruction, vote_state::Vote}, std::{ net::UdpSocket, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock, }, thread::sleep, time::Duration, }, }; fn test_node(exit: Arc) -> (Arc, GossipService, UdpSocket) { let keypair = Arc::new(Keypair::new()); let mut test_node = Node::new_localhost_with_pubkey(&keypair.pubkey()); let cluster_info = Arc::new(ClusterInfo::new( test_node.info.clone(), keypair, SocketAddrSpace::Unspecified, )); let gossip_service = GossipService::new( &cluster_info, None, test_node.sockets.gossip, None, true, // should_check_duplicate_instance None, exit, ); let _ = cluster_info.my_contact_info(); ( cluster_info, gossip_service, test_node.sockets.tvu.pop().unwrap(), ) } fn test_node_with_bank( node_keypair: Arc, exit: Arc, bank_forks: Arc>, ) -> (Arc, GossipService, UdpSocket) { let mut test_node = Node::new_localhost_with_pubkey(&node_keypair.pubkey()); let cluster_info = Arc::new(ClusterInfo::new( test_node.info.clone(), node_keypair, SocketAddrSpace::Unspecified, )); let gossip_service = GossipService::new( &cluster_info, Some(bank_forks), test_node.sockets.gossip, None, true, // should_check_duplicate_instance None, exit, ); let _ = cluster_info.my_contact_info(); ( cluster_info, gossip_service, test_node.sockets.tvu.pop().unwrap(), ) } /// Test that the network converges. /// Run until every node in the network has a full ContactInfo set. /// Check that nodes stop sending updates after all the ContactInfo has been shared. /// tests that actually use this function are below fn run_gossip_topo(num: usize, topo: F) where F: Fn(&Vec<(Arc, GossipService, UdpSocket)>), { let exit = Arc::new(AtomicBool::new(false)); let listen: Vec<_> = (0..num).map(|_| test_node(exit.clone())).collect(); topo(&listen); let mut done = false; for i in 0..(num * 32) { let total: usize = listen.iter().map(|v| v.0.gossip_peers().len()).sum(); if (total + num) * 10 > num * num * 9 { done = true; break; } else { trace!("not converged {} {} {}", i, total + num, num * num); } sleep(Duration::from_secs(1)); } exit.store(true, Ordering::Relaxed); for (_, dr, _) in listen { dr.join().unwrap(); } assert!(done); } /// retransmit messages to a list of nodes fn retransmit_to( peers: &[&ContactInfo], data: &[u8], socket: &UdpSocket, forwarded: bool, socket_addr_space: &SocketAddrSpace, ) { trace!("retransmit orders {}", peers.len()); let dests: Vec<_> = if forwarded { peers .iter() .filter_map(|peer| peer.tvu(Protocol::UDP).ok()) .filter(|addr| socket_addr_space.check(addr)) .collect() } else { peers .iter() .filter_map(|peer| peer.tvu(Protocol::UDP).ok()) .filter(|addr| socket_addr_space.check(addr)) .collect() }; match multi_target_send(socket, data, &dests) { Ok(()) => (), Err(SendPktsError::IoError(ioerr, num_failed)) => { error!( "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", ioerr, num_failed, dests.len(), ); } } } /// ring a -> b -> c -> d -> e -> a #[test] fn gossip_ring() { miraland_logger::setup(); run_gossip_topo(50, |listen| { let num = listen.len(); for n in 0..num { let y = n % listen.len(); let x = (n + 1) % listen.len(); let yv = &listen[y].0; let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.set_wallclock(timestamp()); listen[x].0.insert_legacy_info(d); } }); } /// ring a -> b -> c -> d -> e -> a #[test] #[ignore] fn gossip_ring_large() { miraland_logger::setup(); run_gossip_topo(600, |listen| { let num = listen.len(); for n in 0..num { let y = n % listen.len(); let x = (n + 1) % listen.len(); let yv = &listen[y].0; let mut d = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); d.set_wallclock(timestamp()); listen[x].0.insert_legacy_info(d); } }); } /// star a -> (b,c,d,e) #[test] fn gossip_star() { miraland_logger::setup(); run_gossip_topo(10, |listen| { let num = listen.len(); for n in 0..(num - 1) { let x = 0; let y = (n + 1) % listen.len(); let yv = &listen[y].0; let mut yd = yv.lookup_contact_info(&yv.id(), |ci| ci.clone()).unwrap(); yd.set_wallclock(timestamp()); let xv = &listen[x].0; xv.insert_legacy_info(yd); trace!("star leader {}", &xv.id()); } }); } /// rstar a <- (b,c,d,e) #[test] fn gossip_rstar() { miraland_logger::setup(); run_gossip_topo(10, |listen| { let num = listen.len(); let xd = { let xv = &listen[0].0; xv.lookup_contact_info(&xv.id(), |ci| ci.clone()).unwrap() }; trace!("rstar leader {}", xd.pubkey()); for n in 0..(num - 1) { let y = (n + 1) % listen.len(); let yv = &listen[y].0; yv.insert_legacy_info(xd.clone()); trace!("rstar insert {} into {}", xd.pubkey(), yv.id()); } }); } #[test] pub fn cluster_info_retransmit() { miraland_logger::setup(); let exit = Arc::new(AtomicBool::new(false)); trace!("c1:"); let (c1, dr1, tn1) = test_node(exit.clone()); trace!("c2:"); let (c2, dr2, tn2) = test_node(exit.clone()); trace!("c3:"); let (c3, dr3, tn3) = test_node(exit.clone()); let c1_contact_info = c1.my_contact_info(); c2.insert_info(c1_contact_info.clone()); c3.insert_info(c1_contact_info); let num = 3; //wait to converge trace!("waiting to converge:"); let mut done = false; for _ in 0..30 { done = c1.gossip_peers().len() == num - 1 && c2.gossip_peers().len() == num - 1 && c3.gossip_peers().len() == num - 1; if done { break; } sleep(Duration::from_secs(1)); } assert!(done); let mut p = Packet::default(); p.meta_mut().size = 10; let peers = c1.tvu_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); retransmit_to( &retransmit_peers, p.data(..).unwrap(), &tn1, false, &SocketAddrSpace::Unspecified, ); let res: Vec<_> = [tn1, tn2, tn3] .into_par_iter() .map(|s| { let mut p = Packet::default(); s.set_read_timeout(Some(Duration::from_secs(1))).unwrap(); let res = s.recv_from(p.buffer_mut()); res.is_err() //true if failed to receive the retransmit packet }) .collect(); //true if failed receive the retransmit packet, r2, and r3 should succeed //r1 was the sender, so it should fail to receive the packet assert_eq!(res, [true, false, false]); exit.store(true, Ordering::Relaxed); dr1.join().unwrap(); dr2.join().unwrap(); dr3.join().unwrap(); } #[test] #[ignore] pub fn cluster_info_scale() { use { miraland_measure::measure::Measure, miraland_perf::test_tx::test_tx, miraland_runtime::{ bank::Bank, genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs}, }, }; miraland_logger::setup(); let exit = Arc::new(AtomicBool::new(false)); let num_nodes: usize = std::env::var("NUM_NODES") .unwrap_or_else(|_| "10".to_string()) .parse() .expect("could not parse NUM_NODES as a number"); let vote_keypairs: Vec<_> = (0..num_nodes) .map(|_| ValidatorVoteKeypairs::new_rand()) .collect(); let genesis_config_info = create_genesis_config_with_vote_accounts( 10_000, &vote_keypairs, vec![100; vote_keypairs.len()], ); let bank0 = Bank::new_for_tests(&genesis_config_info.genesis_config); let bank_forks = BankForks::new_rw_arc(bank0); let nodes: Vec<_> = vote_keypairs .into_iter() .map(|keypairs| { test_node_with_bank( Arc::new(keypairs.node_keypair), exit.clone(), bank_forks.clone(), ) }) .collect(); let ci0 = nodes[0].0.my_contact_info(); for node in &nodes[1..] { node.0.insert_info(ci0.clone()); } let mut time = Measure::start("time"); let mut done; let mut success = false; for _ in 0..30 { done = true; for (i, node) in nodes.iter().enumerate() { warn!("node {} peers: {}", i, node.0.gossip_peers().len()); if node.0.gossip_peers().len() != num_nodes - 1 { done = false; break; } } if done { success = true; break; } sleep(Duration::from_secs(1)); } time.stop(); warn!("found {} nodes in {} success: {}", num_nodes, time, success); for num_votes in 1..1000 { let mut time = Measure::start("votes"); let tx = test_tx(); warn!("tx.message.account_keys: {:?}", tx.message.account_keys); let vote = Vote::new( vec![1, 3, num_votes + 5], // slots Hash::default(), ); let ix = vote_instruction::vote( &Pubkey::new_unique(), // vote_pubkey &Pubkey::new_unique(), // authorized_voter_pubkey vote, ); let tx = Transaction::new_with_payer( &[ix], // instructions None, // payer ); let tower = vec![num_votes + 5]; nodes[0].0.push_vote(&tower, tx.clone()); let mut success = false; for _ in 0..(30 * 5) { let mut not_done = 0; let mut num_old = 0; let mut num_push_total = 0; let mut num_pushes = 0; let mut num_pulls = 0; for (node, _, _) in nodes.iter() { //if node.0.get_votes(0).1.len() != (num_nodes * num_votes) { let has_tx = node .get_votes(&mut Cursor::default()) .iter() .filter(|v| v.message.account_keys == tx.message.account_keys) .count(); num_old += node.gossip.push.num_old.load(Ordering::Relaxed); num_push_total += node.gossip.push.num_total.load(Ordering::Relaxed); num_pushes += node.gossip.push.num_pushes.load(Ordering::Relaxed); num_pulls += node.gossip.pull.num_pulls.load(Ordering::Relaxed); if has_tx == 0 { not_done += 1; } } warn!("not_done: {}/{}", not_done, nodes.len()); warn!("num_old: {}", num_old); warn!("num_push_total: {}", num_push_total); warn!("num_pushes: {}", num_pushes); warn!("num_pulls: {}", num_pulls); success = not_done < (nodes.len() / 20); if success { break; } sleep(Duration::from_millis(200)); } time.stop(); warn!( "propagated vote {} in {} success: {}", num_votes, time, success ); sleep(Duration::from_millis(200)); for (node, _, _) in nodes.iter() { node.gossip.push.num_old.store(0, Ordering::Relaxed); node.gossip.push.num_total.store(0, Ordering::Relaxed); node.gossip.push.num_pushes.store(0, Ordering::Relaxed); node.gossip.pull.num_pulls.store(0, Ordering::Relaxed); } } exit.store(true, Ordering::Relaxed); for node in nodes { node.1.join().unwrap(); } }