// SPDX-FileCopyrightText: 2023 Thomas Kramer // // SPDX-License-Identifier: GPL-3.0-or-later #[test] #[cfg(not(loom))] fn test_multithread_executor() { use pargraph::prelude::*; use petgraph::data::DataMap; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::GraphBase; use petgraph::visit::IntoEdgesDirected; use petgraph::visit::IntoNeighborsDirected; use petgraph::visit::{Data, EdgeRef}; struct NodeData { /// Minimal distance to the source node. distance: i32, /// Next hop in the shortest path to the source node. prev: Option, } impl Default for NodeData { fn default() -> Self { Self { distance: i32::MAX, prev: None, } } } // Create a graph like: // a // / \ // b c // \ / // d let mut g = DiGraph::new(); // Helper function for creating new nodes. // Note that the node data is wrapped in a `DataCell`. `DataCell` // implements interior mutability and a sort of read-write lock for multi-threaded access. let mut new_node = || g.add_node(DataCell::new(NodeData::default())); let [a, b, c, d] = [(); 4].map(|_| new_node()); g.add_edge(a, b, 1); g.add_edge(a, c, 1); g.add_edge(c, d, 2); g.add_edge(b, d, 1); let operator = DistanceLabellingOp {}; let executor = MultiThreadExecutor::new(); let wl = FifoWorklist::new_global_queue(vec![a].into()); // A priority queue would be more efficient. executor.run_node_labelling(wl, &operator, &g); let get_distance = |n: petgraph::graph::NodeIndex| -> i32 { g.node_weight(n).unwrap().try_read().unwrap().distance }; assert_eq!(get_distance(a), 0); assert_eq!(get_distance(b), 1); assert_eq!(get_distance(c), 1); assert_eq!(get_distance(d), 2); // Initialize the data of the source node. //g.node_weight_mut(a).unwrap().get_mut().prev = Some(a); // Operator which computes for all nodes the distance to the initial source node. struct DistanceLabellingOp {} impl LabellingOperator for &DistanceLabellingOp where G: GraphBase + IntoEdgesDirected + IntoNeighborsDirected + Data, EdgeWeight = i32> + DataMap, G::NodeId: std::fmt::Debug, { type NodeWeight = NodeData; type WorkItem = G::NodeId; fn op( &self, active_node: Self::WorkItem, local_view: LocalGraphView<&G, FullConflictDetection>, node_data: &mut NodeData, mut worklist: impl WorklistPush, ) -> Result<(), DataConflictErr> { // Acquire read access to all incoming neighbour nodes. // Use SmallVec instead of a Vec could help avoiding allocations. let neighbors_with_data: Result, _> = local_view .edges_directed(active_node, petgraph::Direction::Incoming) .map(|e| { local_view .try_node_weight(e.source()) .expect("node has no data") // Associate the edge with the node data of the other node. .map(|d| (e, d)) }) .collect(); // Abort on data conflicts. This node will be rescheduled. let neighbors_with_data = neighbors_with_data?; // Find the neighbour which provides the shortest path to the source. let closest_neighbor = neighbors_with_data .into_iter() .filter(|(_e, data)| { // Take only initialized nodes. data.prev.is_some() }) // compute distances .map(|(e, data)| (e.source(), data.distance + e.weight())) // find the closest .min_by_key(|(_, dist)| *dist); if let Some((via, distance_to_source)) = closest_neighbor { if distance_to_source < node_data.distance { // Update distance. node_data.distance = distance_to_source; node_data.prev = Some(via); // Expand wavefront to other neighbors. local_view .neighbors_directed(active_node, petgraph::Direction::Outgoing) .filter(|n| n != &via) .for_each(|n| worklist.push(n)); } else { // No improvement. } } else { // No neighbour found with initialized distance. This must be the first node. node_data.distance = 0; node_data.prev = Some(active_node); // Expand wavefront to unprocessed neighbors. local_view .neighbors_directed(active_node, petgraph::Direction::Outgoing) .for_each(|n| worklist.push(n)); }; Ok(()) } } }