use std::{env, error::Error as StdError, sync::Arc, time::Duration}; use async_trait::async_trait; use general_mq::{ connection::{EventHandler as ConnHandler, GmqConnection, Status as ConnStatus}, queue::{ EventHandler as QueueHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus, }, AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions, }; struct TestConnHandler; struct TestQueueHandler { pub name: String, } const TEST_BROADCAST: bool = true; const TEST_RELIABLE: bool = true; #[async_trait] impl ConnHandler for TestConnHandler { async fn on_error( &self, handler_id: String, _conn: Arc, err: Box, ) { println!("handler_id: {}, ev: {}", handler_id.as_str(), err); } async fn on_status( &self, handler_id: String, _conn: Arc, status: ConnStatus, ) { let status = match status { ConnStatus::Closing => "status: closing", ConnStatus::Closed => "status: closed", ConnStatus::Connecting => "status: connecting", ConnStatus::Connected => "status: connected", ConnStatus::Disconnected => "status: disconnected", }; println!("handler_id: {}, status: {}", handler_id.as_str(), status); } } #[async_trait] impl QueueHandler for TestQueueHandler { async fn on_error(&self, queue: Arc, err: Box) { println!( "name: {}, queue: {}, error: {}", self.name.as_str(), queue.name(), err ); } async fn on_status(&self, queue: Arc, status: QueueStatus) { let status = match status { QueueStatus::Closing => "status: closing", QueueStatus::Closed => "status: closed", QueueStatus::Connecting => "status: connecting", QueueStatus::Connected => "status: connected", QueueStatus::Disconnected => "status: disconnected", }; println!( "name: {}, queue: {}, status: {}", self.name.as_str(), queue.name(), status ); } } #[async_trait] impl MessageHandler for TestQueueHandler { async fn on_message(&self, _queue: Arc, msg: Box) { match String::from_utf8(msg.payload().to_vec()) { Err(e) => { println!( "name {} received bin {:?} with parse error: {}", self.name.as_str(), msg.payload(), e ); match msg.ack().await { Err(e) => println!( "name {} ack {:?} error: {}", self.name.as_str(), msg.payload(), e ), Ok(()) => { println!("name {} ack {:?} ok", self.name.as_str(), msg.payload()) } } } Ok(payload) => { println!("name {} received {}", self.name.as_str(), payload.as_str()); match msg.ack().await { Err(e) => println!( "name {} ack {} error: {}", self.name.as_str(), payload.as_str(), e ), Ok(()) => println!("name {} ack {} ok", self.name.as_str(), payload.as_str()), } } }; } } #[tokio::main] async fn main() { let run_mqtt = env::var("RUN_MQTT").is_ok(); if run_mqtt { println!("Run MQTT"); test_mqtt().await; } else { println!("Run AMQP"); test_amqp().await; } } async fn test_amqp() { let opts = AmqpConnectionOptions::default(); let mut conn = match AmqpConnection::new(opts) { Err(e) => { println!("new AmqpConnection error: {}", e); return; } Ok(conn) => conn, }; conn.add_handler(Arc::new(TestConnHandler {})); conn.add_handler(Arc::new(TestConnHandler {})); let opts = AmqpQueueOptions { name: "test".to_string(), is_recv: false, reliable: TEST_RELIABLE, broadcast: TEST_BROADCAST, reconnect_millis: 1000, prefetch: 10, ..Default::default() }; let mut send_queue = match AmqpQueue::new(opts, &conn) { Err(e) => { println!("new AmqpQueue error: {}", e); return; } Ok(queue) => queue, }; send_queue.set_handler(Arc::new(TestQueueHandler { name: "send".to_string(), })); if let Err(e) = send_queue.connect() { println!("connect send queue error: {}", e); return; } let opts = AmqpQueueOptions { name: "test".to_string(), is_recv: true, reliable: TEST_RELIABLE, broadcast: TEST_BROADCAST, reconnect_millis: 1000, prefetch: 10, ..Default::default() }; let mut recv_queue1 = match AmqpQueue::new(opts.clone(), &conn) { Err(e) => { println!("new AmqpQueue error: {}", e); return; } Ok(queue) => queue, }; let handler = Arc::new(TestQueueHandler { name: "recv1".to_string(), }); recv_queue1.set_handler(handler.clone()); recv_queue1.set_msg_handler(handler); if let Err(e) = recv_queue1.connect() { println!("connect recv1 queue error: {}", e); return; } let mut recv_queue2 = match AmqpQueue::new(opts, &conn) { Err(e) => { println!("new AmqpQueue error: {}", e); return; } Ok(queue) => queue, }; let handler = Arc::new(TestQueueHandler { name: "recv2".to_string(), }); recv_queue2.set_handler(handler.clone()); recv_queue2.set_msg_handler(handler); if let Err(e) = recv_queue2.connect() { println!("connect recv2 queue error: {}", e); return; } loop { if let Err(e) = conn.connect() { println!("connect error: {}", e); return; } let mut count = 10; while count > 0 { tokio::time::sleep(Duration::from_secs(2)).await; let str = format!("count {}", count); match send_queue.send_msg(str.as_bytes().to_vec()).await { Err(e) => println!("send {} error: {}", str, e), Ok(()) => println!("send {} ok", str), } count = count - 1; } if let Err(e) = conn.close().await { println!("close error: {}", e); return; } tokio::time::sleep(Duration::from_secs(5)).await; } } async fn test_mqtt() { let opts = MqttConnectionOptions::default(); let mut conn = match MqttConnection::new(opts) { Err(e) => { println!("new MqttConnection error: {}", e); return; } Ok(conn) => conn, }; conn.add_handler(Arc::new(TestConnHandler {})); conn.add_handler(Arc::new(TestConnHandler {})); let opts = MqttConnectionOptions::default(); let mut conn2 = match MqttConnection::new(opts) { Err(e) => { println!("new MqttConnection error: {}", e); return; } Ok(conn) => conn, }; conn2.add_handler(Arc::new(TestConnHandler {})); let opts = MqttQueueOptions { name: "test".to_string(), is_recv: false, reliable: TEST_RELIABLE, broadcast: TEST_BROADCAST, reconnect_millis: 1000, shared_prefix: Some("$share/general-mq/".to_string()), ..Default::default() }; let mut send_queue = match MqttQueue::new(opts, &conn) { Err(e) => { println!("new MqttQueue error: {}", e); return; } Ok(queue) => queue, }; send_queue.set_handler(Arc::new(TestQueueHandler { name: "send".to_string(), })); if let Err(e) = send_queue.connect() { println!("connect send queue error: {}", e); return; } let opts = MqttQueueOptions { name: "test".to_string(), is_recv: true, reliable: TEST_RELIABLE, broadcast: TEST_BROADCAST, reconnect_millis: 1000, shared_prefix: Some("$share/general-mq/".to_string()), ..Default::default() }; let mut recv_queue1 = match MqttQueue::new(opts.clone(), &conn) { Err(e) => { println!("new MqttQueue error: {}", e); return; } Ok(queue) => queue, }; let handler = Arc::new(TestQueueHandler { name: "recv1".to_string(), }); recv_queue1.set_handler(handler.clone()); recv_queue1.set_msg_handler(handler); if let Err(e) = recv_queue1.connect() { println!("connect recv1 queue error: {}", e); return; } let mut recv_queue2 = match MqttQueue::new(opts, &conn2) { Err(e) => { println!("new MqttQueue error: {}", e); return; } Ok(queue) => queue, }; let handler = Arc::new(TestQueueHandler { name: "recv2".to_string(), }); recv_queue2.set_handler(handler.clone()); recv_queue2.set_msg_handler(handler); if let Err(e) = recv_queue2.connect() { println!("connect recv2 queue error: {}", e); return; } loop { if let Err(e) = conn.connect() { println!("connect error: {}", e); return; } if let Err(e) = conn2.connect() { println!("connect 2 error: {}", e); return; } let mut count = 10; while count > 0 { tokio::time::sleep(Duration::from_secs(2)).await; let str = format!("count {}", count); match send_queue.send_msg(str.as_bytes().to_vec()).await { Err(e) => println!("send {} error: {}", str, e), Ok(()) => println!("send {} ok", str), } count = count - 1; } if let Err(e) = conn.close().await { println!("close error: {}", e); return; } if let Err(e) = conn2.close().await { println!("close 2 error: {}", e); return; } tokio::time::sleep(Duration::from_secs(5)).await; } }