//! Still to test:
//!
//! * compression
//! * secure connections

use super::*;

use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;

use kafka::client::fetch::Response;
use kafka::client::{
    CommitOffset, FetchOffset, FetchPartition, PartitionOffset, ProduceMessage, RequiredAcks,
};

fn flatten_fetched_messages(resps: &[Response]) -> Vec<(&str, i32, &[u8])> {
    let mut messages = Vec::new();

    for resp in resps {
        for topic in resp.topics() {
            for partition in topic.partitions() {
                for msg in partition.data().as_ref().unwrap().messages() {
                    messages.push((topic.topic(), partition.partition(), msg.value));
                }
            }
        }
    }

    messages
}

#[test]
fn test_kafka_client_load_metadata() {
    let hosts = vec![LOCAL_KAFKA_BOOTSTRAP_HOST.to_owned()];
    let mut client = new_kafka_client();

    let client_id = "test-id".to_string();
    client.set_client_id(client_id.clone());

    client.load_metadata_all().unwrap();
    let topics = client.topics();

    // sanity checks
    assert_eq!(&hosts[..], client.hosts());
    assert_eq!(&client_id, client.client_id());

    // names
    let topic_names: HashSet<&str> = topics
        .names()
        // don't count the consumer offsets internal topic
        .filter(|name| *name != KAFKA_CONSUMER_OFFSETS_TOPIC_NAME)
        .collect();

    let mut correct_topic_names: HashSet<&str> = HashSet::new();
    correct_topic_names.insert(TEST_TOPIC_NAME);
    correct_topic_names.insert(TEST_TOPIC_NAME_2);
    assert_eq!(correct_topic_names, topic_names);

    // partitions
    let mut topic_partitions = topics.partitions(TEST_TOPIC_NAME).unwrap().available_ids();
    let mut correct_topic_partitions = TEST_TOPIC_PARTITIONS.to_vec();
    assert_eq!(correct_topic_partitions, topic_partitions);

    topic_partitions = topics
        .partitions(TEST_TOPIC_NAME_2)
        .unwrap()
        .available_ids();
    correct_topic_partitions = TEST_TOPIC_PARTITIONS.to_vec();
    assert_eq!(correct_topic_partitions, topic_partitions);
}
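// The module docs above still list compression as untested. The sketch below
// shows one possible gzip round-trip, assuming kafka-rust's `Compression`
// enum and `KafkaClient::set_compression`; it is marked #[ignore] because it
// is only a sketch, not a verified part of this suite.
#[test]
#[ignore]
fn test_produce_fetch_messages_gzip_sketch() {
    use kafka::client::Compression;

    let mut client = new_ready_kafka_client();
    client.set_compression(Compression::GZIP);

    // produce a single message through the compressed code path ...
    let req = vec![ProduceMessage::new(
        TEST_TOPIC_NAME,
        0,
        None,
        Some("gzip-payload".as_bytes()),
    )];
    let resp = client
        .produce_messages(RequiredAcks::All, Duration::from_millis(1000), req)
        .unwrap();

    // ... and fetch it back from the confirmed offset to verify the payload
    // survived the compressed round-trip
    let confirm = &resp[0].partition_confirms[0];
    let fetch_resps = client
        .fetch_messages(vec![FetchPartition::new(
            TEST_TOPIC_NAME,
            confirm.partition,
            confirm.offset.unwrap(),
        )])
        .unwrap();
    let messages = flatten_fetched_messages(&fetch_resps);
    assert!(messages.contains(&(TEST_TOPIC_NAME, confirm.partition, "gzip-payload".as_bytes())));
}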
/// Tests:
///
/// * KafkaClient::produce_messages
/// * KafkaClient::fetch_messages
/// * KafkaClient::fetch_offsets
#[test]
fn test_produce_fetch_messages() {
    let _ = tracing_subscriber::fmt::try_init();

    let mut client = new_ready_kafka_client();
    let topics = [TEST_TOPIC_NAME, TEST_TOPIC_NAME_2];
    let init_latest_offsets = client.fetch_offsets(&topics, FetchOffset::Latest).unwrap();

    // first send the messages and verify correct confirmation responses
    // from kafka
    let req = vec![
        ProduceMessage::new(TEST_TOPIC_NAME, 0, None, Some("a".as_bytes())),
        ProduceMessage::new(TEST_TOPIC_NAME, 1, None, Some("b".as_bytes())),
        ProduceMessage::new(TEST_TOPIC_NAME_2, 0, None, Some("c".as_bytes())),
        ProduceMessage::new(TEST_TOPIC_NAME_2, 1, None, Some("d".as_bytes())),
    ];

    let resp = client
        .produce_messages(RequiredAcks::All, Duration::from_millis(1000), req)
        .unwrap();
    assert_eq!(2, resp.len());

    // need to keep track of the offsets so we can fetch them next
    let mut fetches = Vec::new();

    for confirm in &resp {
        assert!(confirm.topic == TEST_TOPIC_NAME || confirm.topic == TEST_TOPIC_NAME_2);
        assert_eq!(2, confirm.partition_confirms.len());

        assert!(confirm
            .partition_confirms
            .iter()
            .any(|part_confirm| part_confirm.partition == 0 && part_confirm.offset.is_ok()));
        assert!(confirm
            .partition_confirms
            .iter()
            .any(|part_confirm| part_confirm.partition == 1 && part_confirm.offset.is_ok()));

        for part_confirm in confirm.partition_confirms.iter() {
            fetches.push(FetchPartition::new(
                confirm.topic.as_ref(),
                part_confirm.partition,
                part_confirm.offset.unwrap(),
            ));
        }
    }

    // now fetch the messages back and verify that they are the correct
    // messages
    let fetch_resps = client.fetch_messages(fetches).unwrap();
    let messages = flatten_fetched_messages(&fetch_resps);

    let correct_messages = vec![
        (TEST_TOPIC_NAME, 0, "a".as_bytes()),
        (TEST_TOPIC_NAME, 1, "b".as_bytes()),
        (TEST_TOPIC_NAME_2, 0, "c".as_bytes()),
        (TEST_TOPIC_NAME_2, 1, "d".as_bytes()),
    ];
    assert!(correct_messages
        .into_iter()
        .all(|c_msg| messages.contains(&c_msg)));

    // producing one message per partition should have advanced every
    // latest offset by exactly one
    let end_latest_offsets = client.fetch_offsets(&topics, FetchOffset::Latest).unwrap();

    for (topic, begin_partition_offsets) in init_latest_offsets {
        let begin_partition_offsets: HashMap<i32, i64> = begin_partition_offsets
            .iter()
            .map(|po| (po.partition, po.offset))
            .collect();

        let end_partition_offsets: HashMap<i32, i64> = end_latest_offsets
            .get(&topic)
            .unwrap()
            .iter()
            .map(|po| (po.partition, po.offset))
            .collect();

        for (partition, begin_offset) in begin_partition_offsets {
            let end_offset = end_partition_offsets.get(&partition).unwrap();
            assert_eq!(begin_offset + 1, *end_offset);
        }
    }
}

#[test]
fn test_commit_offset() {
    let _ = tracing_subscriber::fmt::try_init();
    let mut client = new_ready_kafka_client();

    for &(partition, offset) in &[
        (TEST_TOPIC_PARTITIONS[0], 100),
        (TEST_TOPIC_PARTITIONS[1], 200),
        (TEST_TOPIC_PARTITIONS[0], 300),
        (TEST_TOPIC_PARTITIONS[1], 400),
        (TEST_TOPIC_PARTITIONS[0], 500),
        (TEST_TOPIC_PARTITIONS[1], 600),
    ] {
        client
            .commit_offset(TEST_GROUP_NAME, TEST_TOPIC_NAME, partition, offset)
            .unwrap();

        let partition_offsets: HashSet<PartitionOffset> = client
            .fetch_group_topic_offset(TEST_GROUP_NAME, TEST_TOPIC_NAME)
            .unwrap()
            .into_iter()
            .collect();

        let correct_partition_offset = PartitionOffset { partition, offset };
        assert!(partition_offsets.contains(&correct_partition_offset));
    }
}

#[test]
fn test_commit_offsets() {
    let mut client = new_ready_kafka_client();

    let commits = [
        [
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[0],
                offset: 100,
            },
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[1],
                offset: 200,
            },
        ],
        [
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[0],
                offset: 300,
            },
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[1],
                offset: 400,
            },
        ],
        [
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[0],
                offset: 500,
            },
            CommitOffset {
                topic: TEST_TOPIC_NAME,
                partition: TEST_TOPIC_PARTITIONS[1],
                offset: 600,
            },
        ],
    ];

    for commit_pair in &commits {
        client.commit_offsets(TEST_GROUP_NAME, commit_pair).unwrap();

        let partition_offsets: HashSet<PartitionOffset> = client
            .fetch_group_topic_offset(TEST_GROUP_NAME, TEST_TOPIC_NAME)
            .unwrap()
            .into_iter()
            .collect();
        println!("partition_offsets: {:?}", partition_offsets);

        let correct_partition_offsets: HashSet<PartitionOffset> = vec![
            PartitionOffset {
                partition: commit_pair[0].partition,
                offset: commit_pair[0].offset,
            },
            PartitionOffset {
                partition: commit_pair[1].partition,
                offset: commit_pair[1].offset,
            },
        ]
        .into_iter()
        .collect();

        assert_eq!(correct_partition_offsets, partition_offsets);
    }
}
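// Sketch toward the "secure connections" item in the module docs above. It
// assumes the crate's optional `security` feature, which exposes
// `SecurityConfig` and `KafkaClient::new_secure` on top of openssl's
// `SslConnector`; the TLS bootstrap address below is hypothetical and would
// need to match the test broker's TLS listener. Marked #[ignore] as it is
// only a sketch.
#[cfg(feature = "security")]
#[test]
#[ignore]
fn test_kafka_client_load_metadata_secure_sketch() {
    use kafka::client::{KafkaClient, SecurityConfig};
    use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};

    let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
    // assuming the test broker uses a self-signed certificate, skip
    // certificate verification
    builder.set_verify(SslVerifyMode::NONE);
    let connector = builder.build();

    // hypothetical TLS listener; the plaintext tests use
    // LOCAL_KAFKA_BOOTSTRAP_HOST instead
    let hosts = vec!["localhost:9093".to_owned()];
    let mut client = KafkaClient::new_secure(hosts, SecurityConfig::new(connector));

    client.load_metadata_all().unwrap();
    assert!(client.topics().names().any(|name| name == TEST_TOPIC_NAME));
}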