Crates.io | rskafka_wasi |
lib.rs | rskafka_wasi |
version | 0.3.1 |
source | src |
created_at | 2022-11-22 08:30:43.792295 |
updated_at | 2023-02-04 02:47:24.137346 |
description | A minimal Rust client for Apache Kafka |
homepage | |
repository | https://github.com/WasmEdge/rskafka_wasi |
max_upload_size | |
id | 720676 |
size | 586,002 |
This crate aims to be a minimal Kafka implementation for simple workloads that wish to use Kafka as a distributed write-ahead log. This is a fork from the original RSKafka with support for WebAssembly compilation target. That allows Kafka apps to run inside the WasmEdge Runtime as a lightweight and secure alternative to natively compiled apps in Linux container.
It is not a general-purpose Kafka implementation, instead it is heavily optimised for simplicity, both in terms of implementation and its emergent operational characteristics. In particular, it aims to meet the needs of IOx.
This crate has:
It will be a good fit for workloads that:
Perform offset tracking independently of Kafka
Read/Write reasonably sized payloads per-partition
Have a low number of high-throughput partitions 1
# async fn test() {
use rskafka::{
client::{
ClientBuilder,
partition::{Compression, UnknownTopicHandling},
},
record::Record,
};
use chrono::{TimeZone, Utc};
use std::collections::BTreeMap;
// setup client
let connection = "localhost:9093".to_owned();
let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
// create a topic
let topic = "my_topic";
let controller_client = client.controller_client().unwrap();
controller_client.create_topic(
topic,
2, // partitions
1, // replication factor
5_000, // timeout (ms)
).await.unwrap();
// get a partition-bound client
let partition_client = client
.partition_client(
topic.to_owned(),
0, // partition
UnknownTopicHandling::Retry,
)
.await
.unwrap();
// produce some data
let record = Record {
key: None,
value: Some(b"hello kafka".to_vec()),
headers: BTreeMap::from([
("foo".to_owned(), b"bar".to_vec()),
]),
timestamp: Utc.timestamp_millis(42),
};
partition_client.produce(vec![record], Compression::default()).await.unwrap();
// consume data
let (records, high_watermark) = partition_client
.fetch_records(
0, // offset
1..1_000_000, // min..max bytes
1_000, // max wait time
)
.await
.unwrap();
# }
For more advanced production and consumption, see [crate::client::producer
] and [crate::client::consumer
].
compression-gzip
(default): Support compression and decompression of messages using gzip.compression-lz4
(default): Support compression and decompression of messages using LZ4.compression-snappy
(default): Support compression and decompression of messages using Snappy.compression-zstd
(default): Support compression and decompression of messages using zstd.full
: Includes all stable features (compression-gzip
, compression-lz4
, compression-snappy
,
compression-zstd
, transport-socks5
, transport-tls
).transport-socks5
: Allow transport via SOCKS5 proxy.To run integration tests against Redpanda, run:
$ docker-compose -f docker-compose-redpanda.yml up
in one session, and then run:
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=redpanda KAFKA_CONNECT=0.0.0.0:9011 cargo test
in another session.
To run integration tests against Apache Kafka, run:
$ docker-compose -f docker-compose-kafka.yml up
in one session, and then run:
$ TEST_INTEGRATION=1 TEST_BROKER_IMPL=kafka KAFKA_CONNECT=localhost:9011 cargo test
in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other environment variables.
Licensed under either of these:
Unless you explicitly state otherwise, any contribution you intentionally submit for inclusion in the work, as defined in the Apache-2.0 license, shall be dual-licensed as above, without any additional terms or conditions.
Kafka's design makes it hard for any client to support the converse, as ultimately each partition is an independent write stream within the broker. However, this crate makes no attempt to mitigate per-partition overheads e.g. by batching writes to multiple partitions in a single ProduceRequest ↩