| Crates.io | hydro2-async-scheduler |
| lib.rs | hydro2-async-scheduler |
| version | 0.1.0 |
| created_at | 2025-02-28 06:44:37.39931+00 |
| updated_at | 2025-02-28 06:44:37.39931+00 |
| description | An asynchronous DAG-based scheduling framework for parallel operator execution. |
| homepage | |
| repository | https://github.com/klebs6/klebs-general |
| max_upload_size | |
| id | 1572526 |
| size | 498,667 |
This crate provides a fully asynchronous, concurrency‐aware scheduling framework for running directed acyclic graphs of operators (a "network"). The scheduler can handle a variety of strategies (immediate, wave‐based, or threshold chunking) for orchestrating node execution in parallel. Under the hood, each operator in the network is packaged into a TaskItem that is submitted to a worker pool. Edges define dependencies: once all parents of a node are finished, that node becomes ready and is eventually freed for execution.
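The core readiness rule is plain dependency counting: each node tracks how many of its parents have not yet finished, and when that count reaches zero the node joins the ready queue. Below is a minimal, hypothetical sketch of that rule; the names (`mark_finished`, `pending_parents`) are illustrative only and are not part of this crate's API.

```rust
use std::collections::{HashMap, VecDeque};

// Hypothetical sketch of the scheduler's readiness rule. None of these
// names come from hydro2-async-scheduler; the real bookkeeping lives
// inside its worker pool and TaskItem plumbing.
fn mark_finished(
    finished: usize,
    children: &HashMap<usize, Vec<usize>>,
    pending_parents: &mut HashMap<usize, usize>,
    ready: &mut VecDeque<usize>,
) {
    for &child in children.get(&finished).into_iter().flatten() {
        if let Some(n) = pending_parents.get_mut(&child) {
            *n -= 1;
            if *n == 0 {
                ready.push_back(child); // all parents done: node may run
            }
        }
    }
}

fn main() {
    // Diamond DAG: 0 -> {1, 2} -> 3
    let children = HashMap::from([(0, vec![1, 2]), (1, vec![3]), (2, vec![3])]);
    let mut pending = HashMap::from([(1, 1), (2, 1), (3, 2)]);
    let mut ready = VecDeque::from([0]); // node 0 has no parents

    while let Some(node) = ready.pop_front() {
        println!("executing node {node}");
        mark_finished(node, &children, &mut pending, &mut ready);
    }
}
```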
The typical workflow:

1. Create and validate a `Network<T>`. Each node has an operator that implements an async `execute` method, and edges define the flow of data or dependencies.
2. Configure an `AsyncSchedulerConfig`, specifying:
   - the maximum degree of parallelism (`max_parallelism`),
   - the batching strategy (`Immediate`, `Wave`, or `Threshold`),
   - whether streaming output is enabled (`enable_streaming`).
3. Construct an `AsyncScheduler` with `AsyncScheduler::with_config(...)`.
4. Call `execute_network(...)` with your network wrapped in an `Arc<AsyncMutex<...>>`. On success this returns a tuple of `(PerformanceStats, Option<StreamingOutput<T>>)`.
5. Use the streaming channel (if enabled) to read node output data in real time.
6. Gather performance statistics and/or do further processing upon completion.
Below is a complete Rust test function demonstrating minimal usage of this crate's scheduler. Because this is a parallel system, the test uses `#[tokio::test]` (rather than `#[traced_test]`) so it can exercise multi-threaded concurrency.
```rust
#[tokio::test]
async fn should_execute_minimal_network_parallel() -> Result<(), Box<dyn std::error::Error>> {
    use std::sync::Arc;
    use hydro2_network::{Network, NetworkError};
    use hydro2_operator::NoOpOperator;
    use tokio::sync::Mutex as AsyncMutex;
    use hydro2_async_scheduler::{
        AsyncScheduler, AsyncSchedulerConfigBuilder, BatchingStrategy,
    };

    // 1) Build a minimal network: a single node with a NoOp operator.
    let mut network = Network::default();
    network.nodes_mut().push(
        hydro2_network::node![0 => NoOpOperator::default()]
    );

    // 2) Wrap it in Arc<AsyncMutex<...>> so the scheduler can share it across workers.
    let shared_network = Arc::new(AsyncMutex::new(network));

    // 3) Prepare the scheduler configuration.
    let cfg = AsyncSchedulerConfigBuilder::default()
        .max_parallelism(4_usize)
        .batching_strategy(BatchingStrategy::Immediate)
        .enable_streaming(false)
        .build()
        .map_err(|_| NetworkError::AsyncSchedulerConfigBuilderFailure)?;

    // 4) Build the scheduler.
    let scheduler = AsyncScheduler::with_config(cfg);

    // 5) Execute the network; execute_network is async, so await it.
    let (perf_stats, maybe_stream) = scheduler.execute_network(shared_network).await?;
    assert!(maybe_stream.is_none(), "Streaming was disabled, but got a stream!");
    println!("Performance stats: {:?}", perf_stats);

    // 6) Verify completion. In a real DAG with multiple nodes, we'd check
    //    SharedCompletedNodes or other scheduler state here.
    println!("Test complete: minimal network executed without errors.");
    Ok(())
}
```
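If streaming had been enabled, `execute_network` would have returned `Some(StreamingOutput<T>)` instead of `None`. The fragment below (continuing from the test above, so `shared_network` is in scope) sketches what consuming it might look like, assuming `StreamingOutput` behaves like an async channel receiver with a `recv`-style method; check the crate docs for the actual interface.

```rust
// Sketch only: `stream.recv()` is an assumption modeled on tokio's mpsc
// receiver, not a confirmed method of StreamingOutput.
let cfg = AsyncSchedulerConfigBuilder::default()
    .max_parallelism(4_usize)
    .batching_strategy(BatchingStrategy::Immediate)
    .enable_streaming(true) // this time we expect Some(stream) back
    .build()
    .map_err(|_| NetworkError::AsyncSchedulerConfigBuilderFailure)?;

let scheduler = AsyncScheduler::with_config(cfg);
let (_stats, maybe_stream) = scheduler.execute_network(shared_network).await?;

if let Some(mut stream) = maybe_stream {
    // Drain node outputs as they complete instead of waiting for the
    // whole network to finish.
    while let Some(output) = stream.recv().await {
        println!("node output: {output:?}");
    }
}
```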
Additional notes:

- Each node runs an async `execute` method, typically through an operator implementing the `Operator` trait.
- Errors are reported as `NetworkError`.
- The crate uses the `tracing` system for rich logging; logs are sprinkled throughout the worker and aggregator logic. A minimal subscriber setup is sketched below.
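To surface those logs in your own tests or binaries, install a `tracing` subscriber before running the scheduler. A minimal sketch, assuming you add the `tracing-subscriber` crate (not a dependency of this crate) to your own `Cargo.toml`:

```rust
fn main() {
    // Route `tracing` events from the workers and aggregator to the
    // console. Use tracing_subscriber::EnvFilter for RUST_LOG-style
    // filtering if you need finer control.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    // ... build the network and run the scheduler as shown above ...
}
```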
Distributed under the OGPv1 License (see the `ogp-license-text` crate for more details).

This crate is developed at https://github.com/klebs6/klebs-general.