# Coordinator

[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/tachibanayui/coordinator)
[![Cargo](https://img.shields.io/crates/v/coordinator.svg)](https://crates.io/crates/coordinator)
[![Documentation](https://docs.rs/coordinator/badge.svg)](https://docs.rs/coordinator/latest/coordinator/)

## Description

Coordinator is a simple library for load balancing tasks across task runners that run asynchronously. Each worker added to the coordinator has its own queue of work units (tasks) and processes only one task at a time. You can select which worker processes a task with the following APIs:

- `TaskPrefs::Any` (`my_coordinator.any()` for the `#[coordinator]` macro): queues the task with the most available worker.
- `TaskPrefs::Preferred(worker_id)` (`my_coordinator.prefer(worker_id)` for the `#[coordinator]` macro): queues the task with the worker whose id is `worker_id` if that worker is not currently full; otherwise queues the task with any worker.
- `TaskPrefs::Required(worker_id)` (`my_coordinator.require(worker_id)` for the `#[coordinator]` macro): queues the task with the worker whose id is `worker_id`.

The coordinator finds the most available worker using each worker's average task completion time and the number of tasks in its queue.

## Table of Contents

- [Installation](#installation)
- [Usage](#usage)
- [Contributing](#contributing)
- [License](#license)

## Installation

This crate is available on [crates.io](https://crates.io/crates/coordinator). Visit the link to find the latest version and installation instructions.
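To make the selection rules from the Description concrete, here is a minimal, self-contained sketch of how the three preferences could resolve to a worker. It does not use the crate and is not its implementation: `WorkerLoad`, `Pref`, `select`, and the cost formula (average completion time multiplied by queued tasks plus one) are hypothetical stand-ins, and "full" is assumed to mean that the queue length has reached the threshold.

```rust
use std::time::Duration;

/// Hypothetical snapshot of one worker's load (not a type from the crate).
#[derive(Debug, Clone)]
struct WorkerLoad {
    id: &'static str,
    queued: usize,
    avg_completion: Duration,
}

impl WorkerLoad {
    /// Assumed cost model: time until a newly queued task would finish.
    fn estimated_wait(&self) -> Duration {
        self.avg_completion * (self.queued as u32 + 1)
    }
}

/// Local stand-in mirroring the three preferences described above.
#[derive(Debug, Clone, Copy)]
enum Pref {
    Any,
    Preferred(&'static str),
    Required(&'static str),
}

/// The worker with the smallest estimated wait, if there is any worker at all.
fn most_available(workers: &[WorkerLoad]) -> Option<&'static str> {
    workers.iter().min_by_key(|w| w.estimated_wait()).map(|w| w.id)
}

/// Resolve a preference to a worker id, or `None` if no suitable worker exists.
fn select(workers: &[WorkerLoad], pref: Pref, queue_threshold: usize) -> Option<&'static str> {
    match pref {
        Pref::Any => most_available(workers),
        // Use the preferred worker unless its queue is full (assumed: queued >= threshold).
        Pref::Preferred(id) => match workers.iter().find(|w| w.id == id) {
            Some(w) if w.queued < queue_threshold => Some(w.id),
            _ => most_available(workers),
        },
        // A required worker is never substituted.
        Pref::Required(id) => workers.iter().find(|w| w.id == id).map(|w| w.id),
    }
}

fn main() {
    let workers = [
        WorkerLoad { id: "a", queued: 3, avg_completion: Duration::from_millis(100) },
        WorkerLoad { id: "b", queued: 1, avg_completion: Duration::from_millis(150) },
    ];
    // "a" would wait 400 ms and "b" 300 ms, so "b" is the most available.
    assert_eq!(select(&workers, Pref::Any, 3), Some("b"));
    // "a" is at the threshold, so the preference falls back to any worker.
    assert_eq!(select(&workers, Pref::Preferred("a"), 3), Some("b"));
    // A required worker is used even when its queue is long.
    assert_eq!(select(&workers, Pref::Required("a"), 3), Some("a"));
    println!("selection sketch ok");
}
```

The point of the sketch is the difference in fallback behavior: `Preferred` degrades to `Any` when the chosen worker is full, while `Required` never picks a different worker.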
## Usage

For full examples, check out `playground/examples`.

```rust
// Create a worker that sleeps for 1 second and returns double the input
struct Doubler(String);

impl TaskProcessor for Doubler {
    type Output = i32;

    async fn do_work(&mut self, task: i32) -> Self::Output {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task {} computed {}", self.0, task * 2);
        task * 2
    }
}

// The queue threshold of a single queue. If the number of tasks in a queue exceeds the threshold,
// any `TaskPrefs::Preferred(x)` task will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);

// Add `Doubler` as a task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))
    .await;

// Add a closure as a task processor. Any `FnMut` closure can be used as a task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;

// Schedule a task for processing. The task will be polled to completion in the worker future,
// not the current future. The `join_handle` can be used to retrieve the returned value.
let join_handle = b.run(2, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");

// Do other work...

// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);
```

If your task processors can process different types of tasks (e.g. `CalculatorProcessor` can process both `add` and `subtract` tasks), you can use the `#[coordinator]` attribute macro. It saves you from defining your own input and output enums and dispatching them manually when implementing `TaskProcessor`.

```rust
pub trait InteractableObject {
    fn size(&self) -> [f32; 3];
    fn weight(&self) -> f32;
    fn set_weight(&mut self, val: f32);
}

pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]

// Type alias so we don't have to type out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;

#[coordinator]
pub trait CatFamily<I>
where
    I: InteractableObject + RefUnwindSafe,
{
    fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
    fn upgrade<O>(obj: ArcMut<I>, material: O);
    fn meow() -> bool;
    fn meow_repeatedly(times: usize)
    where
        Self: Send,
    {
        async move {
            for _ in 0..times {
                self.meow().await;
            }
        }
    }
}

pub struct DomesticatedCat {
    name: String,
    exp: usize,
}

impl DomesticatedCat {
    pub fn new(name: String) -> Self {
        Self { name, exp: 0 }
    }
}

// Instead of implementing the [`TaskProcessor`] trait, we implement the trait generated by
// `#[coordinator]`, so we don't have to do the enum dispatch ourselves.
// The trait name will always be `[Name]Processor`.
impl<I> CatFamilyProcessor<I> for DomesticatedCat
where
    I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
{
    async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
        // ...
    }

    async fn upgrade<O>(&mut self, obj: ArcMut<I>, material: O) {
        // ...
    }

    async fn meow(&mut self) -> bool {
        // ...
    }
}

pub struct RobotCat /* ... */; // Another `CatFamilyProcessor` impl

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // The `CatFamily` struct is generated automatically, with a `From` impl so you can
    // convert any `Coordinator` into it using `into()`
    let cat_family: CatFamily = Coordinator::new(3).into();
    cat_family
        .add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))
        .await;
    cat_family
        .add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))
        .await;

    for _ in 0..10 {
        // Cloning here only clones the `Arc` under the hood; it does not create a new `Coordinator`
        let cat_family = cat_family.clone();
        tokio::spawn(async move {
            let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
                size: [2.2, 3.3, 4.4],
                weight: 5.9,
                bounciness: 10.2,
            })));

            let crystal = Crystal {
                size: [5.2, 3.1, 6.4],
                weight: 15.9,
                purity: 0.9,
            };

            let (pos, cat) = cat_family
                .any()
                .locate_object(balls.clone())
                .await?
                .join()
                .await?;

            let Some(pos) = pos else {
                println!("Cat {} cannot find the object!", cat);
                return Ok(());
            };

            println!("Cat {} has found the ball at {:?}", cat, pos);
            let (_, cat) = cat_family
                .prefer(&cat)
                .upgrade(balls.clone(), crystal)
                .await?
                .join()
                .await?;

            println!(
                "Cat {} has upgraded the ball to {}",
                cat,
                balls.0.lock().await.weight
            );

            // We don't care about the result here, so there is no need to join
            cat_family.require(&cat).meow_repeatedly(3).await?;
            return Ok::<(), Box<dyn Error + Send + Sync>>(());
        });
    }

    Ok(())
}
```

## Contributing

We welcome any contributions to this project. Before submitting a pull request, please open an issue to check whether someone is already working on the feature.

## License

This project is licensed under the MIT License.