#task-runner #task-queue #task #load-balancing

coordinator

Coordinator is a library to load balance tasks into task runners

2 releases

0.1.1 Jul 21, 2024
0.1.0 Jul 21, 2024

#920 in Algorithms

MIT license

31KB
581 lines

Coordinator


Description

Coordinator is a simple library for load balancing tasks across task runners that run asynchronously. Each worker added to the coordinator has its own queue of work units (tasks) and processes only one task at a time.
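The queue-per-worker model described above can be illustrated with a minimal sketch. It is not the crate's implementation: it assumes plain `std` threads and channels in place of async tasks, and the names `Job`, `spawn_worker` and `double_all` are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// A unit of work plus a channel on which to send the result back.
/// (Hypothetical type, for illustration only.)
struct Job {
    input: i32,
    reply: mpsc::Sender<i32>,
}

/// Spawns a worker thread that owns a FIFO queue and handles one job at a time.
fn spawn_worker() -> (mpsc::Sender<Job>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Job>();
    let handle = thread::spawn(move || {
        // Jobs arrive in submission order; the next job is not started
        // until the current one has finished.
        for job in rx {
            let _ = job.reply.send(job.input * 2);
        }
    });
    (tx, handle)
}

/// Queues every input on a single worker and collects the results in order.
fn double_all(inputs: &[i32]) -> Vec<i32> {
    let (queue, handle) = spawn_worker();
    let mut pending = Vec::new();
    for &input in inputs {
        let (reply, result) = mpsc::channel();
        queue.send(Job { input, reply }).expect("worker is alive");
        pending.push(result);
    }
    drop(queue); // closing the queue lets the worker loop end
    let results = pending
        .iter()
        .map(|r| r.recv().expect("worker replied"))
        .collect();
    handle.join().expect("worker did not panic");
    results
}

fn main() {
    println!("{:?}", double_all(&[1, 2, 3]));
}
```

The caller gets a receiving handle back immediately and only blocks when it asks for the result, which is the same shape as scheduling a task and joining on it later.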

You can select which worker processes a task using the following APIs:

  • TaskPrefs::Any (my_coordinator.any() with the #[coordinator] macro): queues the task with the most available worker.
  • TaskPrefs::Preferred(worker_id) (my_coordinator.prefer(worker_id) with the #[coordinator] macro): queues the task with the worker whose id is worker_id if its queue is not currently full; otherwise queues the task with any worker.
  • TaskPrefs::Required(worker_id) (my_coordinator.require(worker_id) with the #[coordinator] macro): queues the task with the worker whose id is worker_id.
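The three preferences can be read as a small routing rule. The sketch below is a self-contained approximation, not the crate's code. It assumes a hypothetical `Prefs` enum that mirrors `TaskPrefs` with `&str` ids, treats a queue as "full" once its length reaches the threshold, simplifies "most available" to "shortest queue", and returns `None` for an unknown required worker.

```rust
/// Hypothetical mirror of `TaskPrefs`, reduced to `&str` worker ids.
enum Prefs<'a> {
    Any,
    Preferred(&'a str),
    Required(&'a str),
}

/// Picks a worker id from `(id, queue_len)` pairs.
fn route<'a>(
    workers: &[(&'a str, usize)],
    threshold: usize,
    prefs: Prefs<'a>,
) -> Option<&'a str> {
    // Simplification: "most available" is taken to mean "shortest queue".
    let shortest = || workers.iter().min_by_key(|(_, len)| *len).map(|(id, _)| *id);
    // Looks up the current queue length of a worker, if it exists.
    let queue_len = |id: &str| workers.iter().find(|(w, _)| *w == id).map(|(_, len)| *len);

    match prefs {
        Prefs::Any => shortest(),
        // Use the preferred worker unless its queue is full, then fall back.
        Prefs::Preferred(id) => match queue_len(id) {
            Some(len) if len < threshold => Some(id),
            _ => shortest(),
        },
        // Never fall back: the task goes to this worker or nowhere.
        Prefs::Required(id) => queue_len(id).map(|_| id),
    }
}

fn main() {
    let workers = [("a", 3), ("b", 1)];
    println!("{:?}", route(&workers, 3, Prefs::Any));
    println!("{:?}", route(&workers, 3, Prefs::Preferred("a")));
    println!("{:?}", route(&workers, 3, Prefs::Required("a")));
}
```

With a threshold of 3, worker "a" is full, so `Preferred("a")` falls back to "b" while `Required("a")` still selects "a".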

The coordinator finds the most available worker using each worker's average task completion time and the number of tasks in its queue.
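The exact formula is not documented here. One plausible way to combine the two signals, shown purely as an assumption, is to estimate how long a newly queued task would wait: average completion time multiplied by the number of tasks ahead of it plus one. `WorkerStats`, `estimated_wait` and `most_available` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical per-worker statistics.
struct WorkerStats {
    id: &'static str,
    avg_completion: Duration,
    queued: u32,
}

/// Assumed heuristic: time until a newly queued task would finish,
/// i.e. (tasks already queued + the new one) * average completion time.
fn estimated_wait(w: &WorkerStats) -> Duration {
    w.avg_completion * (w.queued + 1)
}

/// Returns the id of the worker with the smallest estimated wait.
fn most_available(workers: &[WorkerStats]) -> Option<&'static str> {
    workers
        .iter()
        .min_by_key(|w| estimated_wait(w))
        .map(|w| w.id)
}

fn main() {
    let workers = [
        // 4 queued tasks at 100 ms each: estimated wait 500 ms.
        WorkerStats { id: "fast", avg_completion: Duration::from_millis(100), queued: 4 },
        // Empty queue but 1 s per task: estimated wait 1 s.
        WorkerStats { id: "slow", avg_completion: Duration::from_secs(1), queued: 0 },
    ];
    println!("{:?}", most_available(&workers));
}
```

Under this heuristic a fast worker with a longer queue can still beat an idle but slow one, which queue length alone would not capture.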

Installation

This crate is available on crates.io. Please visit the link to find the latest version and instructions for installation.

Usage

For full examples, see playground/examples.

use std::time::Duration;

// Create a worker that sleeps for 1 second and returns double the input
struct Doubler(String);
impl TaskProcessor<i32> for Doubler {
    type Output = i32;
    async fn do_work(&mut self, task: i32) -> Self::Output {
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("Task {} computed {}", self.0, task * 2);
        task * 2
    }
}

// The threshold of a single worker queue. If the number of tasks in a queue exceeds the threshold,
// any `TaskPrefs::Preferred(x)` task will be processed by a different task processor.
let queue_len = 3;
let b = Coordinator::new(queue_len);

// Add `Doubler` as a task processor
b.add_worker("Doubler 1st", Doubler("Doubler 1st".to_string()))
    .await;

// Add a closure as a task processor. Any `FnMut` closure can be used as a task processor!
b.add_worker("Doubler 2nd", |x| async move { x * 2 }).await;

// Schedule a task for processing. The task will be polled to completion in the worker future
// and not the current future. The `join_handle` can be used to retrieve the returned value
let join_handle = b.run(2, TaskPrefs::Any).await.unwrap();
println!("Task scheduled!");

// Do other work...

// Wait for the task result
let rs = join_handle.join().await.unwrap().0;
println!("Task result: {}", rs);

If your task processors can process different types of tasks (e.g. CalculatorProcessor can process both add and subtract tasks), you can use the #[coordinator] attribute macro. It saves you from defining your own input and output enums and dispatching them manually when implementing TaskProcessor.
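For comparison, the manual approach might look roughly like the sketch below. It is an illustration only: `Processor` is a simplified, synchronous stand-in for the crate's async `TaskProcessor` trait, and `CalcTask`, `CalcOutput` and `Calculator` are hypothetical names.

```rust
/// Simplified, synchronous stand-in for the crate's `TaskProcessor` trait.
trait Processor<T> {
    type Output;
    fn do_work(&mut self, task: T) -> Self::Output;
}

/// One input enum covering every kind of task the processor accepts.
enum CalcTask {
    Add(i32, i32),
    Subtract(i32, i32),
    IsZero(i32),
}

/// One output enum covering every kind of result it can produce.
#[derive(Debug, PartialEq)]
enum CalcOutput {
    Number(i32),
    Flag(bool),
}

struct Calculator;

impl Processor<CalcTask> for Calculator {
    type Output = CalcOutput;

    // Manual dispatch: match on the input variant, wrap the result
    // in the matching output variant.
    fn do_work(&mut self, task: CalcTask) -> CalcOutput {
        match task {
            CalcTask::Add(a, b) => CalcOutput::Number(a + b),
            CalcTask::Subtract(a, b) => CalcOutput::Number(a - b),
            CalcTask::IsZero(n) => CalcOutput::Flag(n == 0),
        }
    }
}

fn main() {
    let mut calc = Calculator;
    println!("{:?}", calc.do_work(CalcTask::Add(2, 3)));
    println!("{:?}", calc.do_work(CalcTask::Subtract(2, 3)));
    println!("{:?}", calc.do_work(CalcTask::IsZero(0)));
}
```

Callers also have to unwrap the output enum again after every call, which is the kind of boilerplate a generated per-method interface removes.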

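use std::error::Error;
use std::panic::{AssertUnwindSafe, RefUnwindSafe};
use std::sync::Arc;
// Assumed: an async mutex such as tokio's, since `lock()` is awaited below
use tokio::sync::Mutex;

pub trait InteractableObject {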
    fn size(&self) -> [f32; 3];
    fn weight(&self) -> f32;
    fn set_weight(&mut self, val: f32);
}

pub struct Ball /* ... */; // implements [`InteractableObject`]
pub struct Crystal /* ... */; // implements [`InteractableObject`]

// Type alias so we don't have to spell out this long type every time we use it
type ArcMut<T> = Arc<AssertUnwindSafe<Mutex<T>>>;

#[coordinator]
pub trait CatFamily<I>
where
    I: InteractableObject + RefUnwindSafe,
{
    fn locate_object(obj: ArcMut<I>) -> Option<[f32; 3]>;
    fn upgrade<O: InteractableObject>(obj: ArcMut<I>, material: O);
    fn meow() -> bool;
    fn meow_repeatedly(times: usize)
    where
        Self: Send,
    {
        async move {
            for _ in 0..times {
                self.meow().await;
            }
        }
    }
}

pub struct DomesticatedCat {
    name: String,
    exp: usize,
}

impl DomesticatedCat {
    pub fn new(name: String) -> Self {
        Self { name, exp: 0 }
    }
}

// Instead of implementing the [`TaskProcessor`] trait, we implement the trait generated by
// `#[coordinator]`, so we don't have to do the enum dispatch ourselves.
// The generated trait is always named `[Name]Processor`.
impl<I> CatFamilyProcessor<I> for DomesticatedCat
where
    I: InteractableObject + RefUnwindSafe + Send + Sync + 'static,
{
    async fn locate_object(&mut self, obj: ArcMut<I>) -> Option<[f32; 3]> {
        // ...
    }

    async fn upgrade<O: InteractableObject>(&mut self, obj: ArcMut<I>, material: O) {
        // ...
    }

    async fn meow(&mut self) -> bool {
        // ...
    }
}

pub struct RobotCat /* ... */; // Another `CatFamilyProcessor` impl

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // The `CatFamily` struct is generated automatically, with `From<Coordinator>` impl so you can convert any `Coordinator` into it using `into()`
    let cat_family: CatFamily<Ball, Crystal, &str> = Coordinator::new(3).into();
    cat_family
        .add_worker("Maple", DomesticatedCat::new("Maple".to_owned()))
        .await;

    cat_family
        .add_worker("Oktocat", RobotCat::new("Oktocat".to_owned()))
        .await;

    for _ in 0..10 {
        // Cloning here is only cloning the `Arc` under the hood, not creating a new `Coordinator`
        let cat_family = cat_family.clone();
        tokio::spawn(async move {
            let balls = Arc::new(AssertUnwindSafe(Mutex::new(Ball {
                size: [2.2, 3.3, 4.4],
                weight: 5.9,
                bounciness: 10.2,
            })));

            let crystal = Crystal {
                size: [5.2, 3.1, 6.4],
                weight: 15.9,
                purity: 0.9,
            };

            let (pos, cat) = cat_family
                .any()
                .locate_object(balls.clone())
                .await?
                .join()
                .await?;

            let Some(pos) = pos else {
                println!("Cat {} cannot find the object!", cat);
                return Ok(());
            };

            println!("Cat {} has found the ball at {:?}", cat, pos);

            let (_, cat) = cat_family
                .prefer(&cat)
                .upgrade(balls.clone(), crystal)
                .await?
                .join()
                .await?;

            println!(
                "Cat {} has upgraded the ball, new weight: {}",
                cat,
                balls.0.lock().await.weight
            );

            // We don't care about the result here so no need to join
            cat_family.require(&cat).meow_repeatedly(3).await?;
            return Ok::<(), Box<dyn Error + Send + Sync + 'static>>(());
        });
    }

    Ok(())
}

Contributing

We welcome any contributions to this project. Before submitting a pull request, please open an issue to check if someone is already working on the feature.

License

This project is licensed under the MIT License.

Dependencies

~5–11MB
~121K SLoC