RabbitMQ Streams Client for Rust


A Rust client for RabbitMQ Streams

Tests status Download docs.rs docs deps codecov
Welcome to the documentation for the RabbitMQ Stream Rust Client. This guide provides comprehensive information on installation, usage, and examples. ## Table of Contents 1. [Introduction](#introduction) 2. [Installation](#installation) 3. [Getting Started](#getting-started) 4. [Usage](#usage) - [Publishing Messages](#publishing-messages) - [Consuming Messages](#consuming-messages) - [Super Stream](#super-stream) - [Single Active Consumer](#single-active-consumer) - [Filtering](#filtering) 5. [Examples](#examples) 6. [Development](#development) - [Compiling](#Compiling) - [Running Tests](#running-tests) - [Running Benchmarks](#running-benchmarks) - [Contributing](#contributing) - [License](#license) ## Introduction The RabbitMQ Stream Rust Client is a library designed for integrating Rust applications with RabbitMQ streams efficiently. It supports high throughput and low latency message streaming. ## Installation Install from [crates.io](https://crates.io/crates/rabbitmq-stream-client) ```toml [dependencies] rabbitmq-stream-client = "*" ``` Then run `cargo build `to include it in your project. ## Getting Started This section covers the initial setup and necessary steps to incorporate the RabbitMQ Stream client into your Rust application. Ensure RabbitMQ server with stream support is installed. The main access point is `Environment`, which is used to connect to a node. ```rust,no_run use rabbitmq_stream_client::Environment; let environment = Environment::builder().build().await?; ``` ### Environment with TLS ```rust,no_run use rabbitmq_stream_client::Environment; let tls_configuration: TlsConfiguration = TlsConfiguration::builder() .add_root_certificates(String::from(".ci/certs/ca_certificate.pem")) .build(); // Use this configuration if you want to trust the certificates // without providing the root certificate let tls_configuration: TlsConfiguration = TlsConfiguration::builder() .trust_certificates(true) .build(); let environment = Environment::builder() .host("localhost") .port(5551) // specify the TLS port of the node .tls(tls_configuration) .build() ``` ### Environment with a load balancer See the [documentation](https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams#with-a-load-balancer) about the stream and load-balancer. ```rust,no_run use rabbitmq_stream_client::Environment; let environment = Environment::builder() .load_balancer_mode(true) .build() ``` ## Publishing messages You can publish messages with three different methods: * `send`: asynchronous, messages are automatically buffered internally and sent at once after a timeout expires. On confirmation a callback is triggered. See the [example](./examples/send_async.rs) * `batch_send`: asynchronous, the user buffers the messages and sends them. This is the fastest publishing method. On confirmation a callback is triggered. See the [example](./examples/batch_send.rs) * `send_with_confirm`: synchronous, the caller wait till the message is confirmed. This is the slowest publishing method. See the [example](./examples/send_with_confirm.rs) ## Consuming messages As streams never delete any messages, any consumer can start reading/consuming from any point in the log See the Consuming section part of the streaming doc for further info (Most of the examples refer to Java but applies for ths library too): [Consuming messages from a stream](https://www.rabbitmq.com/docs/streams#consuming) See also the Rust streaming tutorial-2 on how consume messages starting from different positions and how to enable Server-Side Offset Tracking too: [RabbitMQ Streams - Rust tutorial 2](https://www.rabbitmq.com/tutorials/tutorial-two-rust-stream) and the relative examples from the tutorials: [Rust tutorials examples](https://github.com/rabbitmq/rabbitmq-tutorials/tree/main/rust-stream) See also a simple example here on how to consume from a stream: [Consuming messages from a stream example](./examples/simple-consumer.rs) ## Super Stream The client supports the super-stream functionality. A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic on several cluster nodes. See the [blog post](https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams/) for more info. You can use SuperStreamProducer and SuperStreamConsumer classes which internally uses producers and consumers to operate on the componsing streams. SuperstreamProducers can act in Hashing and Routing Key mode. See the Java documentation for more details (Same concepts apply here): [Super Stream Producer - Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#super-stream-producer) Have a look to the examples to see on how to work with super streams. See the [Super Stream Producer Example using Hashing mmh3 mode](./examples/superstreams/send_super_stream_hash.rs) See the [Super Stream Producer Example using Routing key mode](./examples/superstreams/send_super_stream_routing_key.rs) See the [Super Stream Consumer Example](./examples/superstreams/receive_super_stream.rs) ## Single active consumer The client supports the single-active-consumer feature: [single-active-consumer feature](https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams) See the Java doc for further information (Same concepts apply here): [Single-Active-Consumer Java doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#single-active-consumer) See the Rust full example here: [Single-Active-Consumer-Full-Example](/examples/single_active_consumer) ## Filtering Filtering is a new streaming feature enabled from RabbitMQ 3.13 based on Bloom filter. RabbitMQ Stream provides a server-side filtering feature that avoids reading all the messages of a stream and filtering only on the client side. This helps to save network bandwidth when a consuming application needs only a subset of messages. See the Java documentation for more details (Same concepts apply here): [Filtering - Java Doc](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#filtering) See Rust filtering examples: See the [Producer with filtering Example](./examples/filtering/send_with_filtering.rs) See the [Consumer with filtering Example](./examples/filtering/receive_with_filtering.rs) ### Examples Refer to the [examples](./examples) directory for detailed code samples illustrating various use cases like error handling, batch processing, super streams and different ways to send messages. ## Development ### Compiling ```bash git clone https://github.com/rabbitmq/rabbitmq-stream-rust-client . make build ``` ### Running Tests To run tests you need to have a running RabbitMQ Stream node with a TLS configuration. It is mandatory to use `make rabbitmq-server` to create a TLS configuration compatible with the tests. See the `Environment` TLS tests for more details. ```bash make rabbitmq-server make test ``` ### Running Benchmarks ```bash make rabbitmq-server make run-benchmark ``` ## Contributing Contributions are welcome! Please read our contributing guide to understand how to submit issues, enhancements, or patches. ## License See the LICENSE file for details.