
created_at2024-07-21 09:17:47.616548+00
updated_at2024-07-21 09:27:42.395125+00
descriptionCoordinator is a library to load balance tasks into task runners
Tachibana Yui (tachibanayui)




License Cargo Documentation


Coordinator is a simple library to load balance tasks into task runners that run asynchronously. Each worker added into the coordinator will have a queue to process work unit (or task). Each worker will only process one task at a given time.

You can select which worker to process a task by using the following apis:

  • TaskPrefs::Any (my_coordinator.any() for #[coordinator] macro): This will tell the coordinator to queue with the most available worker
  • TaskPrefs::Preferred(worker_id) (my_coordinator.prefer(worker_id) for #[coordinator] macro): This will tell the coordinator to queue with worker with id worker_id if it's not currently full, otherwise queue the task with any worker.
  • TaskPrefs::Required(worker_id) (my_coordinator.require(worker_id) for #[coordinator] macro): This will tell the coordinator to queue with worker with id worker_id.

The coordinator will try to find the most available worker using the average task completion time and the number of task in queue of a worker.

Table of Contents


This crate is available on Please visit the link to find the latest version and instructions for installation.


For full examples, check out playground/examples

// Create a worker that sleeps for 1 sec and return a number that double the input
struct Doubler(String);
impl TaskProcessor<i32> for Doubler {
    type Output = i32;
    async fn do_work(&mut self, task: i32) -> Self::Output {
        println!("Task {} computed {}", self.0, task * 2);
        task * 2

// the queue thershold of a single queue, if the number of task item in queue exceeded the thershold
// any `TaskPref::Preferred(x)` will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);

// Add `Doubler` as task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))

// Add a closure as a task processor. Any `FnMut` closure can be used as task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;

// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle =, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");

// Do other works.....

// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);

If your task processors can process different types of tasks (eg: CalculatorProcessor can process both add and subtract tasks), you can use the #[coordinator] attribute macro to avoid needing to define your own input and output enums and manually dispatch them when implementing TaskProcessor

pub trait InteractableObject {
    fn size(&self) -> [f32; 3];
    fn weight(&self) -> f32;
    fn set_weight(&mut self, val: f32);

pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]

// Type alias for not having to type out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;

pub trait CatFamily<I>
    I: InteractableObject + RefUnwindSafe,
    fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
    fn upgrade<O: InteractableObject>(obj: ArcMut<I>, material: O);
    fn meow() -> bool;
    fn meow_repeatedly(times: usize)
        Self: Send,
        async move {
            for _ in 0..times {

pub struct DomesticatedCat {
    name: String,
    exp: usize,

impl DomesticatedCat {
    pub fn new(name: String) -> Self {
        Self { name, exp: 0 }

// Instead of implementing the [`TaskProcessor`]  trait, we implement the trait generated by `#[coordinator]` instead, this way we don't have to enum dispatch ourself. The trait name will always be `[Name]Processor`
impl<I> CatFamilyProcessor<I> for DomesticatedCat
    I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
    async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
        // ...

    async fn upgrade<O: InteractableObject>(&mut self, obj: ArcMut<I>, material: O) {
        // ...

    async fn meow(&mut self) -> bool {
        // ...

pub struct RobotCat /* ... */; // Another CatProcessor impl

async fn main() -> Result<(), Box<dyn Error>> {
    // The `CatFamily` struct is generated automatically, with `From<Coordinator>` impl so you can convert any `Coordinator` into it using `into()`
    let cat_family: CatFamily<Ball, Crystal, &str> = Coordinator::new(3).into();
        .add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))

        .add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))

    for _ in 0..10 {
        // Cloning here is only cloning the `Arc` under the hood, not creating a new `Coordinator`
        let cat_family = cat_family.clone();
        tokio::spawn(async move {
            let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
                size: [2.2, 3.3, 4.4],
                weight: 5.9,
                bounciness: 10.2,

            let crystal = Crystal {
                size: [5.2, 3.1, 6.4],
                weight: 15.9,
                purity: 0.9,

            let (pos, cat) = cat_family

            let Some(pos) = pos else {
                println!("Cat {} cannot find the object!", cat);
                return Ok(());

            println!("Cat {} has found the ball at {:?}", cat, pos);

            let (_, cat) = cat_family
                .upgrade(balls.clone(), crystal)

                "Cat {} has upgrade ball to {}",

            // We don't care about the result here so no need to join
            return Ok::<(), Box<dyn Error + Send + Sync + 'static>>(());



We welcome any contributions to this project. Before submitting a pull request, please open an issue to check if someone is already working on the feature.


This project is licensed under the MIT License.

Commit count: 0

cargo fmt