use bpcon::config::BPConConfig; use bpcon::error::LaunchBallotError; use bpcon::leader::{DefaultLeaderElector, LeaderElector}; use bpcon::message::{Message1bContent, MessagePacket}; use bpcon::test_mocks::{MockParty, MockValue, MockValueSelector}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; use tokio::time::{sleep, Duration, Instant}; use futures::future::join_all; /// Here each party/receiver/sender shall correspond at equal indexes. type PartiesWithChannels = ( Vec, Vec>, Vec>, ); /// Create test parties with predefined generics, based on config. fn create_parties(cfg: BPConConfig) -> PartiesWithChannels { (0..cfg.party_weights.len()) .map(|i| { MockParty::new( i as u64, cfg.clone(), MockValueSelector, Box::new(DefaultLeaderElector::new()), ) }) .fold( (Vec::new(), Vec::new(), Vec::new()), |(mut parties, mut receivers, mut senders), (p, r, s)| { parties.push(p); receivers.push(r); senders.push(s); (parties, receivers, senders) }, ) } /// Begin ballot process for each party. fn launch_parties( parties: Vec, ) -> Vec>> { parties .into_iter() .map(|mut party| tokio::spawn(async move { party.launch_ballot().await })) .collect() } /// Collect messages from receivers. fn collect_messages(receivers: &mut [UnboundedReceiver]) -> Vec { receivers .iter_mut() .filter_map(|receiver| receiver.try_recv().ok()) .collect() } /// Broadcast collected messages to other parties, skipping the sender. fn broadcast_messages(messages: Vec, senders: &[UnboundedSender]) { messages.iter().for_each(|msg| { senders .iter() .enumerate() .filter(|(i, _)| msg.routing.sender != *i as u64) // Skip the current party (sender). .for_each(|(_, sender_into)| { sender_into.send(msg.clone()).unwrap(); }); }); } /// Propagate messages peer-to-peer between parties using their channels. fn propagate_p2p( mut receivers: Vec>, senders: Vec>, ) -> JoinHandle<()> { tokio::spawn(async move { loop { let messages = collect_messages(receivers.as_mut_slice()); broadcast_messages(messages, &senders); // Reduce processor load. sleep(Duration::from_millis(1)).await; } }) } /// Await completion of each party's process and aggregate results. async fn await_results( tasks: Vec>>, ) -> Vec> { join_all(tasks) .await .into_iter() .map(|res| res.unwrap()) .collect() } /// Assert consensus reached and log all errors. fn analyze_ballot(results: Vec>) { // Partition the results into successful values and errors. let (successful, errors): (Vec<_>, Vec<_>) = results.into_iter().partition(|res| res.is_ok()); // Log errors if any. if !errors.is_empty() { for err in errors.into_iter() { eprintln!("Error during ballot: {:?}", err.unwrap_err()); } } if successful.is_empty() { panic!("No consensus, all parties failed."); } // Extract the successful values. let values: Vec = successful.into_iter().map(|res| res.unwrap()).collect(); // Check if we reached consensus: all values should be the same. if let Some((first_value, rest)) = values.split_first() { let all_agreed = rest.iter().all(|v| v == first_value); assert!( all_agreed, "No consensus, different values found: {:?}", values ); println!("Consensus reached with value: {:?}", first_value); } } /// Run ballot on given parties, simulating faulty behavior for given `faulty_ids`. async fn run_ballot_faulty_party( parties: PartiesWithChannels, faulty_ids: Vec, ) -> Vec> { let (mut parties, mut receivers, mut senders) = parties; // Simulate failure excluding faulty parties from all processes: for id in faulty_ids { parties.remove(id); receivers.remove(id); senders.remove(id); } let ballot_tasks = launch_parties(parties); let p2p_task = propagate_p2p(receivers, senders); let results = await_results(ballot_tasks).await; p2p_task.abort(); results } #[tokio::test] async fn test_ballot_happy_case() { let (parties, receivers, senders) = create_parties(BPConConfig::default()); let ballot_tasks = launch_parties(parties); let p2p_task = propagate_p2p(receivers, senders); let results = await_results(ballot_tasks).await; p2p_task.abort(); analyze_ballot(results); } #[tokio::test] async fn test_ballot_faulty_party_common() { let parties = create_parties(BPConConfig::default()); let elector = DefaultLeaderElector::new(); let leader = elector.elect_leader(&parties.0[0]).unwrap(); let faulty_ids: Vec = vec![3]; for id in faulty_ids.iter() { assert_ne!( *id as u64, leader, "Should not fail the leader for the test to pass" ); } let results = run_ballot_faulty_party(parties, faulty_ids).await; analyze_ballot(results); } #[tokio::test] async fn test_ballot_faulty_party_leader() { let parties = create_parties(BPConConfig::default()); let elector = DefaultLeaderElector::new(); let leader = elector.elect_leader(&parties.0[0]).unwrap(); let faulty_ids = vec![leader as usize]; let results = run_ballot_faulty_party(parties, faulty_ids).await; assert!( results.into_iter().all(|res| res.is_err()), "All parties should have failed having a faulty leader in the consensus." ); } #[tokio::test] async fn test_ballot_malicious_party() { let (parties, mut receivers, senders) = create_parties(BPConConfig::default()); let elector = DefaultLeaderElector::new(); let leader = elector.elect_leader(&parties[0]).unwrap(); const MALICIOUS_PARTY_ID: u64 = 2; assert_ne!( MALICIOUS_PARTY_ID, leader, "Should not make malicious the leader for the test to pass" ); // We will be simulating malicious behaviour // sending 1b message (meaning, 5/6 times at incorrect stage) with the wrong data. let content = &Message1bContent { ballot: parties[0].ballot() + 1, // divergent ballot number last_ballot_voted: Some(parties[0].ballot() + 1), // early ballot number // shouldn't put malformed serialized value, because we won't be able to pack it last_value_voted: None, }; let malicious_msg = content.pack(MALICIOUS_PARTY_ID).unwrap(); let ballot_tasks = launch_parties(parties); let p2p_task = tokio::spawn(async move { // It is responsibility of the external to party code - p2p module // to rate-limit channels, because otherwise malicious // actors would be able to DDoS ballot, bloating all the channel with malicious ones. // For this test to pass, we will send malicious messages once in a while. let mut last_malicious_message_time = Instant::now(); let malicious_message_interval = Duration::from_millis(100); loop { // Collect all messages first. let mut messages: Vec<_> = receivers .iter_mut() .enumerate() .filter_map(|(i, receiver)| { // Skip receiving messages from the malicious party // to substitute it with invalid one to be propagated. (i != MALICIOUS_PARTY_ID as usize) .then(|| receiver.try_recv().ok()) .flatten() }) .collect(); // Push the malicious message at intervals. if last_malicious_message_time.elapsed() >= malicious_message_interval { messages.push(malicious_msg.clone()); last_malicious_message_time = Instant::now(); } broadcast_messages(messages, &senders); // Delay to simulate network latency. sleep(Duration::from_millis(100)).await; } }); let results = await_results(ballot_tasks).await; p2p_task.abort(); analyze_ballot(results); } #[tokio::test] #[ignore = "takes 20 secs to run, launch manually"] async fn test_ballot_many_parties() { const AMOUNT_OF_PARTIES: usize = 999; let party_weights = vec![1; AMOUNT_OF_PARTIES]; let threshold = BPConConfig::compute_bft_threshold(party_weights.clone()); let cfg = BPConConfig { party_weights, threshold, launch_at: Instant::now(), launch1a_timeout: Duration::from_secs(0), // 1a's and 2a's are sent only by leader launch1b_timeout: Duration::from_secs(1), // meaning we need to wait less. launch2a_timeout: Duration::from_secs(5), launch2av_timeout: Duration::from_secs(7), launch2b_timeout: Duration::from_secs(12), finalize_timeout: Duration::from_secs(19), grace_period: Duration::from_secs(0), }; let (parties, receivers, senders) = create_parties(cfg); let ballot_tasks = launch_parties(parties); let p2p_task = propagate_p2p(receivers, senders); let results = await_results(ballot_tasks).await; p2p_task.abort(); analyze_ballot(results); } #[tokio::test] async fn test_ballot_max_weight() { let weights = vec![u64::MAX, 1]; let threshold = BPConConfig::compute_bft_threshold(weights.clone()); let cfg = BPConConfig::with_default_timeouts(weights, threshold); let (parties, receivers, senders) = create_parties(cfg); let ballot_tasks = launch_parties(parties); let p2p_task = propagate_p2p(receivers, senders); let results = await_results(ballot_tasks).await; p2p_task.abort(); analyze_ballot(results); } #[tokio::test] async fn test_ballot_weights_underflow() { let weights = vec![100, 1, 2, 3, 4]; let threshold = BPConConfig::compute_bft_threshold(weights.clone()); let cfg = BPConConfig::with_default_timeouts(weights, threshold); let (parties, receivers, senders) = create_parties(cfg); let ballot_tasks = launch_parties(parties); let p2p_task = propagate_p2p(receivers, senders); let results = await_results(ballot_tasks).await; p2p_task.abort(); analyze_ballot(results); }