Crates.io | picokafka |
lib.rs | picokafka |
version | 0.1.12 |
source | src |
created_at | 2022-09-16 12:16:33.161666 |
updated_at | 2024-06-21 12:05:36.074417 |
description | Kafka library for tarantool-module based on librdkafka. |
homepage | |
repository | https://git.picodata.io/picodata/picodata/kafka-driver |
max_upload_size | |
id | 667324 |
size | 79,580 |
Kafka driver for distributed systems built with tarantool-module.
Create a new consumer:
use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group_1")
    .append_topic("topic_1")
    .start();
You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opt method:
use std::time::Duration;

let consumer = consumer::Builder::new("kafka:29092")
    .with_opt("enable.auto.offset.store", "false")
    .with_session_timeout(Duration::from_secs(10))
    .start();
To handle consumer output, use the Consumer::output method:
let consumed = consumer.output().collect::<Vec<_>>();
consumed.iter().for_each(|received| {
    assert!(received.is_ok());
});
Note that the consumer prepares Kafka records for output in separate tokio threads.
Create a new producer:
use picokafka::producer;
use std::time::Duration;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();
As with the consumer, you can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opt method.
Send a message:
producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);
Note that the send callback is executed in separate tokio threads. If you want to use the Tarantool API, use TarantoolProducer instead of Producer.
Create:
use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();
Send:
producer.send(
    IdentifiedRecord::new(
        2,
        Record::new("topic_1")
            .key(String::from("key_1"))
            .payload(String::from("payload_1")),
    ),
    Duration::from_secs(1),
);
Handle the send results:
producer.output().for_each(|(descriptor, _)| {
    let descriptor = descriptor.downcast::<i32>().unwrap();
    assert!(*descriptor == 2);
});
TarantoolProducer uses IdentifiedRecord instead of Record because we need a way to distinguish messages in the output from each other.
To enable the SSL and SASL protocols, use the "ssl" feature. See the auth test to familiarize yourself with it.
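As an illustration, here is a minimal sketch of wiring SSL/SASL settings through the with_opt method shown above. The option names are standard librdkafka configuration keys; the broker address, file paths, and credentials are placeholders, and your broker must actually expose a SASL_SSL listener:
use picokafka::consumer;

// Sketch only: security.protocol, ssl.* and sasl.* are standard librdkafka
// options; the paths and credentials below are hypothetical placeholders.
let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group_1")
    .append_topic("topic_1")
    .with_opt("security.protocol", "SASL_SSL")
    .with_opt("ssl.ca.location", "/etc/kafka/ssl/ca.crt")
    .with_opt("sasl.mechanism", "PLAIN")
    .with_opt("sasl.username", "user")
    .with_opt("sasl.password", "password")
    .start();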
Picokafka supports statistics callbacks; use your own context implementation on the producer/consumer to acquire them. Note that all callbacks implemented in the context will be executed in librdkafka threads, not in the TX thread. The statistics format is described at https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See the tests test_producer_statistic and test_consumer_statistic as examples.
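For context, statistics emission itself is controlled by the standard librdkafka option statistics.interval.ms. Below is a hedged sketch of enabling it, assuming the producer Builder exposes the same with_opt method described above; the interval value is an arbitrary example, and the statistics callback itself lives in your context implementation, as demonstrated in the tests:
use picokafka::producer;
use std::time::Duration;

// statistics.interval.ms is a standard librdkafka option; a non-zero value
// makes librdkafka emit periodic statistics, which picokafka delivers to the
// statistics callback of your custom context (see test_producer_statistic).
let producer = producer::Builder::new("kafka:29092")
    .with_opt("statistics.interval.ms", "1000")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();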
You need to start Kafka before testing. You can use the tests/docker-compose.yml file:
docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
docker-compose -f tests/docker-compose.yml up -d
Or create your own environment (set KAFKA_ADDR and KAFKA_REST_ADDR if you do that).
Then run cargo test, or cargo test --features "ssl" if you need the ssl feature.
After starting the Kafka environment, run the cargo bench command.