Crates.io | rsmq_async |
lib.rs | rsmq_async |
version | 12.0.0 |
source | src |
created_at | 2020-02-01 14:26:01.829709 |
updated_at | 2024-07-28 20:45:35.7151 |
description | Async RSMQ port to rust. RSMQ is a simple redis queue system that works in any redis v2.4+. It contains the same methods as the original one in https://github.com/smrchy/rsmq |
homepage | https://crates.io/crates/rsmq_async |
repository | https://github.com/Couragium/rsmq-async-rs |
max_upload_size | |
id | 203925 |
size | 86,298 |
RSMQ port to async rust. RSMQ is a simple redis queue system that works in any redis v2.6+. It contains the same methods as the original one in https://github.com/smrchy/rsmq
This crate uses async in the implementation. If you want to use it in your sync code you can use tokio/async_std "block_on" method. Async was used in order to simplify the code and allow 1-to-1 port oft he JS code.
use rsmq_async::{Rsmq, RsmqError, RsmqConnection};
let mut rsmq = Rsmq::new(Default::default()).await?;
let message = rsmq.receive_message::<String>("myqueue", None).await?;
if let Some(message) = message {
rsmq.delete_message("myqueue", &message.id).await?;
}
Main object documentation are in: Rsmq and PooledRsmq and they both implement the trait RsmqConnection where you can see all the RSMQ methods. Make sure you always import the trait RsmqConnection.
Check https://crates.io/crates/rsmq_async
use rsmq_async::{Rsmq, RsmqConnection};
async fn it_works() {
let mut rsmq = Rsmq::new(Default::default())
.await
.expect("connection failed");
rsmq.create_queue("myqueue", None, None, None)
.await
.expect("failed to create queue");
rsmq.send_message("myqueue", "testmessage", None)
.await
.expect("failed to send message");
let message = rsmq
.receive_message::<String>("myqueue", None)
.await
.expect("cannot receive message");
if let Some(message) = message {
rsmq.delete_message("myqueue", &message.id).await;
}
}
When initializing RSMQ you can enable the realtime PUBLISH for
new messages. On every new message that gets sent to RSQM via sendMessage
a
Redis PUBLISH will be issued to {rsmq.ns}:rt:{qname}
. So, you can subscribe
to it using redis-rs library directly.
Besides the PUBLISH when a new message is sent to RSMQ nothing else will happen.
Your app could use the Redis SUBSCRIBE command to be notified of new messages
and issue a receiveMessage
then. However make sure not to listen with multiple
workers for new messages with SUBSCRIBE to prevent multiple simultaneous
receiveMessage
calls.
If you enable the sync
feature, you can import a RsmqSync
object with sync
versions of the methods.
By default this library keeps compatibility with the JS counterpart. If you require
sub-second precision or are sending many messages very close together and require to
keep track of them with more precision than one second, you can enable the feature
break-js-comp
like this on your Cargo.toml
rsmq_async = { version = "11", features = [ "break-js-comp" ] }
If you want to implement "at least one delivery" guarantee, you need to receive the messages using "receive_message" and then, once the message is successfully processed, delete it with "delete_message".
If you want to use a connection pool, just use PooledRsmq instad of Rsmq. It implements the RsmqConnection trait as the normal Rsmq.
If you want to accept any of both implementation, just accept the trait RsmqConnection
There are 3 functions that take generic types:
pop_message
and receive_message
: Where the type for the received message is
RsmqMessage<E>
where E: TryFrom<RedisBytes, Error = Vec<u8>>
. So, If you have custom type, you can implement the trait
TryFrom<RedisBytes>
for YourCustomType
and use it like: rsmq.receive_message::<YourCustomType>("myqueue", None)
.
Implementations are provided for String
and Vec<u8>
.send_message
where the message to send needs to implement Into<RedisBytes> + Send
. So you will
need to implement the trait for your type. You can check the implementations for the type RedisBytes and see how
we did it. Implementations are provided for String
, &str
and Vec<u8>
.All this is because strings in Rust are very convenient to use for json messages, so always returning a Vecrsmq.receive_message::<Vec<u8>>("myqueue", None)
and transform it later to your type. (Or just implement
the TryFrom
impl TryFrom<RedisBytes> for String {
// We sacrifice the ability of recovering the original error for the ability of having the
// original data. If you know how to conserver both, let me know!
type Error = Vec<u8>; // Always set Error as Vec<u8>;
fn try_from(bytes: RedisBytes) -> Result<Self, Self::Error> {
String::from_utf8(bytes.0).map_err(|e| e.into_bytes())
}
}