use std::collections::{BTreeMap, BTreeSet}; use bdk_bitcoind_rpc::Emitter; use bdk_chain::{ bitcoin::{Address, Amount, Txid}, local_chain::{CheckPoint, LocalChain}, spk_txout::SpkTxOutIndex, Balance, BlockId, IndexedTxGraph, Merge, }; use bdk_testenv::{anyhow, TestEnv}; use bitcoin::{hashes::Hash, Block, OutPoint, ScriptBuf, WScriptHash}; use bitcoincore_rpc::RpcApi; /// Ensure that blocks are emitted in order even after reorg. /// /// 1. Mine 101 blocks. /// 2. Emit blocks from [`Emitter`] and update the [`LocalChain`]. /// 3. Reorg highest 6 blocks. /// 4. Emit blocks from [`Emitter`] and re-update the [`LocalChain`]. #[test] pub fn test_sync_local_chain() -> anyhow::Result<()> { let env = TestEnv::new()?; let network_tip = env.rpc_client().get_block_count()?; let (mut local_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); let mut emitter = Emitter::new(env.rpc_client(), local_chain.tip(), 0); // Mine some blocks and return the actual block hashes. // Because initializing `ElectrsD` already mines some blocks, we must include those too when // returning block hashes. let exp_hashes = { let mut hashes = (0..=network_tip) .map(|height| env.rpc_client().get_block_hash(height)) .collect::, _>>()?; hashes.extend(env.mine_blocks(101 - network_tip as usize, None)?); hashes }; // See if the emitter outputs the right blocks. while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); let hash = emission.block_hash(); assert_eq!( emission.block_hash(), exp_hashes[height as usize], "emitted block hash is unexpected" ); assert_eq!( local_chain.apply_update(emission.checkpoint,)?, [(height, Some(hash))].into(), "chain update changeset is unexpected", ); } assert_eq!( local_chain .iter_checkpoints() .map(|cp| (cp.height(), cp.hash())) .collect::>(), exp_hashes .iter() .enumerate() .map(|(i, hash)| (i as u32, *hash)) .collect::>(), "final local_chain state is unexpected", ); // Perform reorg. let reorged_blocks = env.reorg(6)?; let exp_hashes = exp_hashes .iter() .take(exp_hashes.len() - reorged_blocks.len()) .chain(&reorged_blocks) .cloned() .collect::>(); // See if the emitter outputs the right blocks. let mut exp_height = exp_hashes.len() - reorged_blocks.len(); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); let hash = emission.block_hash(); assert_eq!( height, exp_height as u32, "emitted block has unexpected height" ); assert_eq!( hash, exp_hashes[height as usize], "emitted block is unexpected" ); assert_eq!( local_chain.apply_update(emission.checkpoint,)?, if exp_height == exp_hashes.len() - reorged_blocks.len() { bdk_chain::local_chain::ChangeSet { blocks: core::iter::once((height, Some(hash))) .chain((height + 1..exp_hashes.len() as u32).map(|h| (h, None))) .collect(), } } else { [(height, Some(hash))].into() }, "chain update changeset is unexpected", ); exp_height += 1; } assert_eq!( local_chain .iter_checkpoints() .map(|cp| (cp.height(), cp.hash())) .collect::>(), exp_hashes .iter() .enumerate() .map(|(i, hash)| (i as u32, *hash)) .collect::>(), "final local_chain state is unexpected after reorg", ); Ok(()) } /// Ensure that [`EmittedUpdate::into_tx_graph_update`] behaves appropriately for both mempool and /// block updates. /// /// [`EmittedUpdate::into_tx_graph_update`]: bdk_bitcoind_rpc::EmittedUpdate::into_tx_graph_update #[test] fn test_into_tx_graph() -> anyhow::Result<()> { let env = TestEnv::new()?; let addr_0 = env .rpc_client() .get_new_address(None, None)? .assume_checked(); let addr_1 = env .rpc_client() .get_new_address(None, None)? .assume_checked(); let addr_2 = env .rpc_client() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(101, None)?; let (mut chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); let mut indexed_tx_graph = IndexedTxGraph::::new({ let mut index = SpkTxOutIndex::::default(); index.insert_spk(0, addr_0.script_pubkey()); index.insert_spk(1, addr_1.script_pubkey()); index.insert_spk(2, addr_2.script_pubkey()); index }); let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); let _ = chain.apply_update(emission.checkpoint)?; let indexed_additions = indexed_tx_graph.apply_block_relevant(&emission.block, height); assert!(indexed_additions.is_empty()); } // send 3 txs to a tracked address, these txs will be in the mempool let exp_txids = { let mut txids = BTreeSet::new(); for _ in 0..3 { txids.insert(env.rpc_client().send_to_address( &addr_0, Amount::from_sat(10_000), None, None, None, None, None, None, )?); } txids }; // expect that the next block should be none and we should get 3 txs from mempool { // next block should be `None` assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs); assert_eq!( indexed_additions .tx_graph .txs .iter() .map(|tx| tx.compute_txid()) .collect::>(), exp_txids, "changeset should have the 3 mempool transactions", ); assert!(indexed_additions.tx_graph.anchors.is_empty()); } // mine a block that confirms the 3 txs let exp_block_hash = env.mine_blocks(1, None)?[0]; let exp_block_height = env.rpc_client().get_block_info(&exp_block_hash)?.height as u32; let exp_anchors = exp_txids .iter() .map({ let anchor = BlockId { height: exp_block_height, hash: exp_block_hash, }; move |&txid| (anchor, txid) }) .collect::>(); // must receive mined block which will confirm the transactions. { let emission = emitter.next_block()?.expect("must get mined block"); let height = emission.block_height(); let _ = chain.apply_update(emission.checkpoint)?; let indexed_additions = indexed_tx_graph.apply_block_relevant(&emission.block, height); assert!(indexed_additions.tx_graph.txs.is_empty()); assert!(indexed_additions.tx_graph.txouts.is_empty()); assert_eq!(indexed_additions.tx_graph.anchors, exp_anchors); } Ok(()) } /// Ensure next block emitted after reorg is at reorg height. /// /// After a reorg, if the last-emitted block height is equal or greater than the reorg height, and /// the fallback height is equal to or lower than the reorg height, the next block/header emission /// should be at the reorg height. /// /// TODO: If the reorg height is lower than the fallback height, how do we find a block height to /// emit that can connect with our receiver chain? #[test] fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { const EMITTER_START_HEIGHT: usize = 100; const CHAIN_TIP_HEIGHT: usize = 110; let env = TestEnv::new()?; let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), EMITTER_START_HEIGHT as _, ); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; while emitter.next_header()?.is_some() {} for reorg_count in 1..=10 { let replaced_blocks = env.reorg_empty_blocks(reorg_count)?; let next_emission = emitter.next_header()?.expect("must emit block after reorg"); assert_eq!( ( next_emission.block_height() as usize, next_emission.block_hash() ), replaced_blocks[0], "block emitted after reorg should be at the reorg height" ); while emitter.next_header()?.is_some() {} } Ok(()) } fn process_block( recv_chain: &mut LocalChain, recv_graph: &mut IndexedTxGraph>, block: Block, block_height: u32, ) -> anyhow::Result<()> { recv_chain.apply_update(CheckPoint::from_header(&block.header, block_height))?; let _ = recv_graph.apply_block(block, block_height); Ok(()) } fn sync_from_emitter( recv_chain: &mut LocalChain, recv_graph: &mut IndexedTxGraph>, emitter: &mut Emitter, ) -> anyhow::Result<()> where C: bitcoincore_rpc::RpcApi, { while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); process_block(recv_chain, recv_graph, emission.block, height)?; } Ok(()) } fn get_balance( recv_chain: &LocalChain, recv_graph: &IndexedTxGraph>, ) -> anyhow::Result { let chain_tip = recv_chain.tip().block_id(); let outpoints = recv_graph.index.outpoints().clone(); let balance = recv_graph .graph() .balance(recv_chain, chain_tip, outpoints, |_, _| true); Ok(balance) } /// If a block is reorged out, ensure that containing transactions that do not exist in the /// replacement block(s) become unconfirmed. #[test] fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; const ADDITIONAL_COUNT: usize = 11; const SEND_AMOUNT: Amount = Amount::from_sat(10_000); let env = TestEnv::new()?; let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), 0, ); // setup addresses let addr_to_mine = env .rpc_client() .get_new_address(None, None)? .assume_checked(); let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros()); let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; // setup receiver let (mut recv_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); let mut recv_graph = IndexedTxGraph::::new({ let mut recv_index = SpkTxOutIndex::default(); recv_index.insert_spk((), spk_to_track.clone()); recv_index }); // mine and sync receiver up to tip env.mine_blocks(PREMINE_COUNT, Some(addr_to_mine))?; // create transactions that are tracked by our receiver for _ in 0..ADDITIONAL_COUNT { let txid = env.send(&addr_to_track, SEND_AMOUNT)?; // lock outputs that send to `addr_to_track` let outpoints_to_lock = env .rpc_client() .get_transaction(&txid, None)? .transaction()? .output .into_iter() .enumerate() .filter(|(_, txo)| txo.script_pubkey == spk_to_track) .map(|(vout, _)| OutPoint::new(txid, vout as _)) .collect::>(); env.rpc_client().lock_unspent(&outpoints_to_lock)?; let _ = env.mine_blocks(1, None)?; } // get emitter up to tip sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; assert_eq!( get_balance(&recv_chain, &recv_graph)?, Balance { confirmed: SEND_AMOUNT * ADDITIONAL_COUNT as u64, ..Balance::default() }, "initial balance must be correct", ); // perform reorgs with different depths for reorg_count in 1..=ADDITIONAL_COUNT { env.reorg_empty_blocks(reorg_count)?; sync_from_emitter(&mut recv_chain, &mut recv_graph, &mut emitter)?; assert_eq!( get_balance(&recv_chain, &recv_graph)?, Balance { confirmed: SEND_AMOUNT * (ADDITIONAL_COUNT - reorg_count) as u64, ..Balance::default() }, "reorg_count: {}", reorg_count, ); } Ok(()) } /// Ensure avoid-re-emission-logic is sound when [`Emitter`] is synced to tip. /// /// The receiver (bdk_chain structures) is synced to the chain tip, and there is txs in the mempool. /// When we call Emitter::mempool multiple times, mempool txs should not be re-emitted, even if the /// chain tip is extended. #[test] fn mempool_avoids_re_emission() -> anyhow::Result<()> { const BLOCKS_TO_MINE: usize = 101; const MEMPOOL_TX_COUNT: usize = 2; let env = TestEnv::new()?; let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), 0, ); // mine blocks and sync up emitter let addr = env .rpc_client() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(BLOCKS_TO_MINE, Some(addr.clone()))?; while emitter.next_header()?.is_some() {} // have some random txs in mempool let exp_txids = (0..MEMPOOL_TX_COUNT) .map(|_| env.send(&addr, Amount::from_sat(2100))) .collect::, _>>()?; // the first emission should include all transactions let emitted_txids = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); assert_eq!( emitted_txids, exp_txids, "all mempool txs should be emitted" ); // second emission should be empty assert!( emitter.mempool()?.is_empty(), "second emission should be empty" ); // mine empty blocks + sync up our emitter -> we should still not re-emit for _ in 0..BLOCKS_TO_MINE { env.mine_empty_block()?; } while emitter.next_header()?.is_some() {} assert!( emitter.mempool()?.is_empty(), "third emission, after chain tip is extended, should also be empty" ); Ok(()) } /// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction /// height. /// /// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous /// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check /// that `mempool` should always re-emit txs that have introduced at a height greater than the last /// emitted block height. #[test] fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; const MEMPOOL_TX_COUNT: usize = 21; let env = TestEnv::new()?; let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), 0, ); // mine blocks to get initial balance, sync emitter up to tip let addr = env .rpc_client() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; while emitter.next_header()?.is_some() {} // mine blocks to introduce txs to mempool at different heights let tx_introductions = (0..MEMPOOL_TX_COUNT) .map(|_| -> anyhow::Result<_> { let (height, _) = env.mine_empty_block()?; let txid = env.send(&addr, Amount::from_sat(2100))?; Ok((height, txid)) }) .collect::>>()?; assert_eq!( emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), tx_introductions.iter().map(|&(_, txid)| txid).collect(), "first mempool emission should include all txs", ); assert_eq!( emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), tx_introductions.iter().map(|&(_, txid)| txid).collect(), "second mempool emission should still include all txs", ); // At this point, the emitter has seen all mempool transactions. It should only re-emit those // that have introduction heights less than the emitter's last-emitted block tip. while let Some(emission) = emitter.next_header()? { let height = emission.block_height(); // We call `mempool()` twice. // The second call (at height `h`) should skip the tx introduced at height `h`. for try_index in 0..2 { let exp_txids = tx_introductions .range((height as usize + try_index, Txid::all_zeros())..) .map(|&(_, txid)| txid) .collect::>(); let emitted_txids = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); assert_eq!( emitted_txids, exp_txids, "\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}", height, try_index, exp_txids .difference(&emitted_txids) .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) .collect::>(), emitted_txids .difference(&exp_txids) .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) .collect::>(), ); } } Ok(()) } /// Ensure we force re-emit all mempool txs after reorg. #[test] fn mempool_during_reorg() -> anyhow::Result<()> { const TIP_DIFF: usize = 10; const PREMINE_COUNT: usize = 101; let env = TestEnv::new()?; let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), 0, ); // mine blocks to get initial balance let addr = env .rpc_client() .get_new_address(None, None)? .assume_checked(); env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; // introduce mempool tx at each block extension for _ in 0..TIP_DIFF { env.mine_empty_block()?; env.send(&addr, Amount::from_sat(2100))?; } // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted // from the mempool yet) while emitter.next_header()?.is_some() {} assert_eq!( emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(), env.rpc_client() .get_raw_mempool()? .into_iter() .collect::>(), "first mempool emission should include all txs", ); // perform reorgs at different heights, these reorgs will not confirm transactions in the // mempool for reorg_count in 1..TIP_DIFF { env.reorg_empty_blocks(reorg_count)?; // This is a map of mempool txids to tip height where the tx was introduced to the mempool // we recalculate this at every loop as reorgs may evict transactions from mempool. We use // the introduction height to determine whether we expect a tx to appear in a mempool // emission. // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first? let tx_introductions = dbg!(env .rpc_client() .get_raw_mempool_verbose()? .into_iter() .map(|(txid, entry)| (txid, entry.height as usize)) .collect::>()); // `next_header` emits the replacement block of the reorg if let Some(emission) = emitter.next_header()? { let height = emission.block_height(); // the mempool emission (that follows the first block emission after reorg) should only // include mempool txs introduced at reorg height or greater let mempool = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); let exp_mempool = tx_introductions .iter() .filter(|(_, &intro_h)| intro_h >= (height as usize)) .map(|(&txid, _)| txid) .collect::>(); assert_eq!( mempool, exp_mempool, "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater" ); let mempool = emitter .mempool()? .into_iter() .map(|(tx, _)| tx.compute_txid()) .collect::>(); let exp_mempool = tx_introductions .iter() .filter(|&(_, &intro_height)| intro_height > (height as usize)) .map(|(&txid, _)| txid) .collect::>(); assert_eq!( mempool, exp_mempool, "following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}", exp_mempool .difference(&mempool) .map(|txid| (txid, tx_introductions.get(txid).unwrap())) .collect::>(), mempool .difference(&exp_mempool) .map(|txid| (txid, tx_introductions.get(txid).unwrap())) .collect::>(), ); } // sync emitter to tip while emitter.next_header()?.is_some() {} } Ok(()) } /// If blockchain re-org includes the start height, emit new start height block /// /// 1. mine 101 blocks /// 2. emit blocks 99a, 100a /// 3. invalidate blocks 99a, 100a, 101a /// 4. mine new blocks 99b, 100b, 101b /// 5. emit block 99b /// /// The block hash of 99b should be different than 99a, but their previous block hashes should /// be the same. #[test] fn no_agreement_point() -> anyhow::Result<()> { const PREMINE_COUNT: usize = 101; let env = TestEnv::new()?; // start height is 99 let mut emitter = Emitter::new( env.rpc_client(), CheckPoint::new(BlockId { height: 0, hash: env.rpc_client().get_block_hash(0)?, }), (PREMINE_COUNT - 2) as u32, ); // mine 101 blocks env.mine_blocks(PREMINE_COUNT, None)?; // emit block 99a let block_header_99a = emitter.next_header()?.expect("block 99a header").block; let block_hash_99a = block_header_99a.block_hash(); let block_hash_98a = block_header_99a.prev_blockhash; // emit block 100a let block_header_100a = emitter.next_header()?.expect("block 100a header").block; let block_hash_100a = block_header_100a.block_hash(); // get hash for block 101a let block_hash_101a = env.rpc_client().get_block_hash(101)?; // invalidate blocks 99a, 100a, 101a env.rpc_client().invalidate_block(&block_hash_99a)?; env.rpc_client().invalidate_block(&block_hash_100a)?; env.rpc_client().invalidate_block(&block_hash_101a)?; // mine new blocks 99b, 100b, 101b env.mine_blocks(3, None)?; // emit block header 99b let block_header_99b = emitter.next_header()?.expect("block 99b header").block; let block_hash_99b = block_header_99b.block_hash(); let block_hash_98b = block_header_99b.prev_blockhash; assert_ne!(block_hash_99a, block_hash_99b); assert_eq!(block_hash_98a, block_hash_98b); Ok(()) }