use futures_lite::StreamExt; use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties}; use tracing::info; fn main() { if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "info"); } tracing_subscriber::fmt::init(); let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); async_global_executor::block_on(async { let conn = Connection::connect(&addr, ConnectionProperties::default()) .await .expect("connection error"); info!("CONNECTED"); //receive channel let channel = conn.create_channel().await.expect("create_channel"); info!(state=?conn.status().state()); let queue = channel .queue_declare( "hello", QueueDeclareOptions::default(), FieldTable::default(), ) .await .expect("queue_declare"); info!(state=?conn.status().state()); info!(?queue, "Declared queue"); info!("will consume"); let mut consumer = channel .basic_consume( "hello", "my_consumer", BasicConsumeOptions::default(), FieldTable::default(), ) .await .expect("basic_consume"); info!(state=?conn.status().state()); while let Some(delivery) = consumer.next().await { info!(message=?delivery, "received message"); if let Ok(delivery) = delivery { delivery .ack(BasicAckOptions::default()) .await .expect("basic_ack"); } } }) }