Crates.io | bunbun-worker |
lib.rs | bunbun-worker |
version | 0.2.2 |
source | src |
created_at | 2024-11-13 19:05:24.857537 |
updated_at | 2024-11-25 05:39:34.729837 |
description | An rpc/non-rpc AMQP worker library |
homepage | |
repository | https://git.4o1x5.dev/4o1x5/bunbun-worker |
max_upload_size | |
id | 1446938 |
size | 96,220 |
bunbun-worker is a bare-bones, simple, multithreaded worker and client library.
The creator of this crate recommends against using it in production, as it is more of a proof of concept. If you intend to use RPC, a crate like tonic is highly recommended.
A remote procedure call (RPC), as its name says, is a message sent to a remote microservice to be processed, with the result returned to the caller. In bunbun-worker it works as in the following example:
sequenceDiagram
ServiceA->>+ServiceB: Hey, could you do this job for me?
Note right of ServiceB: ServiceB does the job
ServiceB->>+ServiceA: Sure, here is the data result
In bunbun-worker, regular jobs are called non-rpc jobs, indicating that the response is not awaited.
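The difference between the two job kinds can be sketched with standard-library channels only. This is an illustration of the concept, not bunbun-worker's implementation: the `Job` enum, `spawn_worker`, and `call_and_wait` are hypothetical names, and an in-process channel stands in for the AMQP queue.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch; not part of bunbun-worker.
enum Job {
    // RPC job: carries a channel on which the worker sends the result back.
    Rpc { input: String, reply_to: mpsc::Sender<String> },
    // Non-rpc job: no reply channel, so the caller never waits for a result.
    FireAndForget { input: String },
}

// Spawns "ServiceB": a thread that processes jobs until the queue is closed.
// Returns the sending half of the queue and a handle yielding the job count.
fn spawn_worker() -> (mpsc::Sender<Job>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<Job>();
    let handle = thread::spawn(move || {
        let mut processed = 0;
        for job in rx {
            match job {
                Job::Rpc { input, reply_to } => {
                    // Ignore the send error if the caller stopped waiting.
                    let _ = reply_to.send(input.to_uppercase());
                }
                Job::FireAndForget { input } => {
                    // The work happens here; nothing is sent back.
                    let _ = input;
                }
            }
            processed += 1;
        }
        processed
    });
    (tx, handle)
}

// "ServiceA" side of an RPC call: send the job, then block until the reply arrives.
fn call_and_wait(queue: &mpsc::Sender<Job>, input: &str) -> Option<String> {
    let (reply_to, reply_rx) = mpsc::channel();
    queue
        .send(Job::Rpc { input: input.to_string(), reply_to })
        .ok()?;
    reply_rx.recv().ok()
}

fn main() {
    let (queue, worker) = spawn_worker();

    // RPC: the caller waits for the result.
    println!("{:?}", call_and_wait(&queue, "hello"));

    // Non-rpc: the caller sends the job and moves on.
    queue
        .send(Job::FireAndForget { input: "log this".to_string() })
        .unwrap();

    drop(queue); // closing the queue lets the worker thread finish
    println!("jobs processed: {}", worker.join().unwrap());
}
```

In a real deployment the reply would travel over the message broker rather than an in-process channel, but the shape is the same: an RPC job tells the worker where to send the result, a non-rpc job does not.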
Get it directly from crates.io:
[dependencies]
bunbun-worker = { version = "0.2.0" }
Or get it directly from source:
[dependencies]
bunbun-worker = { git = "https://git.4o1x5.dev/4o1x5/bunbun-worker", branch = "master" }
This example uses DTOs (data transfer objects) to transfer data between services.
Add futures via cargo add futures.
// server.rs
use std::{env, sync::Arc, time::Duration};

use futures::FutureExt; // provides `.boxed()`
use serde::{Deserialize, Serialize};
// Worker, WorkerConfig, ListenerConfig and RPCTask must also be imported
// from the bunbun-worker crate (import paths omitted here).

// Shared state handed to every job run by this worker.
#[derive(Clone, Debug)]
pub struct State {
    pub something: String,
}

// The job payload (DTO) sent by the client.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}

// The successful result returned to the client.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJobResult {
    contents: String,
}

// The error returned to the client when the job fails.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

impl RPCTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
    type State = State;

    fn run(
        self,
        state: Arc<Self::State>,
    ) -> futures::prelude::future::BoxFuture<'static, Result<Self::Result, Self::ErroredResult>>
    {
        async move {
            tracing::info!("Sent email to {}", self.send_to);
            // Simulate two seconds of work.
            tokio::time::sleep(Duration::from_secs(2)).await;
            return Ok(EmailJobResult {
                contents: self.contents.clone(),
            });
        }
        .boxed()
    }
}

#[tokio::main]
async fn main() {
    let mut listener = Worker::new(
        env::var("AMQP_SERVER_URL").unwrap(),
        WorkerConfig::default(),
    )
    .await;
    // Consume RPC jobs from the "emailjob" queue.
    listener
        .add_rpc_consumer::<EmailJob>(
            Arc::new(State {
                something: "test".into(),
            }),
            ListenerConfig::default("emailjob").set_prefetch_count(100),
        );
    listener.start_all_listeners().await;
}
// client.rs
use std::{env, time::Duration};

use serde::{Deserialize, Serialize};
// Client, BasicCallOptions and RPCClientTask must also be imported
// from the bunbun-worker crate (import paths omitted here).

// The same DTOs as on the server side.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct EmailJob {
    send_to: String,
    contents: String,
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
pub struct EmailJobResult {
    contents: String,
}

#[derive(Deserialize, Serialize, Clone, Debug)]
pub enum EmailJobResultError {
    Errored,
}

// Implement the client side trait, so the caller knows what the return types are
impl RPCClientTask for EmailJob {
    type ErroredResult = EmailJobResultError;
    type Result = EmailJobResult;
}

#[tokio::main]
async fn main() {
    let client = Client::new(env::var("AMQP_SERVER_URL").unwrap().as_str())
        .await
        .unwrap();
    // Send the job to the "emailjob" queue and wait up to 3 seconds for the reply.
    let result = client
        .rpc_call::<EmailJob>(
            EmailJob {
                send_to: "someone".into(),
                contents: "something".into(),
            },
            BasicCallOptions::default("emailjob").timeout(Duration::from_secs(3)),
        )
        .await
        .unwrap();
    println!("{:?}", result);
}
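In the example above, the client waits up to 3 seconds while the worker takes about 2 seconds, so the reply arrives in time. The sketch below illustrates that timing relationship with standard-library channels only. It is not how bunbun-worker implements timeouts: `call_with_timeout` is a hypothetical helper, and the durations are shortened to milliseconds.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical helper: starts a job that takes `work` to finish and waits
// at most `timeout` for its reply.
fn call_with_timeout(
    work: Duration,
    timeout: Duration,
) -> Result<String, mpsc::RecvTimeoutError> {
    let (reply_to, reply_rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(work); // simulate the job
        // The caller may have given up already; ignore the send error.
        let _ = reply_to.send("done".to_string());
    });
    reply_rx.recv_timeout(timeout)
}

fn main() {
    // The worker is faster than the timeout: the reply is received.
    let in_time = call_with_timeout(Duration::from_millis(20), Duration::from_millis(500));
    println!("{:?}", in_time);

    // The worker is slower than the timeout: the caller stops waiting.
    let too_slow = call_with_timeout(Duration::from_millis(500), Duration::from_millis(20));
    println!("{:?}", too_slow);
}
```

When choosing a timeout, leave headroom above the job's expected run time for queueing and network delay.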
In this crate, message versioning is done by appending a version such as v1.0.0 to the end of the queue name, instead of including it in the headers of a message. This reduces the number of redelivered messages.
The following example will send a job to a queue named emailjob-v1.0.0.
let result = client
    .rpc_call::<EmailJob>(
        EmailJob {
            send_to: "someone".into(),
            contents: "something".into(),
        },
        BasicCallOptions::default("emailjob")
            .timeout(Duration::from_secs(3))
            .message_version("v1.0.0"),
    )
    .await
    .unwrap();
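The naming scheme described above can be sketched as a small function. `versioned_queue_name` is a hypothetical helper written for illustration and is not part of bunbun-worker's API. Leaving the name unchanged when no version is set is an assumption.

```rust
// Hypothetical helper: builds the queue name a job is routed to.
// Assumption: without a version, the plain queue name is used.
fn versioned_queue_name(queue: &str, version: Option<&str>) -> String {
    match version {
        Some(v) => format!("{}-{}", queue, v),
        None => queue.to_string(),
    }
}

fn main() {
    // Matches the queue name from the example: emailjob-v1.0.0
    println!("{}", versioned_queue_name("emailjob", Some("v1.0.0")));
    println!("{}", versioned_queue_name("emailjob", None));
}
```

Because each version gets its own queue, a worker only receives messages it can deserialize, which is why there are fewer redeliveries than when versions are filtered by header.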
Since the code is hosted on a private git instance (as of right now), any bugs should be discussed in 4o1x5's project room.
Licensed under the GNU Affero General Public License.
Currently this library does not accept any contributors, as it's hosted on a private git server.
This package was made with the help of-