Crates.io | async-mq |
lib.rs | async-mq |
version | 1.1.0 |
source | src |
created_at | 2019-12-18 21:41:27.409044 |
updated_at | 2020-03-25 07:47:27.845886 |
description | Zero-cost async-await abstraction of lapin AMQP client crate |
homepage | |
repository | https://github.com/keithnoguchi/async-mq |
max_upload_size | |
id | 190399 |
size | 139,744 |
Zero-cost async-await abstraction of lapin AMQP client crate
- Client and Connection structs
- Consumer and ConsumerBuilder structs
- Producer and ProducerBuilder structs
- Message struct, MessagePeek and MessageProcess async traits

Currently, mqctl.rs demonstrates the RabbitMQ RPC pattern through the Rust 1.39 async-await feature. It uses FlatBuffers for the message encoding/decoding.
Here is an example using tokio's threaded scheduler, as in mqctl.rs:
/// Runs the producer/consumer example on tokio's threaded scheduler.
///
/// One connection is opened and shared by `cfg.producers` producer tasks, a
/// second connection is opened and shared by `cfg.consumers` consumer tasks,
/// and then the calling task idles forever.
///
/// # Errors
///
/// Returns an error if the runtime cannot be built or if either connection
/// attempt fails. Failures inside the spawned tasks are only printed to stderr.
fn tokio_threaded(cfg: crate::cfg::Config) -> Result<(), Box<dyn std::error::Error>> {
    let mut runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_time()
        .build()?;
    let client = Client::new();
    runtime.block_on(async move {
        // Producers: every producer task shares this single connection.
        let producer_conn = client.connect(&cfg.uri).await?;
        let mut producer_builder = producer_conn.producer_builder();
        producer_builder.exchange(&cfg.exchange).queue(&cfg.queue);
        for _ in 0..cfg.producers {
            let builder = producer_builder.clone();
            tokio::spawn(async move {
                let producer = match builder.build().await {
                    Ok(producer) => producer,
                    Err(e) => {
                        eprintln!("{}", e);
                        return;
                    }
                };
                let mut generator = ASCIIGenerator(producer);
                if let Err(err) = generator.run().await {
                    eprintln!("{}", err);
                }
            });
        }

        // Consumers: every consumer task shares this single connection.
        let consumer_conn = client.connect(&cfg.uri).await?;
        let mut consumer_builder = consumer_conn.consumer_builder();
        consumer_builder.exchange(&cfg.exchange).queue(&cfg.queue);
        for _ in 0..cfg.consumers {
            let builder = consumer_builder.clone();
            tokio::spawn(async move {
                let consumer = match builder.build().await {
                    Ok(consumer) => consumer,
                    Err(err) => {
                        eprintln!("{}", err);
                        return;
                    }
                };
                let mut echo = EchoConsumer(consumer);
                if let Err(err) = echo.run().await {
                    eprintln!("{}", err);
                }
            });
        }

        // Keep the runtime alive so the spawned tasks continue to run.
        let tick = std::time::Duration::from_millis(1000);
        loop {
            tokio::time::delay_for(tick).await;
        }
    })
}
Here is the sample ASCIIGenerator producer:
struct ASCIIGenerator(Producer);
impl ASCIIGenerator {
async fn run(&mut self) -> Result<(), Error> {
let mut builder = FlatBufferBuilder::new();
loop {
// Generate ASCII character FlatBuffer messages
// and print the received message to stderr.
for data in { b'!'..=b'~' } {
let req = Self::make_buf(&mut builder, vec![data]);
let resp = self.0.rpc(req).await?;
Self::print_buf(resp);
}
}
}
fn make_buf(builder: &mut FlatBufferBuilder, data: Vec<u8>) -> Vec<u8> {
let data = builder.create_string(&String::from_utf8(data).unwrap());
let mut mb = crate::msg::MessageBuilder::new(builder);
mb.add_msg(data);
let msg = mb.finish();
builder.finish(msg, None);
let req = builder.finished_data().to_vec();
builder.reset();
req
}
fn print_buf(resp: Vec<u8>) {
if resp.is_empty() {
return;
}
let msg = crate::msg::get_root_as_message(&resp);
if let Some(data) = msg.msg() {
eprint!("{}", data);
}
}
}
And the sample EchoConsumer
consumer:
/// Sample consumer that wraps a [`Consumer`] and replies to every request
/// with the request's own payload.
struct EchoConsumer(Consumer);

impl EchoConsumer {
    /// Receives messages until the consumer yields `None`, echoing each one
    /// back to its sender.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced either by receiving a
    /// message or by sending the response.
    async fn run(&mut self) -> Result<(), Error> {
        while let Some(delivery) = self.0.next().await {
            let request = delivery?;
            // Echo back the message.
            self.0.response(&request, request.data()).await?;
        }
        Ok(())
    }
}
Here is the output of make run, which is an alias of cargo run --example as defined in the Makefile.
It dumps printable ASCII characters to stderr.
$ make run
Compiling async-mq v0.3.0 (/home/kei/git/async-mq)
Finished dev [unoptimized + debuginfo] target(s) in 1.63s
Running `target/debug/examples/async-mq`
!!!!!!!!!!!"!!"!!!!!!!!!"!!""!!!!!!!"!""""#"""#""""#""#"""""#""#"#"##""#"$##"$#######$######$$###$##$$$$$%$$%#$#$$$$$$%$$$
$$$%%$$$%$%%%$%%&%&%%$%%%%%%%&%%%%&%&%&%&&%&'&%&%%&&&&&&&&''&'&&&&&'&&'''(&&&'''('''''('&'''''(''(&'(''('()'()('(()((()(((
()'()()(()((()*'((*()*())))*)**))))))))(+)*))*))(+*)*+*)*+*+*****+*,****+)**+*+**+,)*+,*,+++,+,++++-,+++*++++,+*-,,++,-,,-
,+,,-,,-,,-,,+,-,,,-,..+--.,---.------,.,,-..---,.--/.-//..-......-.//.--./...-//...0.///0////././/00////100.//0/.00//0.01
/010/00020110/0000100/101121100/100121112111120131112012223122221122231223022323343131322332223423534244332442344313234333
3343465444535445543345434444555425475556665455466555666447554568365766556766755676766579787876767458875677666667768788789:
978788987577898677887688798899:99;989:8:869988899:99998:98:77;:::::;7:9;9<99;9;::9:;;;9;:<::;8:8:9=:;;<;8::<<::;<:;:=<9:<9
9<<;<=;;><<;<;;;;=;=:<;=;>=;<;==:<<?=<=<><>><<<=<=:=;=><><<@=<==>?>;?===>>;?<?===>===?>?>>A=>?<?=?=?@>>><@>>@@?B=?>>?@>>>@
?@>?>>@>@??=@AA?CAA??@?@A?AA?@?@@A??@??A>?B@>AB@D@B@@BA@BABBA@B@C@AAEAA@CAB@C?@A@A?BAACBCCAACBBADCBDABBBAEB@BAFBDCDBCDDBBC
BDCADB@CCEBCCBCCFGEEAACDDCBCFEECCBEDDCDDDCDECCDFHGDBFGDDFEFBEFEDIHDDEDDCEGEGDEEECEFCDHEEGFCFIHFFDEEDEEFEFFHJEEFFDFGFGIFHGD
GGJIGEFFEHGFIHEHJHFGGGFKGIHFGFGJGKGEHFGGFGIJIKHIJIHHHFIGHHGKGHIHGFHHLGLHGHJKJIJJILGIHJHIKGLJMJHIKHIIKILHIHIKIIMKJMHJKNJHKJ
LLJKMIKJIIJJKILNNIMLLJJJIILLKJMIKKKJMMOKKLMKMJNLJKMNJOJOLJMLKNKJLLNKLNKLONPNOKPKLMLKPNLMMMNMKLOKLMOMLLOOMPLMPLLNMONNNMQLMP
MNPLMNQOOQPNMPNOMNMMNPQQOOOMMPRNPQROONNNQONOQPRONNQONPPRNPRQNSQPORQPOPOSORSQQSQPORPQOOSOPORORRTQPSRSTPSQQURTPQPPRTRPPTQRPQ
SRPSTQSSPUTSRQVRTSSUTQQQRQTQTTUUUTRSQQRSSVURQWRUSSTRTVUVXUVTRRRURSRTSTVVRRVWUUSSTTUSVWTSSVWSUTVSWVTXUTVVUSWSWWUTUSYWUXUTWT
UWVXWTUTVWXYTVXUXXUWUXTVVYVTVZXUVWXXUYWVUXYZYZUWVXYYYW[UWWWVVUYYVXWYVZVZWZX[YV[XXWX\VYZWXWZ[YVWXXWZZWZZ[ZW[Y\YXY\X[XXZ\X]W
XZY[[WZ\Y]X[\[Y[[YXZ]YYY\Z][Y^X[YZY]Z\X]\Z[ZZ^\_[\YY^Z\Z[\]^\Z[YZZ\[^]Y\][^[]Z[Z_]`\_\[[Z]]_\][[Z\^_]^][\^\[[^\`_a^^[]^\\]
^^_]\[\^]\`\`]__]`\_]ba`_\_^`_]_`c_^a\ba]]a^`^`a]_]^]^]`^]`a``^_dc^__^bab`_^a`]ab^^aaa^___^bab_`ed__``cbab_`^cb_`b`e_ab_cb
bfc`a`_c``aadcb`cd_c`a`cdcagacdeafaabecdba`c``bddabdabedhbabdedadedbabcfbcbacgeeecbffbceiebdchdbeegeccfgfcffcccfbdgdfbfdjc
cdefefcichhdeddedegfddggfdgckddghefggjggdieeeifledeehehgehhghehehfhgfiffkfhjfjhiifimiggiikffgfffiigjfhkiiegjhglgjgjnhjgggj
jgkjmkjijlkhhkhlghghihjhofkihhhkhkkkmilniglijlpjlliikhilhikiimjiillmlhmommjmijjjjqknjnljmmjkjjimnjkmlinnnnpmkknnkrkjklomkn
ookojkoknkoklljoqnllllskloplpokollpkpplmpolmnplrmommmplmlmmqplqqmqmpnstomqpnqmmqnnpnnqmnmnqnmrnnonrrotonrnrurqqoronpoqonro
onrsoooppsoussspptpsrppvooppsoprporssqqpqtpvtqsttqttptpwqptqqpqqsrurqqqsruqwuuurqrvurrttuuvsurrrsvqqxrsvtxrrqrvswrsvssuvuv
sswwwrtsyvsxswvrrttwtytsussttvwwtxtxuuswuzwxsuuvxywutxtx
^C
You can check the queue status with rabbitmqctl list_queues
as below:
$ sudo rabbitmqctl list_queues --vhost your_vhost
Timeout: 60.0 seconds ...
Listing queues for vhost mx ...
name messages
amq.gen-3CNgzxmjJGoTIjAcy2zhHQ 0
amq.gen-6kHFOKiqnJIltqeH-1I4WQ 0
amq.gen-kWjXOMz0MDX9Rwo6F8sPCA 0
amq.gen-tFNZuCMpdn6so9WnNLAS4w 1
amq.gen-ScKfpco30LHj1feWIIsFXg 1
amq.gen-7uGgXCiiExxrAXZEcrbypg 0
amq.gen-vkFA83xnTHHhR6c_zUG34A 0
amq.gen-48HpeKkKhmQLAa4Q1beQvg 0
amq.gen-FAYzlhiy9liKuMEUVUm2Uw 0
amq.gen-de6sYdZX8cT8yYkb_Y-mPw 0
hello 23
amq.gen-gu_TDgPgWcestqlWNtFSoA 0
amq.gen-advkCsBJKod22vexRSBV6A 0
amq.gen-T6jLPqqOKsL9jk04CFJwtA 0
amq.gen--wRWW5hI-rpdds9goMAYdg 1
amq.gen-f9EYMuzwQoCAhNyIg6SiUQ 0
amq.gen-DxmcpUxHYGOxCD9Q7QM1xg 1
amq.gen-EeeJAmHFOT3GPIMhfXDi8Q 0
amq.gen-zhqOTQag0rM7MDU17pCdXA 0
amq.gen-vyX6_lAv2Pnmcm1a_tBUKA 0
amq.gen-enXP4BCXZhB0tg4C1an4sw 0
amq.gen-nIpKSKokUzU_pCoGTiSBCQ 0
amq.gen-k1p4udDFoA0xhqSXIpPo-Q 0
amq.gen-XrNQZ0cqHgSgUZK_CP0g6w 0
amq.gen-9WZJ4Jw02Dbhhl7sJIQKAQ 0
amq.gen-1TMw_E8g09Xt9UgCoMz-ig 0
amq.gen-rVUkIh-85ims0IiabvF7GA 1
amq.gen-9NeGc4C9qmfX-PLjHkXVDA 0
amq.gen-swJrMKZmNnLhtI0Djz--ag 0
amq.gen-5rEAFqYpG4cp8lBEDG6_gQ 1
amq.gen-ZDNy2Ggt4Dqbvj6cnS-c8A 0
amq.gen-D8id7SF143eN-k7tmHratw 1
amq.gen-AiKQaNiz73V9du8EtgKfMg 0
Happy Hacking!