use neutron::{ConsumerBuilder, Message}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Deserialize, Serialize)] #[allow(dead_code)] struct Data { name: String, } impl TryFrom> for Data { type Error = Box; fn try_from(value: Vec) -> Result { Ok(Data { name: String::from_utf8(value).unwrap(), }) } } impl From for Vec { fn from(val: Data) -> Self { val.name.as_bytes().to_vec() } } #[tokio::main] async fn main() -> Result<(), Box> { env_logger::init(); let pulsar_config = neutron::PulsarConfig { endpoint_url: "pulsar://localhost".to_string(), endpoint_port: 6650, }; let pulsar = neutron::PulsarBuilder::new() .with_config(pulsar_config) .build() .run(); let consumer = ConsumerBuilder::new() .with_topic("test") .with_subscription("test") .with_consumer_name("test") .connect(&pulsar) .await?; let mut count = 0; while count < 1000000 { let response: Vec> = consumer.next_batch(1000).await?; response.iter().for_each(|msg| { log::info!("Received message: {:?}", msg.payload); count += 1; }); consumer .ack_all(response.into_iter().map(|msg| msg.ack).collect()) .await?; } Ok(()) }