// Code mostly borrowed from the rdkafka simple consumer example.

use std::convert::TryFrom;

use clap::{App, Arg};
use log::info;

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;
use rdkafka::Message;

use kafka_offsets_parser::{ConsumerOffsetsMessageKey, OffsetCommitValue};

// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;

impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        log::trace!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        log::trace!("Post rebalance {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        log::trace!("Committing offsets: {:?}", result);
    }
}

// A type alias with the custom consumer context can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

async fn consume_and_print(brokers: &str, group_id: &str) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "false")
        .set("enable.auto.offset.store", "false")
        .set("auto.offset.reset", "latest")
        //.set("statistics.interval.ms", "30000")
        //.set("auto.offset.reset", "smallest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    eprintln!("subscribing");
    consumer
        .subscribe(&["__consumer_offsets"])
        .expect("Can't subscribe to specified topics");

    loop {
        match consumer.recv().await {
            Err(e) => eprintln!("Kafka error: {}", e),
            Ok(m) => {
                // Skip tombstones (null payloads) and messages without a key.
                if let (Some(key), Some(message)) = (m.key(), m.payload()) {
                    let message_key = ConsumerOffsetsMessageKey::try_from(key).unwrap();
                    match message_key {
                        // Only offset-commit records are printed; group metadata
                        // records fall through to the no-op arm.
                        ConsumerOffsetsMessageKey::Offset(offset_key) => {
                            let offset = OffsetCommitValue::try_from(message).unwrap();
                            println!("{:?}", offset_key);
                            println!("{:?}", offset);
                        }
                        _ => (), // no-op
                    }
                }
            }
        };
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();

    let matches = App::new("consumer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Parse the __consumer_offsets topic messages")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("group-id")
                .short("g")
                .long("group-id")
                .help("Consumer group id")
                .takes_value(true)
                .default_value("consumer-offsets-parser"),
        )
        .get_matches();

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();

    consume_and_print(brokers, group_id).await
}
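
// --- Hypothetical sketch only ----------------------------------------------
// The `kafka_offsets_parser` crate used above is not shown in this file. The
// module below is a minimal, self-contained sketch of what its key/value types
// and `TryFrom<&[u8]>` impls *might* look like, assuming the documented binary
// layout of __consumer_offsets records (big-endian integers, i16
// length-prefixed strings). Names such as `OffsetKey` and `GroupMetadata` are
// assumptions for illustration; this is not the crate's actual code and it is
// not used by the binary.
#[allow(dead_code)]
mod offsets_parser_sketch {
    use std::convert::TryFrom;

    #[derive(Debug)]
    pub struct OffsetKey {
        pub group: String,
        pub topic: String,
        pub partition: i32,
    }

    #[derive(Debug)]
    pub enum ConsumerOffsetsMessageKey {
        // Key versions 0 and 1: an offset commit for (group, topic, partition).
        Offset(OffsetKey),
        // Key version 2+: group metadata for a consumer group; not decoded here.
        GroupMetadata(String),
    }

    #[derive(Debug)]
    pub struct OffsetCommitValue {
        pub offset: i64,
        pub metadata: String,
        pub commit_timestamp: i64,
    }

    // Cursor-style helpers over a byte slice; all integers are big-endian.
    fn read_i16(buf: &[u8], pos: &mut usize) -> Result<i16, String> {
        let b = buf.get(*pos..*pos + 2).ok_or("buffer too short")?;
        *pos += 2;
        Ok(i16::from_be_bytes([b[0], b[1]]))
    }

    fn read_i32(buf: &[u8], pos: &mut usize) -> Result<i32, String> {
        let b = buf.get(*pos..*pos + 4).ok_or("buffer too short")?;
        *pos += 4;
        Ok(i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
    }

    fn read_i64(buf: &[u8], pos: &mut usize) -> Result<i64, String> {
        let b = buf.get(*pos..*pos + 8).ok_or("buffer too short")?;
        *pos += 8;
        Ok(i64::from_be_bytes([b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7]]))
    }

    fn read_string(buf: &[u8], pos: &mut usize) -> Result<String, String> {
        let len = read_i16(buf, pos)? as usize;
        let b = buf.get(*pos..*pos + len).ok_or("buffer too short")?;
        *pos += len;
        String::from_utf8(b.to_vec()).map_err(|e| e.to_string())
    }

    impl TryFrom<&[u8]> for ConsumerOffsetsMessageKey {
        type Error = String;

        fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
            let mut pos = 0;
            let version = read_i16(buf, &mut pos)?;
            let group = read_string(buf, &mut pos)?;
            if version <= 1 {
                // Versions 0/1: offset commit key.
                let topic = read_string(buf, &mut pos)?;
                let partition = read_i32(buf, &mut pos)?;
                Ok(ConsumerOffsetsMessageKey::Offset(OffsetKey { group, topic, partition }))
            } else {
                // Version 2+: group metadata key.
                Ok(ConsumerOffsetsMessageKey::GroupMetadata(group))
            }
        }
    }

    impl TryFrom<&[u8]> for OffsetCommitValue {
        type Error = String;

        fn try_from(buf: &[u8]) -> Result<Self, Self::Error> {
            let mut pos = 0;
            let version = read_i16(buf, &mut pos)?;
            let offset = read_i64(buf, &mut pos)?;
            if version >= 3 {
                // Value version 3 inserts a leader epoch before the metadata string.
                let _leader_epoch = read_i32(buf, &mut pos)?;
            }
            let metadata = read_string(buf, &mut pos)?;
            let commit_timestamp = read_i64(buf, &mut pos)?;
            // Value version 1 additionally carries an expire timestamp; ignored here.
            Ok(OffsetCommitValue { offset, metadata, commit_timestamp })
        }
    }
}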