| Crates.io | kafka-http |
| lib.rs | kafka-http |
| version | 0.1.2 |
| created_at | 2025-10-14 16:35:55.363437+00 |
| updated_at | 2025-10-15 17:35:23.167941+00 |
| description | Simple Rust Kafka HTTP client to allow a rust native way to interact with the restful kafka HTTP proxy |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1882696 |
| size | 93,686 |
A Rust client library for interacting with Kafka REST Proxy over HTTP.
The Rust kafka binary clients are pretty bad to work with, buggy, incomplete, and the popular rdkafka crate requires a lot of dependencies and forces the use of C bindings and the C++ standard library, build system, etc. that is very heavyweight for a simple Kafka client.
The other projects such as kafka-rust and samsa are also not very ergonomic along with being effectively unmaintained. After working with them, I decided to write my own client via the http client proxy to have a lightweight and simple client that is easy to use without any large dependencies or C bindings.
Note that some features are missing, such as admin APIs, etc. but feel free to extend, fork, etc.
kafka-http provides a simple and ergonomic interface for working with Apache Kafka through the Kafka REST Proxy. It supports consumer group management, message consumption, message production, and offset management.
Add this to your Cargo.toml:
[dependencies]
kafka-http = "0.1" # or the latest version
Note that this example uses the redpanda kafka rest proxy. If you want to use Kafka, confluence HTTP Proxy, etc. the port usually is 8082 but should be set to whatever your port is.
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
// Create a client
let mut client = KafkaHttpClient::new(" http://localhost:18082");
// Set polling timeout (optional, default is 1000ms)
client.set_timeout_ms(5000);
// Create a consumer
let consumer_params = CreateConsumerParams {
name: "my-consumer".to_string(),
format: "json".to_string(),
auto_offset_reset: Some("earliest".to_string()),
..Default::default()
};
client.create_consumer("my-group", &consumer_params).await?;
// Subscribe to topics
let subscribe_params = SubscribeParams {
topics: vec!["my-topic".to_string()],
..Default::default()
};
client.subscribe("my-group", &subscribe_params).await?;
// Poll for messages
loop {
let records = client.poll().await?;
for record in records {
println!("Received: key={:?}, value={:?}", record.key, record.value);
}
}
Ok(())
}
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
// Create a client
let mut client = KafkaHttpClient::new(" http://localhost:18082");
let params = ProduceParams {
records: vec![
// Add your records here
],
..Default::default()
};
client.produce("my-topic", ¶ms).await?;
Ok(())
}
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};
#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
// Create a client
let mut client = KafkaHttpClient::new(" http://localhost:18082");
let commit_params = PartitionOffsetCommitParams {
offsets: vec![
// Add partition offset information here
],
..Default::default()
};
client.commit(&commit_params).await?;
Ok(())
}
KafkaHttpClientThe main client struct for interacting with Kafka REST Proxy.
new(base_uri: &str) -> Self - Create a new client instanceset_timeout_ms(&mut self, timeout_ms: u64) - Set the polling timeoutset_consumer_uri(&mut self, uri: &String) - Manually set consumer URIcreate_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> - Create a new consumer (fails if exists)try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> - Create or reconnect to existing consumersubscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> - Subscribe to topicspoll(&self) -> Result<Vec<Record>, Error> - Poll for new recordsproduce(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> - Produce messagescommit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> - Commit offsetsreqwest - HTTP clientserde / serde_json - JSON serializationtracing - Logging and diagnosticsThe library uses a custom Error type that provides detailed error information. All async methods return Result<T, Error>.
The library uses tracing for logging. To enable logs in your application: tracing
See the LICENSE file for details.
Contributions are welcome! Please feel free to submit a Pull Request.
For more examples, see the tests/ directory.
Feel free to open an issue or submit a pull request.