//! Simple DBSP example for counting the distribution of degrees in a graph. //! //! This program creates a series of graph edges, then counts the number of //! times each source node appears, then counts the number of times each count //! appears. This is similar to the example for Differential Dataflow at //! . use anyhow::Result; use clap::Parser; use dbsp::{utils::Tup2, OrdIndexedZSet, OutputHandle, Runtime}; type Node = u64; #[derive(Debug, Clone, Parser)] struct Args { /// Number of initial edges in the graph. #[clap(long, default_value = "100")] edges: u64, /// Number of source nodes in the graph. #[clap(long, default_value = "13")] sources: u64, /// Number of extra edges added later to the graph. #[clap(long, default_value = "5")] extra: u64, /// Number of threads. #[clap(long, default_value = "2")] threads: u64, } fn print_changes( degrees: &OutputHandle>, distribution: &OutputHandle>, ) { for (src, outdegree, weight) in degrees.consolidate().iter() { println!(" {weight:+}: Node {src} has out-degree {outdegree}"); } println!(); for (outdegree, count, weight) in distribution.consolidate().iter() { println!(" {weight:+}: {count} nodes have out-degree {outdegree}"); } println!(); } fn main() -> Result<()> { let Args { threads, edges, sources, extra, } = Args::parse(); let (mut dbsp, (hedges, degrees, distribution)) = Runtime::init_circuit(threads as usize, |circuit| { let (edges, hedges) = circuit.add_input_zset::>(); // Count the number of edges with each node as its source (each node's // out-degree). let degrees = edges.map(|Tup2(src, _dst)| *src).weighted_count(); // Count the number of nodes with each out-degree. let distribution = degrees.map(|(_src, count)| *count).weighted_count(); Ok((hedges, degrees.output(), distribution.output())) }) .unwrap(); // Add some initial edges and print the results. for i in 0..edges { hedges.push(Tup2(i % sources, i % 7), 1); } dbsp.step().unwrap(); println!("Initialization:"); print_changes(°rees, &distribution); // Add a few more nodes and print the changes. for i in 0..extra { hedges.push(Tup2(i % sources, i % 9), 1); } dbsp.step().unwrap(); println!("Changes:"); print_changes(°rees, &distribution); dbsp.kill().unwrap(); Ok(()) }