//!
//! # Common helpers for tests
//!
use rdkafka::config::ClientConfig;
use rdkafka::message::ToBytes;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::collections::HashMap;
use std::time::Duration;
/// Taken from https://github.com/fede1024/rust-rdkafka/blob/master/tests/utils.rs with some slight modifications and updates
/// credit to rdkafka
/// Produce the specified count of messages to the topic and partition specified. A map
/// of (partition, offset) -> message id will be returned. It panics if any error is encountered
/// while populating the topic.
pub async fn populate_topic
(
bootstrap_server: &str,
topic_name: &str,
count: i32,
value_fn: &P,
key_fn: &K,
partition: Option,
timestamp: Option,
) -> HashMap<(i32, i64), i32>
where
P: Fn(i32) -> J,
K: Fn(i32) -> Q,
J: ToBytes,
Q: ToBytes,
{
// Produce some messages
let producer = &ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("statistics.interval.ms", "500")
.set("api.version.request", "true")
//.set("debug", "all")
.set("message.timeout.ms", "10000")
.create::>()
.expect("Producer creation error");
let futures = (0..count)
.map(|id| {
let future = async move {
producer
.send(
FutureRecord {
topic: topic_name,
payload: Some(&value_fn(id)),
key: Some(&key_fn(id)),
partition,
timestamp,
headers: None,
},
Duration::from_secs(1),
)
.await
};
(id, future)
})
.collect::>();
let mut message_map = HashMap::new();
for (id, future) in futures {
match future.await {
Ok((partition, offset)) => message_map.insert((partition, offset), id),
Err((kafka_error, _message)) => panic!("Delivery failed: {}", kafka_error),
};
}
message_map
}
/// Returns the text `Message <id>` for the given message id.
///
/// Its signature matches the value generator expected by `populate_topic`.
pub fn value_fn(id: i32) -> String {
    let mut message = String::from("Message ");
    message.push_str(&id.to_string());
    message
}
/// Returns the text `Key <id>` for the given message id.
///
/// Its signature matches the key generator expected by `populate_topic`.
pub fn key_fn(id: i32) -> String {
    let mut key = String::from("Key ");
    key.push_str(&id.to_string());
    key
}
pub fn random_topic_name() -> String {
rusty_ulid::generate_ulid_string()
}
/// Installs the test logger once, before any test runs (via `ctor`).
///
/// The builder starts from `env_logger::builder()`, sets the default level to `Info`
/// and enables `debug` output for this crate. The result of `try_init` is ignored on
/// purpose, so an already-installed logger does not abort the test binary.
#[cfg(test)]
#[ctor::ctor]
fn init() {
    // Default log targets come from `module_path!()`, which always spells the crate
    // name with underscores, so the hyphenated directive alone cannot match them.
    // Both spellings are listed: the original hyphenated one is kept for any custom
    // targets, and the underscore one matches the crate's own modules.
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::Info)
        .parse_filters("testcontainers-redpanda-rs=debug,testcontainers_redpanda_rs=debug")
        .is_test(true)
        .try_init();
}