| Crates.io | beekeeper |
| lib.rs | beekeeper |
| version | 0.3.0 |
| created_at | 2025-01-24 01:20:50.266443+00 |
| updated_at | 2025-03-17 21:39:00.563693+00 |
| description | A full-featured worker pool library for parallelizing tasks |
| homepage | |
| repository | https://github.com/jdidion/beekeeper |
| max_upload_size | |
| id | 1528884 |
| size | 1,506,765 |
A Rust library that provides a thread pool implementation designed to execute the same operation in parallel on any number of inputs (this is sometimes called a "worker pool").
- Operations are defined by implementing the Worker trait.
- A Builder is used to configure and create a worker pool called a Hive.
- A Hive is generic over:
  - The type of Queen, which creates Worker instances
  - The type of TaskQueues, which provides the global and worker thread-local queues for managing tasks
- Two TaskQueues implementations are available:
  - Channel-based: uses a crossbeam channel to send tasks from the Hive to worker threads
  - Workstealing: when the local-batch feature is enabled, local batch queues are implemented using crossbeam_queue::ArrayQueue
- The Hive creates a Worker instance for each thread in the pool.
- When one of the Hive's methods is called to submit a task (or batch of tasks), the resulting Outcome(s) may be returned as an Iterator, sent to an output channel, or stored in the Hive for later retrieval.
- A Hive may create Workers in one of three ways: using the worker type's Default implementation, by cloning a provided instance, or with a custom factory that implements the Queen or QueenMut trait.
- Workers may be stateful, i.e., Worker::apply() takes a &mut self.
- While a Queen is not stateful, a QueenMut may be (i.e., its create() method takes a &mut self).
- Although panics in worker threads (and thus, within Worker implementations) should be avoided, the Hive does automatically restart any threads that panic.
- A Hive may be suspended and resumed at any time. While a Hive is suspended, worker threads do no work and tasks accumulate in the input queue.
- The map and try_map utility functions enable simple parallel processing of a single batch of tasks.
- Several Worker implementations are provided in the stock module. Most notable are those in the call submodule, which provide different ways of wrapping callables, i.e., closures and function pointers.
- The following optional features are available:
  - affinity: worker threads may be pinned to CPU cores to minimize the overhead of context-switching.
  - local-batch (>=0.3.0): worker threads take batches of tasks from the global input queue and add them to a local queue, which may alleviate thread contention, especially when there are many short-lived tasks. Inputs may be wrapped as Weighted to enable balancing unevenly sized tasks between worker threads.
  - retry: tasks that fail due to transient errors (e.g., temporarily unavailable resources) may be retried a set number of times, with an optional, exponentially increasing delay between retries.
- Multiple channel implementations are supported for sending Outcomes to an output channel.
To parallelize a task, you'll need two things:
1. A Worker implementation. Your options are:
   - Use one of the provided implementations from the stock module
   - Implement your own (a minimal sketch is shown after this list):
     - use the necessary traits (e.g., use beekeeper::bee::prelude::*)
     - Define a struct for your worker
     - Implement the Worker trait on your struct and define the apply method with the logic of your task
     - Do at least one of the following:
       - Implement Default for your worker
       - Implement Clone for your worker
       - Create a custom worker factory that implements the Queen or QueenMut trait
2. A Hive to execute your tasks. Your options are:
   - Use one of the convenience functions in the util module (see Example 1 below)
   - Create a Hive manually using a Builder (see Examples 2 and 3 below)
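For illustration, here is a minimal hand-written worker following the steps above. DoubleWorker is a hypothetical example (not part of the crate) that mirrors the trait shape used in Example 3 below; using () as the Error type for an infallible worker is an assumption.
use beekeeper::bee::prelude::*;
// A worker that doubles its input. Deriving Default (or Clone) lets
// the Hive create one instance of it per worker thread.
#[derive(Debug, Default, Clone)]
struct DoubleWorker;
impl Worker for DoubleWorker {
    type Input = usize;
    type Output = usize;
    type Error = ();
    fn apply(
        &mut self,
        input: Self::Input,
        _: &Context<Self::Input>,
    ) -> WorkerResult<Self> {
        Ok(input * 2)
    }
}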
- OpenBuilder is the most general builder:
  - OpenBuilder::new() creates an empty OpenBuilder
  - Builder::default() creates an OpenBuilder with the global default settings (which may be changed using the functions in the hive module, e.g., beekeeper::hive::set_num_threads_default(4))
- Use the builder methods to specify the Queen and TaskQueues types:
  - For a Worker that implements Default, use with_worker_default::<MyWorker>()
  - For a Worker that implements Clone, use with_worker(MyWorker::new())
  - For a Queen, use with_queen_default::<MyQueen>() if it implements Default, otherwise use with_queen(MyQueen::new())
  - For a QueenMut, use with_queen_mut_default::<MyQueenMut>() if it implements Default, otherwise use with_queen_mut(MyQueenMut::new())
  - Use with_channel_queues or with_workstealing_queues to configure the TaskQueues implementation
- Call one of the build() methods to build the Hive (a rough sketch is shown below)
- Note that Builder::num_threads() must be set to a non-zero value; otherwise, the built Hive will not start any worker threads until you call the Hive::grow() method.
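Putting those builder methods together, a rough sketch (reusing the hypothetical DoubleWorker from above; the method names come from the list above, but the exact call order and signatures may differ from the published API):
use beekeeper::hive::prelude::*;
// configure a Hive that creates DoubleWorkers via Default and
// distributes tasks over channel-based queues and 4 worker threads
let _hive = OpenBuilder::new()
    .with_worker_default::<DoubleWorker>()
    .with_channel_queues()
    .num_threads(4)
    .build();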
Once you've created a Hive, use its methods to submit tasks for processing. There are four groups of methods available:
- apply: submits a single task
- swarm: submits a batch of tasks given a collection of inputs with known size (i.e., anything that implements IntoIterator<IntoIter: ExactSizeIterator>)
- map: submits an arbitrary batch of tasks (i.e., anything that implements IntoIterator)
- scan: similar to map, but you also provide 1) an initial value for a state variable, and 2) a function that transforms each item in the input iterator into the input type required by the Worker, and also has access to (and may modify) the state variable

There are multiple methods in each group that differ by how the task results (called Outcomes) are handled:
- The methods with no suffix return an Iterator over the Outcomes in the same order as the inputs (or, in the case of apply, a single Outcome)
- The methods with the _unordered suffix instead return an unordered iterator, which may be more performant than the ordered iterator
- The methods with the _send suffix accept a channel Sender and send the Outcomes to that channel as they are completed (see the note below)
- The methods with the _store suffix store the Outcomes in the Hive; these may be retrieved later using the Hive::take_stored() method, using one of the remove* methods (which requires OutcomeStore to be in scope), or by using one of the methods on Husk after shutting down the Hive using Hive::try_into_husk()

When using one of the _send methods, you should ensure that the Sender is dropped after
all tasks have been submitted, otherwise calling recv() on (or iterating over) the Receiver
will block indefinitely.
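A sketch of the _send pattern (reusing the stock ThunkWorker shown in Example 2 below; taking exactly as many Outcomes as tasks submitted, as Example 2 also does, keeps the iteration from blocking):
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;
let hive = Builder::new()
    .num_threads(2)
    .build_with_default::<ThunkWorker<i32>>()
    .unwrap();
let (tx, rx) = outcome_channel();
// `tx` is moved into `swarm_send`, so no Sender is left dangling here
let _task_ids = hive.swarm_send((0..4).map(|i| Thunk::from(move || i + 1)), tx);
// take exactly 4 outputs so the iteration cannot block indefinitely
let sum: i32 = rx.into_outputs().take(4).sum();
assert_eq!(sum, 10);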
Within a Hive, each submitted task is assigned a unique ID. The _send and _store
methods return the task_ids of the submitted tasks, which can be used to retrieve them later
(e.g., using Hive::remove()).
After submitting tasks, you may use the Hive::join() method to wait
for all tasks to complete. Using join is strongly recommended when using one of the _store
methods, otherwise you'll need to continually poll the Hive to check for completed tasks.
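For example, a minimal sketch of the _store workflow (continuing with the ThunkWorker hive from the sketch above; swarm_store and take_stored are used as described, but their exact return types are not spelled out here):
// submit tasks whose Outcomes are stored in the Hive
let task_ids = hive.swarm_store((0..4).map(|i| Thunk::from(move || i * 2)));
// block until all submitted tasks have completed
hive.join();
// drain all stored Outcomes; individual ones could instead be
// retrieved by ID (e.g., with Hive::remove() and `task_ids`)
let outcomes = hive.take_stored();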
When you are finished with a Hive, you may simply drop it (either explicitly, or by letting
it go out of scope) - the worker threads will be terminated automatically. If you used the
_store methods and would like to have access to the stored task Outcomes after the Hive
has been dropped, and/or you'd like to re-use the Hive's Queen or other configuration
parameters, you can use the Hive::try_into_husk() method to extract
the relevant data from the Hive into a Husk object.
Example 1: parallelize an existing function
pub fn double(i: usize) -> usize {
i * 2
}
// parallelize the computation of `double` on a range of numbers
// over 4 threads, and sum the results
const N: usize = 100;
let sum_doubles: usize = beekeeper::util::map(4, 0..N, double)
.into_iter()
.sum();
println!("Sum of {} doubles: {}", N, sum_doubles);
Example 2: parallelize no-argument closures using a stock Worker
use beekeeper::bee::stock::{Thunk, ThunkWorker};
use beekeeper::hive::prelude::*;
// create a hive to process `Thunk`s - no-argument closures with the
// same return type (`i32`)
let hive = Builder::new()
.num_threads(4)
.thread_name("thunk_hive")
.build_with_default::<ThunkWorker<i32>>()
.unwrap();
// return results to your own channel...
let (tx, rx) = outcome_channel();
let _ = hive.swarm_send(
(0..10).map(|i: i32| Thunk::from(move || i * i)),
tx
);
assert_eq!(285, rx.into_outputs().take(10).sum());
// return results as an iterator...
let total = hive
.swarm_unordered((0..10).map(|i: i32| Thunk::from(move || i * -i)))
.into_outputs()
.sum();
assert_eq!(-285, total);
Example 3: implementing your own Worker
Suppose you'd like to parallelize executions of a line-delimited process, such as cat.
This requires defining a struct to hold the process stdin and stdout, and
implementing the Worker trait for this struct. We'll also use a custom Queen to keep track
of the Child processes and make sure they're terminated properly.
use beekeeper::bee::prelude::*;
use beekeeper::hive::prelude::*;
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::process::{Child, ChildStdin, ChildStdout, Command, ExitStatus, Stdio};
#[derive(Debug)]
struct CatWorker {
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl CatWorker {
fn new(stdin: ChildStdin, stdout: ChildStdout) -> Self {
Self {
stdin,
stdout: BufReader::new(stdout),
}
}
fn write_char(&mut self, c: u8) -> io::Result<String> {
self.stdin.write_all(&[c])?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut s = String::new();
self.stdout.read_line(&mut s)?;
s.pop(); // exclude newline
Ok(s)
}
}
impl Worker for CatWorker {
type Input = u8;
type Output = String;
type Error = io::Error;
fn apply(
&mut self,
input: Self::Input,
_: &Context<Self::Input>
) -> WorkerResult<Self> {
self.write_char(input).map_err(|error| {
ApplyError::Fatal { input: Some(input), error }
})
}
}
#[derive(Default)]
struct CatQueen {
children: Vec<Child>,
}
impl CatQueen {
fn wait_for_all(&mut self) -> Vec<io::Result<ExitStatus>> {
self.children
.drain(..)
.map(|mut child| child.wait())
.collect()
}
}
impl QueenMut for CatQueen {
type Kind = CatWorker;
fn create(&mut self) -> Self::Kind {
let mut child = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
self.children.push(child);
CatWorker::new(stdin, stdout)
}
}
impl Drop for CatQueen {
fn drop(&mut self) {
self.wait_for_all().into_iter().for_each(|result| {
match result {
Ok(status) if status.success() => (),
Ok(status) => {
eprintln!("Child process failed: {}", status);
}
Err(e) => {
eprintln!("Error waiting for child process: {}", e);
}
}
})
}
}
// build the Hive
let hive = Builder::new()
.num_threads(4)
.build_default_mut::<CatQueen>()
.unwrap();
// prepare inputs
let inputs = (0..8).map(|i| 97 + i);
// execute tasks and collect outputs
let output = hive
.swarm(inputs)
.into_outputs()
.fold(String::new(), |mut a, b| {
a.push_str(&b);
a
})
.into_bytes();
// verify the output - note that `swarm` ensures the outputs are in
// the same order as the inputs
assert_eq!(output, b"abcdefgh");
// shutdown the hive, use the Queen to wait on child processes, and
// report errors
let (mut queen, _outcomes) = hive.try_into_husk().unwrap().into_parts();
let (wait_ok, wait_err): (Vec<_>, Vec<_>) = queen
.into_inner()
.wait_for_all()
.into_iter()
.partition(Result::is_ok);
if !wait_err.is_empty() {
panic!(
"Error(s) occurred while waiting for child processes: {:?}",
wait_err
);
}
let exec_err_codes: Vec<_> = wait_ok
.into_iter()
.map(Result::unwrap)
.filter_map(|status| (!status.success()).then(|| status.code()))
.flatten()
.collect();
if !exec_err_codes.is_empty() {
panic!(
"Child process(es) failed with exit codes: {:?}",
exec_err_codes
);
}
Early versions of this crate (< 0.3) had some fatal design flaws that needed to be corrected with breaking changes (see the changelog).
As of version 0.3, the beekeeper API is generally considered to be stable, but additional real-world battle-testing is desired before promoting the version to 1.0.0. If you identify bugs or have suggestions for improvement, please open an issue.
You may choose either of the following licenses:
- Apache-2.0
- MIT
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
Beekeeper began as a fork of workerpool.