//! //! # Common For tests //! use rdkafka::config::ClientConfig; use rdkafka::message::ToBytes; use rdkafka::producer::{FutureProducer, FutureRecord}; use std::collections::HashMap; use std::time::Duration; /// Taken from https://github.com/fede1024/rust-rdkafka/blob/master/tests/utils.rs with some slight modifications and updates /// credit to rdkafka /// Produce the specified count of messages to the topic and partition specified. A map /// of (partition, offset) -> message id will be returned. It panics if any error is encountered /// while populating the topic. pub async fn populate_topic( bootstrap_server: &str, topic_name: &str, count: i32, value_fn: &P, key_fn: &K, partition: Option, timestamp: Option, ) -> HashMap<(i32, i64), i32> where P: Fn(i32) -> J, K: Fn(i32) -> Q, J: ToBytes, Q: ToBytes, { // Produce some messages let producer = &ClientConfig::new() .set("bootstrap.servers", bootstrap_server) .set("statistics.interval.ms", "500") .set("api.version.request", "true") //.set("debug", "all") .set("message.timeout.ms", "10000") .create::>() .expect("Producer creation error"); let futures = (0..count) .map(|id| { let future = async move { producer .send( FutureRecord { topic: topic_name, payload: Some(&value_fn(id)), key: Some(&key_fn(id)), partition, timestamp, headers: None, }, Duration::from_secs(1), ) .await }; (id, future) }) .collect::>(); let mut message_map = HashMap::new(); for (id, future) in futures { match future.await { Ok((partition, offset)) => message_map.insert((partition, offset), id), Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error), }; } message_map } pub fn value_fn(id: i32) -> String { format!("Message {}", id) } pub fn key_fn(id: i32) -> String { format!("Key {}", id) } pub fn random_topic_name() -> String { rusty_ulid::generate_ulid_string() } #[cfg(test)] #[ctor::ctor] fn init() { // Enable RUST_LOG logging configuration for test let _ = env_logger::builder() .filter_level(log::LevelFilter::Info) .parse_filters("testcontainers-redpanda-rs=debug") .is_test(true) .try_init(); }