//! A demonstration of timely dataflow progress tracking, using differential dataflow operators. use timely::PartialOrder; use timely::dataflow::*; use timely::dataflow::operators::probe::Handle; use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use timely::progress::{Timestamp, Source, Target, Location}; use timely::progress::timestamp::PathSummary; fn main() { timely::execute_from_args(std::env::args(), move |worker| { let timer = worker.timer(); let mut probe = Handle::new(); let (mut nodes, mut edges, mut times) = worker.dataflow::(|scope| { let (node_input, nodes) = scope.new_collection(); let (edge_input, edges) = scope.new_collection(); let (time_input, times) = scope.new_collection(); // Detect cycles that do not increment timestamps. find_cycles::<_,usize>(nodes.clone(), edges.clone()) .inspect(move |x| println!("{:?}\tcycles: {:?}", timer.elapsed(), x)) .probe_with(&mut probe); // Summarize all paths to inputs of operator zero. summarize::<_,usize>(nodes.clone(), edges.clone()) .inspect(move |x| println!("{:?}\tsummary: {:?}", timer.elapsed(), x)) .probe_with(&mut probe); // Track the frontier at each dataflow location. frontier::<_,usize>(nodes, edges, times) .inspect(move |x| println!("{:?}\tfrontier: {:?}", timer.elapsed(), x)) .probe_with(&mut probe); (node_input, edge_input, time_input) }); // A PageRank-like graph, as represented here: // https://github.com/TimelyDataflow/diagnostics/blob/master/examples/pagerank.png nodes.insert((Target::new(2, 0), Source::new(2, 0), 1)); nodes.insert((Target::new(3, 0), Source::new(3, 0), 0)); nodes.insert((Target::new(3, 1), Source::new(3, 0), 0)); nodes.insert((Target::new(4, 0), Source::new(4, 0), 0)); edges.insert((Source::new(1, 0), Target::new(3, 0))); edges.insert((Source::new(3, 0), Target::new(4, 0))); edges.insert((Source::new(4, 0), Target::new(2, 0))); edges.insert((Source::new(2, 0), Target::new(3, 1))); // Initially no capabilities. nodes.advance_to(1); nodes.flush(); edges.advance_to(1); edges.flush(); times.advance_to(1); times.flush(); while probe.less_than(times.time()) { worker.step(); } // Introduce a new input capability at time zero. times.insert((Location::new_source(1, 0), 0)); nodes.advance_to(2); nodes.flush(); edges.advance_to(2); edges.flush(); times.advance_to(2); times.flush(); while probe.less_than(times.time()) { worker.step(); } // Remove input capability and produce a message. times.remove((Location::new_source(1, 0), 0)); times.insert((Location::new_target(3, 0), 0)); nodes.advance_to(3); nodes.flush(); edges.advance_to(3); edges.flush(); times.advance_to(3); times.flush(); while probe.less_than(times.time()) { worker.step(); } // Consume the message, and .. do nothing, I guess. times.remove((Location::new_target(3, 0), 0)); nodes.advance_to(4); nodes.flush(); edges.advance_to(4); edges.flush(); times.advance_to(4); times.flush(); while probe.less_than(times.time()) { worker.step(); } println!("finished; elapsed: {:?}", timer.elapsed()); }).unwrap(); } /// Propagates times along a timely dataflow graph. /// /// Timely dataflow graphs are described by nodes with interconnected input and output ports, /// and edges which connect output ports to input ports of what may be other nodes. /// /// A set of times at various locations (input or output ports) could traverse nodes and /// edges to arrive at various other locations. Each location can then track minimal times /// that can reach them: those times not greater than some other time that can reach it. /// /// The computation to determine this, and to maintain it as times change, is an iterative /// computation that propagates times and maintains the minimal elements at each location. fn frontier( nodes: Collection, edges: Collection, times: Collection, ) -> Collection where G::Timestamp: Lattice+Ord, T::Summary: differential_dataflow::ExchangeData, { // Translate node and edge transitions into a common Location to Location edge with an associated Summary. let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary))); let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default()))); let transitions: Collection = nodes.concat(&edges); times .iterate(|reach| { transitions .enter(&reach.scope()) .join_map(&reach, |_from, (dest, summ), time| (dest.clone(), summ.results_in(time))) .flat_map(|(dest, time)| time.map(move |time| (dest, time))) .concat(×.enter(&reach.scope())) .reduce(|_location, input, output: &mut Vec<(T, isize)>| { // retain the lower envelope of times. for (t1, _count1) in input.iter() { if !input.iter().any(|(t2, _count2)| t2.less_than(t1)) { output.push(((*t1).clone(), 1)); } } }) }) .consolidate() } /// Summary paths from locations to operator zero inputs. fn summarize( nodes: Collection, edges: Collection, ) -> Collection where G::Timestamp: Lattice+Ord, T::Summary: differential_dataflow::ExchangeData+std::hash::Hash, { // Start from trivial reachability from each input to itself. let zero_inputs = edges .map(|(_source, target)| Location::from(target)) .filter(|location| location.node == 0) .map(|location| (location, (location, Default::default()))); // Retain node connections along "default" timestamp summaries. let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary))); let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default()))); let transitions: Collection = nodes.concat(&edges); zero_inputs .iterate(|summaries| { transitions .enter(&summaries.scope()) .join_map(summaries, |_middle, (from, summ1), (to, summ2)| (from.clone(), to.clone(), summ1.followed_by(summ2))) .flat_map(|(from, to, summ)| summ.map(move |summ| (from, (to, summ)))) .concat(&zero_inputs.enter(&summaries.scope())) .map(|(from, (to, summary))| ((from, to), summary)) .reduce(|_from_to, input, output| { for (summary, _count) in input.iter() { if !input.iter().any(|(sum2, _count2)| sum2.less_than(*summary)) { output.push(((*summary).clone(), 1)); } } }) .map(|((from, to), summary)| (from, (to, summary))) }) .consolidate() } /// Identifies cycles along paths that do not increment timestamps. fn find_cycles( nodes: Collection, edges: Collection, ) -> Collection where G::Timestamp: Lattice+Ord, T::Summary: differential_dataflow::ExchangeData, { // Retain node connections along "default" timestamp summaries. let nodes = nodes.flat_map(|(target, source, summary)| { if summary == Default::default() { Some((Location::from(target), Location::from(source))) } else { None } }); let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target))); let transitions: Collection = nodes.concat(&edges); // Repeatedly restrict to locations with an incoming path. transitions .iterate(|locations| { let active = locations .map(|(_source, target)| target) .distinct(); transitions .enter(&locations.scope()) .semijoin(&active) }) .consolidate() }