Beekeeper

A Rust library that provides a thread pool implementation designed to execute the same operation in parallel on any number of inputs (this is sometimes called a "worker pool").

Overview

  • Operations are defined by implementing the Worker trait.
  • A Builder is used to configure and create a worker pool called a Hive.
  • The Hive creates a Worker instance for each thread in the pool.
  • Each thread in the pool continually:
    • Receives a task from an input channel,
    • Calls its Worker's apply method on the input, and
    • Produces an Outcome.
  • Depending on which of Hive's methods are called to submit a task (or batch of tasks), the Outcome(s) may be returned as an Iterator, sent to an output channel, or stored in the Hive for later retrieval.
  • A Hive may create Workers in one of three ways:
    • Call the default() function on a Worker type that implements Default
    • Clone an instance of a Worker that implements Clone
    • Call the create() method on a worker factory that implements the Queen trait
  • Both Workers and Queens may be stateful, i.e., Worker::apply() and Queen::create() both take &mut self.
  • Although it is strongly recommended to avoid panics in worker threads (and thus within Worker implementations), the Hive automatically restarts any thread that panics.
  • A Hive may be suspended and resumed at any time. When a Hive is suspended, worker threads do no work and tasks accumulate in the input channel.
  • Several utility functions are provided in the util module. Notably, the map and try_map functions enable simple parallel processing of a single batch of tasks.
  • Several useful Worker implementations are provided in the stock module. Most notable are those in the call submodule, which provide different ways of wrapping callables, i.e., closures and function pointers.
  • The following optional features are provided via feature flags:
    • affinity: worker threads may be pinned to CPU cores to minimize the overhead of context-switching.
    • retry: Tasks that fail due to transient errors (e.g., temporarily unavailable resources) may be retried a set number of times, with an optional, exponentially increasing delay between retries (see the sketch after this list).
    • Several alternative channel implementations are supported.
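
As a rough illustration of the retry feature, the sketch below shows a Worker that signals a transient failure. The ApplyError::Retryable variant and its fields are assumptions here, inferred from the feature description above and from the ApplyError::Fatal variant used in Example 3; consult the crate docs for the exact API.

use beekeeper::bee::prelude::*;

#[derive(Debug, Default)]
struct FlakyWorker;

impl Worker for FlakyWorker {
    type Input = u32;
    type Output = u32;
    type Error = String;

    fn apply(&mut self, input: u32, _: &Context) -> WorkerResult<Self> {
        if input % 2 == 0 {
            // Signal a transient error; with the `retry` feature enabled,
            // the Hive may re-attempt the task up to the configured number
            // of times. (`Retryable { input, error }` is assumed here.)
            Err(ApplyError::Retryable { input, error: "try again".into() })
        } else {
            Ok(input * 10)
        }
    }
}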

Usage

To parallelize a task, you'll need two things:

  1. A Worker implementation. Your options are:
    • Use an existing implementation from the stock module (see Example 2 below)
    • Implement your own (see Example 3 below):
      • Import the necessary traits (e.g., use beekeeper::bee::prelude::*)
      • Define a struct for your worker
      • Implement the Worker trait for your struct, defining the apply method with the logic of your task
      • Do at least one of the following:
        • Implement Default for your worker
        • Implement Clone for your worker
        • Create a custom worker factory that implements the Queen trait
  2. A Hive to execute your tasks, which you create by configuring a Builder (see the examples below and the sketch that follows).
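
A minimal sketch combining both steps, modeled on the examples below (the worker itself is illustrative):

use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;

// a stateless worker; deriving `Default` lets the Hive create one
// instance per thread via `build_with_default`
#[derive(Debug, Default)]
struct SquareWorker;

impl Worker for SquareWorker {
    type Input = i32;
    type Output = i32;
    type Error = std::convert::Infallible;

    fn apply(&mut self, input: i32, _: &Context) -> WorkerResult<Self> {
        Ok(input * input)
    }
}

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<SquareWorker>()
    .unwrap();

// submit a single task; `apply` returns a single `Outcome`
let _outcome = hive.apply(4);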

Once you've created a Hive, use its methods to submit tasks for processing. There are four groups of methods available:

  • apply: submits a single task
  • swarm: submits a batch of tasks given a collection of inputs with known size (i.e., anything that implements IntoIterator<IntoIter: ExactSizeIterator>)
  • map: submits an arbitrary batch of tasks (i.e., anything that implements IntoIterator; see the sketch after this list)
  • scan: similar to map, but you also provide (1) an initial value for a state variable, and (2) a function that transforms each item in the input iterator into the input type required by the Worker and that has access to (and may modify) the state variable
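
For example, with a ThunkWorker hive like the one in Example 2 below, a map call might look like this (a sketch; into_outputs() is used as in Example 2, and the exact return types are assumptions):

use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// `map` accepts any `IntoIterator`; the unsuffixed method yields
// `Outcome`s in the same order as the inputs
let sum: i32 = hive
    .map((0..10).map(|i: i32| Thunk::of(move || i + 1)))
    .into_outputs()
    .sum();
assert_eq!(55, sum);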

There are multiple methods in each group that differ by how the task results (called Outcomes) are handled:

  • The unsuffixed methods return an Iterator over the Outcomes in the same order as the inputs (or, in the case of apply, a single Outcome)
  • The methods with the _unordered suffix instead return an unordered iterator, which may be more performant than the ordered iterator
  • The methods with the _send suffix accept a channel Sender and send the Outcomes to that channel as they are completed
  • The methods with the _store suffix store the Outcomes in the Hive; these may be retrieved later via Hive::take_stored(), via one of the remove* methods (which require the OutcomeStore trait to be in scope), or via the methods on Husk after shutting down the Hive with Hive::try_into_husk().

When using one of the _send methods, ensure that the Sender is dropped after all tasks have been submitted; otherwise, calling recv() on (or iterating over) the Receiver will block indefinitely.
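
For example, reusing the hive and channel types from Example 2 (a sketch):

// send results to a channel, keeping a clone of the `Sender`...
let (tx, rx) = outcome_channel();
let _task_ids = hive.swarm_send(
    (0..10).map(|i: i32| Thunk::of(move || i * 2)),
    tx.clone(),
);
// ...then drop the remaining `Sender` so the `Receiver` iteration
// terminates; without this, `into_outputs()` would block forever
drop(tx);
let total: i32 = rx.into_outputs().sum();
assert_eq!(90, total);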

Within a Hive, each submitted task is assigned a unique ID. The _send and _store methods return the task_ids of the submitted tasks, which can be used to retrieve the associated Outcomes later (e.g., using Hive::remove()).

After submitting tasks, you may use the Hive::join() method to wait for all tasks to complete. Using join is strongly recommended when using one of the _store methods; otherwise, you'll need to continually poll the Hive to check for completed tasks.
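
A sketch combining the _store workflow with join, again assuming the ThunkWorker hive from Example 2; apply_store and Hive::remove() follow the descriptions above, and their exact signatures are assumptions:

// submit a task, storing its `Outcome` in the hive; the `_store`
// methods return the task id
let task_id = hive.apply_store(Thunk::of(|| 6 * 7));

// block until all submitted tasks have completed
hive.join();

// retrieve the stored `Outcome` by its task id
// (`remove` requires the `OutcomeStore` trait to be in scope)
let _outcome = hive.remove(task_id);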

When you are finished with a Hive, you may simply drop it (either explicitly or by letting it go out of scope); the worker threads will be terminated automatically. If you used the _store methods and would like access to the stored task Outcomes after the Hive has been dropped, and/or you'd like to reuse the Hive's Queen or other configuration parameters, you can use the Hive::try_into_husk() method to extract the relevant data from the Hive into a Husk object.

Examples

1. Parallelize an existing function

pub fn double(i: usize) -> usize {
  i * 2
}

// parallelize the computation of `double` on a range of numbers
// over 4 threads, and sum the results
const N: usize = 100;
let sum_doubles: usize = beekeeper::util::map(4, 0..N, double)
    .into_iter()
    .sum();
println!("Sum of {} doubles: {}", N, sum_doubles);

2. Parallelize arbitrary tasks with the same output type

use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

// create a hive to process `Thunk`s - no-argument closures with the
// same return type (`i32`)
let hive = Builder::new()
    .num_threads(4)
    .thread_name("thunk_hive")
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// return results to your own channel...
let (tx, rx) = outcome_channel();
let _ = hive.swarm_send(
    (0..10).map(|i: i32| Thunk::of(move || i * i)),
    tx
);
assert_eq!(285, rx.into_outputs().take(10).sum());

// return results as an iterator...
let total = hive
    .swarm_unordered((0..10).map(|i: i32| Thunk::of(move || i * -i)))
    .into_outputs()
    .sum();
assert_eq!(-285, total);

3. Parallelize a complex task using a stateful Worker

Suppose you'd like to parallelize executions of a line-delimited process, such as cat. This requires defining a struct to hold the process stdin and stdout, and implementing the Worker trait for this struct. We'll also use a custom Queen to keep track of the Child processes and make sure they're terminated properly.

use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};

#[derive(Debug)]
struct CatWorker {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}

impl CatWorker {
    fn new(stdin: ChildStdin, stdout: ChildStdout) -> Self {
        Self {
            stdin,
            stdout: BufReader::new(stdout),
        }
    }

    fn write_char(&mut self, c: u8) -> io::Result<String> {
        self.stdin.write_all(&[c])?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

impl Worker for CatWorker {
    type Input = u8;
    type Output = String;
    type Error = io::Error;

    fn apply(
        &mut self,
        input: Self::Input,
        _: &Context
    ) -> WorkerResult<Self> {
        self.write_char(input).map_err(|error| {
            ApplyError::Fatal { input: Some(input), error }
        })
    }
}

#[derive(Default)]
struct CatQueen {
    children: Vec<Child>,
}

impl CatQueen {
    fn wait_for_all(&mut self) -> Vec<io::Result<ExitStatus>> {
        self.children
            .drain(..)
            .map(|mut child| child.wait())
            .collect()
    }
}

impl Queen for CatQueen {
    type Kind = CatWorker;

    fn create(&mut self) -> Self::Kind {
        let mut child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        self.children.push(child);
        CatWorker::new(stdin, stdout)
    }
}

impl Drop for CatQueen {
    fn drop(&mut self) {
        self.wait_for_all().into_iter().for_each(|result| {
            match result {
                Ok(status) if status.success() => (),
                Ok(status) => {
                    eprintln!("Child process failed: {}", status);
                }
                Err(e) => {
                    eprintln!("Error waiting for child process: {}", e);
                }
            }
        })
    }
}

// build the Hive
let hive = Builder::new()
    .num_threads(4)
    .build_default::<CatQueen>()
    .unwrap();

// prepare inputs
let inputs = (0..8).map(|i| 97 + i);

// execute tasks and collect outputs
let output = hive
    .swarm(inputs)
    .into_outputs()
    .fold(String::new(), |mut a, b| {
        a.push_str(&b);
        a
    })
    .into_bytes();

// verify the output - note that `swarm` ensures the outputs are in
// the same order as the inputs
assert_eq!(output, b"abcdefgh");

// shutdown the hive, use the Queen to wait on child processes, and
// report errors
let (mut queen, _outcomes) = hive.try_into_husk().unwrap().into_parts();
let (wait_ok, wait_err): (Vec<_>, Vec<_>) =
    queen.wait_for_all().into_iter().partition(Result::is_ok);
if !wait_err.is_empty() {
    panic!(
        "Error(s) occurred while waiting for child processes: {:?}",
        wait_err
    );
}
let exec_err_codes: Vec<_> = wait_ok
    .into_iter()
    .map(Result::unwrap)
    .filter_map(|status| (!status.success()).then(|| status.code()))
    .flatten()
    .collect();
if !exec_err_codes.is_empty() {
    panic!(
        "Child process(es) failed with exit codes: {:?}",
        exec_err_codes
    );
}

Status

The beekeeper API is generally stable, but additional real-world battle-testing is desired before promoting the version to 1.0.0. If you identify bugs or have suggestions for improvement, please open an issue.

Similar libraries

License

You may choose either of the following licenses:

  • Apache License, Version 2.0
  • MIT License

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Credits

Beekeeper began as a fork of workerpool.

The logo was generated using DeepAI.
