# Beekeeper
A Rust library that provides a thread pool implementation designed to execute the same operation in parallel on any number of inputs (this is sometimes called a "worker pool").
## Overview
- Operations are defined by implementing the `Worker` trait.
- A `Builder` is used to configure and create a worker pool called a `Hive`.
- The `Hive` creates a `Worker` instance for each thread in the pool.
- Each thread in the pool continually:
  - receives a task from the input `channel`,
  - processes the input using its `Worker`, and
  - delivers the resulting `Outcome`.
- Depending on which of `Hive`'s methods are called to submit a task (or batch of tasks), the `Outcome`(s) may be returned as an `Iterator`, sent to an output `channel`, or stored in the `Hive` for later retrieval.
- A `Hive` may create `Worker`s in one of three ways:
  - using the worker type's `Default` implementation,
  - by cloning a prototype `Worker` instance, or
  - by calling a custom factory that implements the `Queen` trait.
- Both `Worker`s and `Queen`s may be stateful, i.e., `Worker::apply()` and `Queen::create()` both take `&mut self`.
- Although it is strongly recommended to avoid `panic`s in worker threads (and thus, within `Worker` implementations), the `Hive` does automatically restart any threads that panic.
- A `Hive` may be `suspend`ed and `resume`d at any time (see the sketch below). When a `Hive` is suspended, worker threads do no work and tasks accumulate in the input `channel`.
- Several utility functions are provided in the util module. Notably, the `map` and `try_map` functions enable simple parallel processing of a single batch of tasks.
- Several useful `Worker` implementations are provided in the stock module. Most notable are those in the `call` submodule, which provide different ways of wrapping callables, i.e., closures and function pointers.
- The following optional features are provided via feature flags:
  - `affinity`: worker threads may be pinned to CPU cores to minimize the overhead of context-switching.
  - `retry`: tasks that fail due to transient errors (e.g., temporarily unavailable resources) may be retried a set number of times, with an optional, exponentially increasing delay between retries.
- Several alternative `channel` implementations are supported.
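As a minimal sketch of the suspend/resume behavior, using the stock `ThunkWorker` (the `swarm_store` method name is assumed from the `_store` naming convention described under Usage below):

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// submit a batch of tasks whose outcomes are stored in the hive
let _task_ids = hive.swarm_store((0..100).map(|i| Thunk::of(move || i * i)));

// while suspended, workers do no work; pending tasks accumulate
// in the input channel
hive.suspend();

// ... do something else ...

// resume processing and wait for all outstanding tasks to finish
hive.resume();
hive.join();
```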
## Usage
To parallelize a task, you'll need two things:
- A `Worker` implementation. Your options are:
  - Use an existing implementation from the stock module (see Example 2 below)
  - Implement your own (see Example 3 below):
    - `use` the necessary traits (e.g., `use beekeeper::bee::prelude::*`)
    - Define a `struct` for your worker
    - Implement the `Worker` trait on your struct and define the `apply` method with the logic of your task
    - Do at least one of the following:
      - Implement `Default` for your worker
      - Implement `Clone` for your worker
      - Create a custom worker factory that implements the `Queen` trait
- A `Hive` to execute your tasks. Your options are:
  - Use one of the convenience methods in the util module (see Example 1 below)
  - Create a `Hive` manually using `Builder` (see Examples 2 and 3 below):
    - `Builder::new()` creates an empty `Builder`
    - `Builder::default()` creates a `Builder` with the global default settings (which may be changed using the functions in the hive module, e.g., `beekeeper::hive::set_num_threads_default(4)`)
    - Use one of the `build_*` methods to build the `Hive`:
      - If you have a `Worker` that implements `Default`, use `build_with_default::<MyWorker>()`
      - If you have a `Worker` that implements `Clone`, use `build_with(MyWorker::new())` (see the sketch after this list)
      - If you have a custom `Queen`, use `build_default::<MyQueen>()` if it implements `Default`, otherwise use `build(MyQueen::new())`
    - Note that `Builder::num_threads()` must be set to a non-zero value, otherwise the built `Hive` will not start any worker threads until you call the `Hive::grow()` method.
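As a concrete illustration of the `Clone` + `build_with` path, here is a minimal sketch. The `MultiplyWorker` type is hypothetical, and using `std::convert::Infallible` as the `Error` type is an assumption about the trait's bounds; adapt it to your own task:

```rust
use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;
use std::convert::Infallible;

// a hypothetical stateless worker that multiplies its input
// by a fixed factor
#[derive(Clone)]
struct MultiplyWorker {
    factor: usize,
}

impl Worker for MultiplyWorker {
    type Input = usize;
    type Output = usize;
    // assumption: an error type is required even for infallible tasks
    type Error = Infallible;

    fn apply(&mut self, input: Self::Input, _: &Context) -> WorkerResult<Self> {
        Ok(input * self.factor)
    }
}

// each worker thread receives a clone of the prototype instance
let hive = Builder::new()
    .num_threads(2)
    .build_with(MultiplyWorker { factor: 3 })
    .unwrap();

let products: Vec<usize> = hive.swarm(0..4).into_outputs().collect();
assert_eq!(products, vec![0, 3, 6, 9]);
```

Cloning is appropriate when per-thread state can simply be copied from a prototype; if each worker needs a unique resource (like the child processes in Example 3), implement a `Queen` instead.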
Once you've created a `Hive`, use its methods to submit tasks for processing. There are
four groups of methods available:

- `apply`: submits a single task
- `swarm`: submits a batch of tasks given a collection of inputs with known size (i.e., anything that implements `IntoIterator<IntoIter: ExactSizeIterator>`)
- `map`: submits an arbitrary batch of tasks (i.e., anything that implements `IntoIterator`); see the sketch below
- `scan`: similar to `map`, but you also provide 1) an initial value for a state variable, and 2) a function that transforms each item in the input iterator into the input type required by the `Worker`, and that also has access to (and may modify) the state variable
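As an illustration of the difference between `swarm` and `map`, here is a brief sketch using the stock `ThunkWorker`; it assumes the `into_outputs()` adapter from the prelude (used in Example 2 below) is available on the iterators returned by both methods:

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// `swarm` accepts inputs with a known size, such as a mapped range
let squares: Vec<i32> = hive
    .swarm((0..10).map(|i| Thunk::of(move || i * i)))
    .into_outputs()
    .collect();
assert_eq!(squares, vec![0, 1, 4, 9, 16, 25, 36, 49, 64, 81]);

// `map` accepts any iterator, e.g., a filtered one whose size
// is not known in advance
let even_squares: Vec<i32> = hive
    .map((0..10).filter(|i| i % 2 == 0).map(|i| Thunk::of(move || i * i)))
    .into_outputs()
    .collect();
assert_eq!(even_squares, vec![0, 4, 16, 36, 64]);
```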
There are multiple methods in each group that differ in how the task results (called
`Outcome`s) are handled:

- The unsuffixed methods return an `Iterator` over the `Outcome`s in the same order as the inputs (or, in the case of `apply`, a single `Outcome`)
- The methods with the `_unordered` suffix instead return an unordered iterator, which may be more performant than the ordered iterator
- The methods with the `_send` suffix accept a channel `Sender` and send the `Outcome`s to that channel as they are completed
- The methods with the `_store` suffix store the `Outcome`s in the `Hive`; these may be retrieved later using the `Hive::take_stored()` method, using one of the `remove*` methods (which require `OutcomeStore` to be in scope), or by using one of the methods on `Husk` after shutting down the `Hive` using `Hive::try_into_husk()`
When using one of the `_send` methods, you should ensure that the `Sender` is dropped after
all tasks have been submitted, otherwise calling `recv()` on (or iterating over) the
`Receiver` will block indefinitely.
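A minimal sketch of this pattern, using the `outcome_channel` and `swarm_send` API shown in Example 2 below (the clone-per-batch use of the `Sender` is an assumption, following standard channel semantics):

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

let (tx, rx) = outcome_channel();

// submit two batches using clones of the sender
let _ = hive.swarm_send((0..5).map(|i| Thunk::of(move || i)), tx.clone());
let _ = hive.swarm_send((5..10).map(|i| Thunk::of(move || i)), tx.clone());

// drop the original sender: once the workers finish and drop their
// clones, the iteration below terminates instead of blocking
drop(tx);

let total: i32 = rx.into_outputs().sum();
assert_eq!(45, total);
```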
Within a `Hive`, each submitted task is assigned a unique ID. The `_send` and `_store`
methods return the `task_id`s of the submitted tasks, which can be used to retrieve the
associated `Outcome`s later (e.g., using `Hive::remove()`).
After submitting tasks, you may use the `Hive::join()` method to wait for all tasks to
complete. Using `join` is strongly recommended when using one of the `_store` methods,
otherwise you'll need to continually poll the `Hive` to check for completed tasks.
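A minimal sketch combining `_store`, `join`, and retrieval; the `swarm_store` name follows the method-group naming above, but the exact type returned by `Hive::take_stored()` is not specified in this README, so treat the last line as a placeholder and consult the crate docs:

```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

let hive = Builder::new()
    .num_threads(4)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// store the outcomes in the hive rather than returning them
let _task_ids = hive.swarm_store((0..10).map(|i| Thunk::of(move || i * 2)));

// block until all submitted tasks have completed
hive.join();

// retrieve the stored outcomes in one call (assumed API; individual
// outcomes can also be retrieved by ID, e.g. with `Hive::remove()`)
let outcomes = hive.take_stored();
```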
When you are finished with a `Hive`, you may simply drop it (either explicitly, or by letting
it go out of scope) - the worker threads will be terminated automatically. If you used the
`_store` methods and would like to have access to the stored task `Outcome`s after the
`Hive` has been dropped, and/or you'd like to re-use the `Hive`'s `Queen` or other
configuration parameters, you can use the `Hive::try_into_husk()` method to extract the
relevant data from the `Hive` into a `Husk` object.
## Examples
### 1. Parallelize an existing function
```rust
pub fn double(i: usize) -> usize {
    i * 2
}

// parallelize the computation of `double` on a range of numbers
// over 4 threads, and sum the results
const N: usize = 100;
let sum_doubles: usize = beekeeper::util::map(4, 0..N, double)
    .into_iter()
    .sum();
println!("Sum of {} doubles: {}", N, sum_doubles);
```
### 2. Parallelize arbitrary tasks with the same output type
```rust
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;

// create a hive to process `Thunk`s - no-argument closures with the
// same return type (`i32`)
let hive = Builder::new()
    .num_threads(4)
    .thread_name("thunk_hive")
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();

// return results to your own channel...
let (tx, rx) = outcome_channel();
let _ = hive.swarm_send(
    (0..10).map(|i: i32| Thunk::of(move || i * i)),
    tx
);
assert_eq!(285, rx.into_outputs().take(10).sum());

// return results as an iterator...
let total = hive
    .swarm_unordered((0..10).map(|i: i32| Thunk::of(move || i * -i)))
    .into_outputs()
    .sum();
assert_eq!(-285, total);
```
### 3. Parallelize a complex task using a stateful Worker
Suppose you'd like to parallelize executions of a line-delimited process, such as `cat`.
This requires defining a `struct` to hold the process `stdin` and `stdout`, and
implementing the `Worker` trait for this struct. We'll also use a custom `Queen` to keep
track of the `Child` processes and make sure they're terminated properly.
```rust
use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};

#[derive(Debug)]
struct CatWorker {
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
}

impl CatWorker {
    fn new(stdin: ChildStdin, stdout: ChildStdout) -> Self {
        Self {
            stdin,
            stdout: BufReader::new(stdout),
        }
    }

    fn write_char(&mut self, c: u8) -> io::Result<String> {
        self.stdin.write_all(&[c])?;
        self.stdin.write_all(b"\n")?;
        self.stdin.flush()?;
        let mut s = String::new();
        self.stdout.read_line(&mut s)?;
        s.pop(); // exclude newline
        Ok(s)
    }
}

impl Worker for CatWorker {
    type Input = u8;
    type Output = String;
    type Error = io::Error;

    fn apply(
        &mut self,
        input: Self::Input,
        _: &Context
    ) -> WorkerResult<Self> {
        self.write_char(input).map_err(|error| {
            ApplyError::Fatal { input: Some(input), error }
        })
    }
}

#[derive(Default)]
struct CatQueen {
    children: Vec<Child>,
}

impl CatQueen {
    fn wait_for_all(&mut self) -> Vec<io::Result<ExitStatus>> {
        self.children
            .drain(..)
            .map(|mut child| child.wait())
            .collect()
    }
}

impl Queen for CatQueen {
    type Kind = CatWorker;

    fn create(&mut self) -> Self::Kind {
        let mut child = Command::new("cat")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .unwrap();
        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        self.children.push(child);
        CatWorker::new(stdin, stdout)
    }
}

impl Drop for CatQueen {
    fn drop(&mut self) {
        self.wait_for_all().into_iter().for_each(|result| {
            match result {
                Ok(status) if status.success() => (),
                Ok(status) => {
                    eprintln!("Child process failed: {}", status);
                }
                Err(e) => {
                    eprintln!("Error waiting for child process: {}", e);
                }
            }
        })
    }
}

// build the Hive
let hive = Builder::new()
    .num_threads(4)
    .build_default::<CatQueen>()
    .unwrap();

// prepare inputs
let inputs = (0..8).map(|i| 97 + i);

// execute tasks and collect outputs
let output = hive
    .swarm(inputs)
    .into_outputs()
    .fold(String::new(), |mut a, b| {
        a.push_str(&b);
        a
    })
    .into_bytes();

// verify the output - note that `swarm` ensures the outputs are in
// the same order as the inputs
assert_eq!(output, b"abcdefgh");

// shut down the hive, use the Queen to wait on child processes, and
// report errors
let (mut queen, _outcomes) = hive.try_into_husk().unwrap().into_parts();
let (wait_ok, wait_err): (Vec<_>, Vec<_>) =
    queen.wait_for_all().into_iter().partition(Result::is_ok);
if !wait_err.is_empty() {
    panic!(
        "Error(s) occurred while waiting for child processes: {:?}",
        wait_err
    );
}
let exec_err_codes: Vec<_> = wait_ok
    .into_iter()
    .map(Result::unwrap)
    .filter_map(|status| (!status.success()).then(|| status.code()))
    .flatten()
    .collect();
if !exec_err_codes.is_empty() {
    panic!(
        "Child process(es) failed with exit codes: {:?}",
        exec_err_codes
    );
}
```
## Status
The `beekeeper` API is generally considered to be stable, but additional real-world
battle-testing is desired before promoting the version to `1.0.0`. If you identify bugs
or have suggestions for improvement, please open an issue.
## Similar libraries
## License
You may choose either of the following licenses:
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
## Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
## Credits
Beekeeper began as a fork of workerpool.