// SPDX-FileCopyrightText: 2023 Thomas Kramer
//
// SPDX-License-Identifier: GPL-3.0-or-later

/// Labels every node of a small directed graph with its weighted distance
/// to a source node, using a read-only operator that stores the distances
/// in atomics inside the node weights.
#[test]
fn example_atomic() {
    use pargraph::prelude::*;
    use petgraph::data::DataMap;
    use petgraph::graph::DiGraph;
    use petgraph::visit::*;
    use std::sync::atomic::AtomicU32;

    /// Per-node payload: the best known distance to the source node.
    /// `u32::MAX` is used as the initial "not reached yet" value.
    struct NodeData {
        distance: AtomicU32,
    }

    impl NodeData {
        fn new(distance: u32) -> Self {
            Self {
                distance: AtomicU32::new(distance),
            }
        }
    }

    // Create a graph like:
    //     src
    //      |
    //      a
    //     /  \
    //    b    c
    //     \  /
    //      d
    let mut g = DiGraph::new();

    // Helper function for creating new nodes with default node data.
    // Initialize the distance to the maximum value.
    let mut new_node = || g.add_node(NodeData::new(u32::MAX));

    // Create some new nodes.
    let [src, a, b, c, d] = [(); 5].map(|_| new_node());

    // Add some edges with weights.
    g.add_edge(src, a, 1);
    g.add_edge(a, b, 1);
    g.add_edge(a, c, 1);
    g.add_edge(c, d, 2);
    g.add_edge(b, d, 1);

    let operator = AtomicDistanceLabelling { source_node: src };

    let executor = MultiThreadExecutor::new();

    // Create a worklist and add the source node.
    let wl = FifoWorklist::new_with_local_queues(vec![src].into());
    executor.run_readonly(wl, &operator, &g);

    // Read back the distance stored at node `n`.
    let get_distance = |n: petgraph::graph::NodeIndex| -> u32 {
        g.node_weight(n)
            .unwrap()
            .distance
            .load(std::sync::atomic::Ordering::Relaxed)
    };

    // Check the distances.
    assert_eq!(get_distance(src), 0);
    assert_eq!(get_distance(a), 1);
    assert_eq!(get_distance(b), 2);
    assert_eq!(get_distance(c), 2);
    assert_eq!(get_distance(d), 3);

    /// This is our operator.
    struct AtomicDistanceLabelling<NodeId> {
        /// The operator needs to recognize the source node.
        source_node: NodeId,
    }

    impl<G> ReadonlyOperator<G> for &AtomicDistanceLabelling<G::NodeId>
    where
        G: GraphBase + IntoEdgesDirected,
        G: DataMap<NodeWeight = NodeData, EdgeWeight = u32>,
    {
        type WorkItem = G::NodeId;

        /// Recomputes the distance of `active_node` from its incoming edges
        /// and, if the stored distance got smaller, schedules all successor
        /// nodes for (re-)evaluation.
        fn op(
            &self,
            active_node: Self::WorkItem,
            local_view: LocalGraphView<&G>,
            mut worklist: impl WorklistPush<Self::WorkItem>,
        ) {
            let input_edges =
                local_view.edges_directed(active_node, petgraph::Direction::Incoming);
            let output_nodes =
                local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);

            let min_distance = if active_node == self.source_node {
                // By definition the source node has distance 0.
                Some(0)
            } else {
                // Compute the smallest distance to the source via the input edges.
                input_edges
                    .map(|e| {
                        // Get minimal distance to source at the input node.
                        let d = local_view
                            .node_weight(e.source())
                            .unwrap()
                            .distance
                            .load(std::sync::atomic::Ordering::Relaxed);
                        // Add edge weight. A predecessor that has not been
                        // labelled yet still holds `u32::MAX`, so a plain `+`
                        // would overflow; saturate to keep it "unreached".
                        d.saturating_add(*e.weight())
                    })
                    // Take minimal value. `None` if there are no input edges.
                    .min()
            };

            if let Some(min_distance) = min_distance {
                // Atomically store the minimal distance value.
                // `fetch_min` returns the value that was stored before.
                let prev_min_distance = local_view
                    .node_weight(active_node)
                    .unwrap()
                    .distance
                    .fetch_min(min_distance, std::sync::atomic::Ordering::Relaxed);

                // Only a strictly smaller value is an improvement. If the
                // stored value was already smaller than our candidate,
                // nothing changed and there is no need to revisit successors.
                if min_distance < prev_min_distance {
                    // Minimal distance has improved, need to update the output nodes.
                    output_nodes.for_each(|n| worklist.push(n));
                }
            }
        }
    }
}