Crates.io | workerpool |
lib.rs | workerpool |
version | 1.2.1 |
source | src |
created_at | 2017-11-23 23:27:40.205707 |
updated_at | 2024-01-13 15:07:33.357997 |
description | A thread pool for running a number of jobs on a fixed set of stateful worker threads. |
homepage | https://github.com/lorepozo/workerpool |
repository | https://github.com/lorepozo/workerpool |
max_upload_size | |
id | 40355 |
size | 68,756 |
A worker threadpool used to execute a number of jobs atop stateful workers in parallel. It spawns a specified number of worker threads and replenishes the pool if any worker threads panic.
A single Worker
runs in its own thread, to be implemented according to the trait:
pub trait Worker : Default {
type Input: Send;
type Output: Send;
fn execute(&mut self, Self::Input) -> Self::Output;
}
[dependencies]
workerpool = "1.2"
To use crossbeam's
channels
instead of std::sync::mpsc
,
enable the crossbeam
feature:
[dependencies]
workerpool = { version = "1.2", features = ["crossbeam"] }
This crate provides Pool<W> where W: Worker
. With a pool, there are four
primary functions of interest:
Pool::<MyWorker>::new(n_threads)
creates a new pool for a particular Worker
.pool.execute(inp)
non-blocking executes the worker and ignores the return value.pool.execute_to(tx, inp)
non-blocking executes the worker and sends return value to
the given Sender.pool.join()
blocking waits for all tasks (from execute
and
execute_to
) to complete.A worker is provided in workerpool::thunk
, a stateless ThunkWorker<T>
.
It executes on inputs of Thunk<T>
, effectively argumentless functions that
are Sized + Send
. These thunks are creates by wrapping functions which
return T
with Thunk::of
.
use workerpool::Pool;
use workerpool::thunk::{Thunk, ThunkWorker};
use std::sync::mpsc::channel;
fn main() {
let n_workers = 4;
let n_jobs = 8;
let pool = Pool::<ThunkWorker<i32>>::new(n_workers);
let (tx, rx) = channel();
for i in 0..n_jobs {
pool.execute_to(tx.clone(), Thunk::of(move || i * i));
}
assert_eq!(140, rx.iter().take(n_jobs as usize).sum());
}
For stateful workers, you have to implement Worker
yourself.
Suppose there's a line-delimited process, such as cat
or tr
, which you'd
like running on many threads for use in a pool-like manner. You may create
and use a worker, with maintained state of the stdin/stdout for the process,
as follows:
use workerpool::{Worker, Pool};
use std::process::{Command, ChildStdin, ChildStdout, Stdio};
use std::io::prelude::*;
use std::io::{self, BufReader};
use std::sync::mpsc::channel;
struct LineDelimitedProcess {
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl Default for LineDelimitedProcess {
fn default() -> Self {
let child = Command::new("cat")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.unwrap();
Self {
stdin: child.stdin.unwrap(),
stdout: BufReader::new(child.stdout.unwrap()),
}
}
}
impl Worker for LineDelimitedProcess {
type Input = Box<[u8]>;
type Output = io::Result<String>;
fn execute(&mut self, inp: Self::Input) -> Self::Output {
self.stdin.write_all(&*inp)?;
self.stdin.write_all(b"\n")?;
self.stdin.flush()?;
let mut s = String::new();
self.stdout.read_line(&mut s)?;
s.pop(); // exclude newline
Ok(s)
}
}
fn main() {
let n_workers = 4;
let n_jobs = 8;
let pool = Pool::<LineDelimitedProcess>::new(n_workers);
let (tx, rx) = channel();
for i in 0..n_jobs {
let inp = Box::new([97 + i]);
pool.execute_to(tx.clone(), inp);
}
// output is a permutation of "abcdefgh"
let mut output = rx.iter()
.take(n_jobs as usize)
.fold(String::new(), |mut a, b| {
a.push_str(&b.unwrap());
a
})
.into_bytes();
output.sort();
assert_eq!(output, b"abcdefgh");
}
This work is derivative of threadpool.
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.