use clap::Parser; use eth_event_stream::{ address, data_feed::block::BlockNotify, sink::{reduce_synced_events, EventReducer, StreamSignature}, stream::StreamFactory, }; use std::collections::HashMap; use std::env; use std::sync::Arc; use tokio::sync::Mutex; use web3::{ transports::Http, types::{Address, Log}, Web3, }; /// Event stream reduce example #[derive(Parser, Debug)] struct Args { // Number of previous blocks to get #[clap(long, default_value_t = 40)] diff_neg: u64, // Number of future blocks to get #[clap(long, default_value_t = 5)] diff_pos: u64, } #[eth_event_macro::event("Transfer(address indexed from, address indexed to, uint value)")] #[derive(Debug)] struct Erc20Transfer {} struct USDCNetFlow { sig: StreamSignature, netflows: HashMap, current_block: Option, } impl EventReducer for USDCNetFlow { fn new(sig: StreamSignature) -> Arc> { Arc::new(Mutex::new(USDCNetFlow { sig, netflows: HashMap::new(), current_block: None, })) } fn reduce(&mut self, block_number: u64, ordered_events: &[Log]) { match ordered_events { [first, ..] => { if self.sig.matches(first) { self.current_block = Some(block_number); // if this is the event pattern that we want let transfer = Erc20Transfer::from(first.clone()); match &transfer.data { (from, to, value) => { // get the flows let from_flow = self.netflows.get(from).unwrap_or(&0).clone(); let to_flow = self.netflows.get(to).unwrap_or(&0).clone(); let value_int = value.as_u128() as i128; // update the flows self.netflows.insert(*from, from_flow - value_int); self.netflows.insert(*to, to_flow + value_int); } } } } _ => (), } } } #[tokio::main] async fn main() -> anyhow::Result<()> { let flags = Args::parse(); let http_url = env::var("HTTP_NODE_URL")?; let ws_url = env::var("WS_NODE_URL")?; let web3 = Web3::new(Http::new(&http_url).unwrap()); let usdc_address = address("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"); let weth_address = address("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"); let cur_block = web3.eth().block_number().await?.as_u64(); let from_block = cur_block - flags.diff_neg; // till the end of time pls let to_block = cur_block + flags.diff_pos; println!( "Going to stream from block {} to {} inclusive", from_block, to_block ); let notify = BlockNotify::new(&http_url, &ws_url).await?; let mut factory = StreamFactory::new(http_url, from_block, to_block, 2, 1000); // make streams let mut usdc_stream = factory .make(usdc_address, Erc20Transfer::event(), notify.subscribe()) .await?; let mut weth_stream = factory .make(weth_address, Erc20Transfer::event(), notify.subscribe()) .await?; // get their signatures let usdc_signature = usdc_stream.signature; let weth_signature = weth_stream.signature; // get their sink let sink = factory.get_sink(); // run the streams on separate threads tokio::spawn(async move { usdc_stream.block_stream().await }); tokio::spawn(async move { weth_stream.block_stream().await }); let reducer = USDCNetFlow::new(usdc_signature); let reducer_in_thread = reducer.clone(); // stream and reduce events from multiple sources in batches of 1 block tokio::spawn( async move { reduce_synced_events(sink, to_block, &vec![reducer_in_thread]).await }, ); let mut new_blocks_sub = notify.subscribe(); while new_blocks_sub.changed().await.is_ok() { tokio::time::sleep(tokio::time::Duration::new(1, 0)).await; let cur_block = new_blocks_sub.borrow().as_u64(); println!("{:<17} <=== Live block", cur_block); let locked = reducer.lock().await; println!( "Live - {:<10} <=== Netflows block", cur_block - locked.current_block.unwrap_or_default() ); println!("{:<17} Addresses", locked.netflows.keys().len()); let positive_flows = locked .netflows .values() .filter(|&a| a > &0i128) .collect::>() .len(); let negative_flows = locked .netflows .values() .filter(|&a| a < &0i128) .collect::>() .len(); println!("{positive_flows:<17} Positive flows"); println!("{negative_flows:<17} Negative flows"); println!(""); } // stream ordered events from multiple sources in batches of 1 block // alternative is to use `stream_synced_blocks` that separates events and // they need to be got by signature // stream_synced_events(sink, to_block, |(number, logs)| async move { // if logs.len() != 0 { // let mut index = logs.first().unwrap().log_index.unwrap(); // for i in 1..logs.len() { // let log = logs.get(i).unwrap(); // if log.log_index.unwrap() <= index { // panic!("events are unordered"); // } // index = log.log_index.unwrap(); // } // } // println!("==> Block {}. Events {}.", number, logs.len()); // }) // .await; // stream_synced_blocks(sink, to_block, |(number, entry)| async move { // let usdc_transfers: Vec = entry // .get(&usdc_signature) // .unwrap() // .iter() // .map(|l| Erc20Transfer::from(l.to_owned())) // .collect(); // let weth_transfers: Vec = entry // .get(&weth_signature) // .unwrap() // .iter() // .map(|l| Erc20Transfer::from(l.to_owned())) // .collect(); // println!( // "==> Block {}. USDC transfers {} || WETH transfers {}", // number, // usdc_transfers.len(), // weth_transfers.len() // ); // // println!("First log {:?}", transfers.first()); // }) // .await; Ok(()) }