//! Lightwalletd sync functions. use std::{ net::SocketAddr, sync::atomic::{AtomicBool, Ordering}, time::{Duration, Instant}, }; use tempfile::TempDir; use zebra_node_services::rpc_client::RpcRequestClient; use zebra_test::prelude::*; use crate::common::{launch::ZebradTestDirExt, test_type::TestType}; /// The amount of time we wait between each tip check. pub const TIP_CHECK_RATE_LIMIT: Duration = Duration::from_secs(60); /// Wait for lightwalletd to sync to Zebra's tip. /// /// If `wait_for_zebrad_mempool` is `true`, wait for Zebra to activate its mempool. /// If `wait_for_zebrad_tip` is `true`, also wait for Zebra to sync to the network chain tip. #[tracing::instrument] pub fn wait_for_zebrad_and_lightwalletd_sync< P: ZebradTestDirExt + std::fmt::Debug + std::marker::Send + 'static, >( mut lightwalletd: TestChild, lightwalletd_rpc_port: u16, mut zebrad: TestChild

, zebra_rpc_address: SocketAddr, test_type: TestType, wait_for_zebrad_mempool: bool, wait_for_zebrad_tip: bool, ) -> Result<(TestChild, TestChild

)> { let is_zebrad_finished = AtomicBool::new(false); let is_lightwalletd_finished = AtomicBool::new(false); let is_zebrad_finished = &is_zebrad_finished; let is_lightwalletd_finished = &is_lightwalletd_finished; // TODO: split these closures out into their own functions // Check Zebra's logs for errors. // Optionally waits until Zebra has synced to the tip, based on `wait_for_zebrad_tip`. // // `lightwalletd` syncs can take a long time, // so we need to check that `zebrad` has synced to the tip in parallel. let zebrad_mut = &mut zebrad; let zebrad_wait_fn = || -> Result<_> { // When we are near the tip, the mempool should activate at least once if wait_for_zebrad_mempool { tracing::info!( ?test_type, "waiting for zebrad to activate the mempool when it gets near the tip..." ); zebrad_mut.expect_stdout_line_matches("activating mempool")?; } // When we are near the tip, this message is logged multiple times if wait_for_zebrad_tip { tracing::info!(?test_type, "waiting for zebrad to sync to the tip..."); zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_FINISHED_REGEX)?; } // Tell the other thread that Zebra has finished is_zebrad_finished.store(true, Ordering::SeqCst); tracing::info!( ?test_type, "zebrad is waiting for lightwalletd to sync to the tip..." ); while !is_lightwalletd_finished.load(Ordering::SeqCst) { // Just keep checking the Zebra logs for errors... if wait_for_zebrad_tip { // Make sure the sync is still finished, this is logged every minute or so. zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_FINISHED_REGEX)?; } else { // Use sync progress logs, which are logged every minute or so. zebrad_mut.expect_stdout_line_matches(crate::common::sync::SYNC_PROGRESS_REGEX)?; } } Ok(zebrad_mut) }; // Wait until `lightwalletd` has synced to Zebra's tip. // Calls `lightwalletd`'s gRPCs and Zebra's JSON-RPCs. // Also checks `lightwalletd`'s logs for errors. // // `zebrad` syncs can take a long time, // so we need to check that `lightwalletd` has synced to the tip in parallel. let lightwalletd_mut = &mut lightwalletd; let lightwalletd_wait_fn = || -> Result<_> { tracing::info!( ?test_type, "lightwalletd is waiting for zebrad to sync to the tip..." ); while !is_zebrad_finished.load(Ordering::SeqCst) { // Just keep checking the `lightwalletd` logs for errors. // It usually logs something every 30-90 seconds, // but there's no specific message we need to wait for here. assert!( lightwalletd_mut.wait_for_stdout_line(None), "lightwalletd output unexpectedly finished early", ); } tracing::info!(?test_type, "waiting for lightwalletd to sync to the tip..."); while !are_zebrad_and_lightwalletd_tips_synced(zebra_rpc_address, lightwalletd_mut)? { let previous_check = Instant::now(); // To improve performance, only check the tips occasionally while previous_check.elapsed() < TIP_CHECK_RATE_LIMIT { assert!( lightwalletd_mut.wait_for_stdout_line(None), "lightwalletd output unexpectedly finished early", ); } } // Tell the other thread that `lightwalletd` has finished is_lightwalletd_finished.store(true, Ordering::SeqCst); Ok(lightwalletd_mut) }; // Run both threads in parallel, automatically propagating any panics to this thread. std::thread::scope(|s| { // Launch the sync-waiting threads let zebrad_thread = s.spawn(|| { let zebrad_result = zebrad_wait_fn(); is_zebrad_finished.store(true, Ordering::SeqCst); zebrad_result.expect("test failed while waiting for zebrad to sync"); }); let lightwalletd_thread = s.spawn(|| { let lightwalletd_result = lightwalletd_wait_fn(); is_lightwalletd_finished.store(true, Ordering::SeqCst); lightwalletd_result.expect("test failed while waiting for lightwalletd to sync."); }); // Mark the sync-waiting threads as finished if they fail or panic. // This tells the other thread that it can exit. // // TODO: use `panic::catch_unwind()` instead, // when `&mut zebra_test::command::TestChild` is unwind-safe s.spawn(|| { let zebrad_result = zebrad_thread.join(); is_zebrad_finished.store(true, Ordering::SeqCst); zebrad_result.expect("test panicked or failed while waiting for zebrad to sync"); }); s.spawn(|| { let lightwalletd_result = lightwalletd_thread.join(); is_lightwalletd_finished.store(true, Ordering::SeqCst); lightwalletd_result .expect("test panicked or failed while waiting for lightwalletd to sync"); }); }); Ok((lightwalletd, zebrad)) } /// Returns `Ok(true)` if zebrad and lightwalletd are both at the same height. #[tracing::instrument] pub fn are_zebrad_and_lightwalletd_tips_synced( zebra_rpc_address: SocketAddr, lightwalletd: &mut TestChild, ) -> Result { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; rt.block_on(async { // We are going to try getting the current lightwalletd height by reading the lightwalletd logs. let mut lightwalletd_next_height = 1; // Only go forward on getting next height from lightwalletd logs if we find the line we are interested in. // // TODO: move this blocking code out of the async executor. // The executor could block all tasks and futures while this code is running. // That's ok for now, but it might cause test hangs or failures if we spawn tasks, select(), or join(). if let Ok(line) = lightwalletd.expect_stdout_line_matches("Waiting for block: [0-9]+") { let line_json: serde_json::Value = serde_json::from_str(line.as_str()) .expect("captured lightwalletd logs are always valid json"); let msg = line_json["msg"] .as_str() .expect("`msg` field is always a valid string"); // Block number is the last word of the message. We rely on that specific for this to work. let last = msg .split(' ') .last() .expect("always possible to get the last word of a separated by space string"); lightwalletd_next_height = last .parse() .expect("the last word is always the block number so it can be parsed to i32 "); } // The last height in lightwalletd is the one the program is expecting minus one. let lightwalletd_tip_height = (lightwalletd_next_height - 1) as u64; // Get the block tip from zebrad let client = RpcRequestClient::new(zebra_rpc_address); let zebrad_blockchain_info = client .text_from_call("getblockchaininfo", "[]".to_string()) .await?; let zebrad_blockchain_info: serde_json::Value = serde_json::from_str(&zebrad_blockchain_info)?; let zebrad_tip_height = zebrad_blockchain_info["result"]["blocks"] .as_u64() .expect("unexpected block height: doesn't fit in u64"); Ok(lightwalletd_tip_height == zebrad_tip_height) }) }