PICOKAFKA


Kafka driver for distributed systems built with tarantool-module.

Consumer

Create a new consumer:

use picokafka::consumer;

let consumer = consumer::Builder::new("kafka:29092")
    .with_group("group_1")
    .append_topic("topic_1")
    .start();

You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opt method:

    use std::time::Duration;

    let consumer = consumer::Builder::new("kafka:29092")
        .with_opt("enable.auto.offset.store", "false")
        .with_session_timeout(Duration::from_secs(10))
        .start();

To handle consumer output, use the Consumer::output method:

    let consumed = consumer.output().collect::<Vec<_>>();
    consumed.iter().for_each(|received| {
        assert!(received.is_ok());
    });

Note that the consumer prepares Kafka records for output in separate tokio threads.
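
Putting the pieces above together, a complete consumer flow might look like this (a minimal sketch that only uses the builder methods shown in this section; the broker address, group, and topic are placeholders):

    use std::time::Duration;

    use picokafka::consumer;

    // Build a consumer with a group, a subscribed topic and one extra
    // librdkafka option, then start it.
    let consumer = consumer::Builder::new("kafka:29092")
        .with_group("group_1")
        .append_topic("topic_1")
        .with_opt("enable.auto.offset.store", "false")
        .with_session_timeout(Duration::from_secs(10))
        .start();

    // Each item yielded by output() is a Result; count the records that
    // were received successfully.
    let ok_records = consumer.output().filter(|received| received.is_ok()).count();
    println!("consumed {ok_records} records");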

Producer

Create a new producer:

use std::time::Duration;

use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap();

You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) with the Builder::with_opt method.
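
For example, a producer-side option can be set the same way (a sketch; compression.codec is a standard librdkafka option and the value shown is arbitrary):

    use std::time::Duration;

    use picokafka::producer;

    // Producer with an extra librdkafka option passed through the builder.
    let producer = producer::Builder::new("kafka:29092")
        .with_opt("compression.codec", "lz4")
        .with_message_timeout(Duration::from_secs(1))
        .build()
        .unwrap();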

Send a message:

producer.send(
    Record::new("topic_1")
        .key(String::from("key_1"))
        .payload(String::from("payload_1")),
    Duration::from_secs(1),
    move |result| {
        println!("send result: {:?}", result);
    },
);

Note that the send callback is executed in separate tokio threads. If you want to use the Tarantool API, use TarantoolProducer instead of Producer.

TarantoolProducer

Create:

use std::time::Duration;

use picokafka::producer;

let producer = producer::Builder::new("kafka:29092")
    .with_message_timeout(Duration::from_secs(1))
    .build()
    .unwrap()
    .tarantool();

Send:

    producer.send(
        IdentifiedRecord::new(
            2,
            Record::new("topic_1")
                .key(String::from("key_1"))
                .payload(String::from("payload_1")),
        ),
        Duration::from_secs(1),
    );

Handling send results:

    producer.output().for_each(|(descriptor, _)| {
        let descriptor = descriptor.downcast::<i32>().unwrap();
        assert!(*descriptor == 2);
    });

TarantoolProducer uses IdentifiedRecord instead of Record because we need a way to distinguish the messages in the output from each other.
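
For example, two messages tagged with different ids can be matched to their results when reading the output (a sketch reusing only the calls shown above; the topic name and payloads are placeholders):

    // Send two records tagged with different ids.
    for id in [1, 2] {
        producer.send(
            IdentifiedRecord::new(
                id,
                Record::new("topic_1").payload(format!("payload_{id}")),
            ),
            Duration::from_secs(1),
        );
    }

    // Tell the results apart by downcasting the descriptor back to the id.
    producer.output().for_each(|(descriptor, _)| {
        let id = descriptor.downcast::<i32>().unwrap();
        println!("message {} is done", *id);
    });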

SSL and SASL

To enable the SSL and SASL protocols, use the "ssl" feature. See the auth test to familiarize yourself with them.
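
As a rough illustration, with the feature enabled the usual librdkafka security options can be passed through the builder (a sketch; it assumes these options are simply forwarded to librdkafka via with_opt, and all addresses, paths, and credentials are placeholders):

    use std::time::Duration;

    use picokafka::producer;

    // SSL-enabled producer: standard librdkafka security options set via with_opt.
    let producer = producer::Builder::new("kafka:29092")
        .with_opt("security.protocol", "ssl")
        .with_opt("ssl.ca.location", "/opt/kafka/ssl/ca.crt")
        .with_opt("ssl.certificate.location", "/opt/kafka/ssl/client.crt")
        .with_opt("ssl.key.location", "/opt/kafka/ssl/client.key")
        .with_message_timeout(Duration::from_secs(1))
        .build()
        .unwrap();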

Statistics

Picokafka supports statistics callbacks; use your own context implementation on the producer/consumer to acquire them. Note that all callbacks implemented in the context will be executed in librdkafka threads, not in the TX thread. The statistics format is described at https://github.com/confluentinc/librdkafka/blob/master/STATISTICS.md. See the tests as examples (a small configuration sketch follows the list below):

  • producer - test_producer_statistic
  • consumer - test_consumer_statistic
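
The sketch below only shows how the statistics stream itself might be switched on; statistics.interval.ms is a standard librdkafka option, and the emitted JSON is delivered to the callback defined in your context implementation (in a librdkafka thread, as noted above):

    use picokafka::consumer;

    // Ask librdkafka to emit a statistics report every second.
    let consumer = consumer::Builder::new("kafka:29092")
        .with_group("group_1")
        .append_topic("topic_1")
        .with_opt("statistics.interval.ms", "1000")
        .start();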

Tests

You need to start Kafka before testing. You can use the tests/docker-compose.yml file:

    docker run --rm -v $(pwd)/tests:/opt/kafka confluentinc/cp-kafka:latest /opt/kafka/setup_ssl.sh
    docker-compose -f tests/docker-compose.yml up -d

Or create your own environment (set the KAFKA_ADDR and KAFKA_REST_ADDR environment variables if you do).

Then run cargo test, or cargo test --features "ssl" if you need the ssl feature.

Benchmarks

After starting the Kafka environment, use the cargo bench command.
