streex

Crates.io: streex
lib.rs: streex
version: 0.1.10
created_at: 2025-06-29 14:31:59.241877+00
updated_at: 2025-09-19 14:28:33.894126+00
description: Kafka store runner
homepage: https://codeberg.org/raffaeleragni/streex
repository: https://codeberg.org/raffaeleragni/streex
max_upload_size:
id: 1730781
size: 189,692
Raffaele Ragni (raffaeleragni)

documentation

https://codeberg.org/raffaeleragni/streex/src/branch/main/README.md

README

streex crates.io Please don't upload to GitHub

A Kafka store service.

It reads a topic, keeps all of its items in memory (each item at its last updated value), and serves those items so they can be fetched by id.

This works similarly to Kafka stores: a consumer group reads the input topic and ingests its elements into memory, while every update is also written to a changelog topic so the store can recover from restarts.

The changelog topic is reserved exclusively for this store, and the store also has a unique id; both are specified by the user via configuration.

The in-memory store is eventually consistent between write flush operations; the changelog topic, however, is always kept up to date and is written even if a flush has not happened yet. The flush operation only affects in-memory data.

Usage

Key & Value

Keys and Values are intended to implement TryFrom<Vec<u8>> and Into<Vec<u8>>. In the case of the standalone web service they also need to implement serde's Serialize and Deserialize.

The store accepts two distinct Value types when the stored value differs from the topic value and needs conversion: the store's value type needs to implement From<X>, where X is the value type of the topic. See the types in the docs.
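As a minimal sketch of what those bounds can look like, assuming a value type encoded as JSON (the User struct, its fields and the use of serde_json are illustrative choices, not something the crate mandates); a plain String already provides both conversions and could act as the key:

use serde::{Deserialize, Serialize};

// Illustrative value type; any type with the required conversions works.
#[derive(Clone, Serialize, Deserialize)]
struct User {
    id: String,
    name: String,
}

// Into<Vec<u8>>: how the value is turned into the raw bytes written to Kafka.
impl From<User> for Vec<u8> {
    fn from(user: User) -> Self {
        serde_json::to_vec(&user).expect("serializable struct")
    }
}

// TryFrom<Vec<u8>>: how the value is rebuilt from the raw bytes read from Kafka.
impl TryFrom<Vec<u8>> for User {
    type Error = serde_json::Error;

    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        serde_json::from_slice(&bytes)
    }
}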

Configuration

The minimal configuration needed is the store id and the Kafka variables. An example, set either via environment variables or a .env file:

STORE_ID="test"
SOURCE_TOPIC="test-input"
CHANGELOG_TOPIC="test-changelog"
KAFKA_BROKERS="localhost:9092"

Launch the store

This will launch a store and the HTTP endpoint for getting its items. Assuming the types Key and Value are defined by the user:

#[tokio::main]
async fn main() {
    streex::SingleStoreApp::<Key, Value>::new()
        .await.start()
        .await.unwrap();
}
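For reference, a possible Cargo.toml dependency section for the example above (the version numbers and tokio features are assumptions; check crates.io for the current release):

[dependencies]
streex = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde = { version = "1", features = ["derive"] }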

Assuming the configuration shown before, the published endpoints will be:

  • GET http://localhost:8080/stores/test/<key> to get the value for key.
  • POST http://localhost:8080/flush-store/test to flush the write queue.
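For example, with the store id test from above these could be called as follows (the key segment of the URL depends on how the Key type is parsed, so the key shown here is just a placeholder):

# fetch the value stored under a given key
curl http://localhost:8080/stores/test/some-key

# flush the write queue of the store
curl -X POST http://localhost:8080/flush-store/test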

Advanced configuration

Initial capacity

The indexmap is a structure that keeps its buckets contiguous, and the store uses swap_remove operations to keep it that way: the last items of the bucket array are moved in to fill the slots of deleted items. The initial capacity of the map is 0, which causes numerous allocations at the beginning or when restoring from the changelog.

To avoid that, the map can be pre-allocated with an initial size:

INITIAL_CAPACITY=5000000

Auto flush

The store internally uses a ChiralMap.

This means the store holds 2 copies of the data, which are swapped when a flush happens. A flush can be requested with the endpoint or configured with the auto flush variables. The values shown here are their defaults:

AUTO_FLUSH_COUNT=1000
AUTO_FLUSH_SECONDS=1

Reader pool

The readers of the map come from a limited pool; this comes down to the nature of mixing them with async/web runtimes. The readers themselves are lock-free and are not affected by write operations, but they are acquired from a synchronized pool.

The size of the pool determines how many concurrent reads can happen. It defaults to 1000 and can be configured as:

READER_POOL_SIZE=1000

Manual assignment of partitions

The store can be configured to listen only to specific partitions of the source topic and, along with that, manage only those partitions in the changelog. This allows running multiple instances of the store, splitting them by their partition lists. The default is to listen to all partitions, but a list can be configured as:

PARTITIONS=0,1,2,3...
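For example, two instances could split a four-partition source topic between them (partition numbers are purely illustrative):

# instance A
PARTITIONS=0,1

# instance B
PARTITIONS=2,3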

Replication

The store can also hold replicas of data, which are read only and exist for redundancy and high availability. In an example multi-node scenario with partitions 1,2,3, it's possible to set up 3 nodes, each having one of those partitions as primary (node 1 > partition 1, etc.) plus a copy of the other partitions, so that if a node goes down or is in the middle of restarting, the data can still be queried from the replica on another node.

These replica partitions do not write to the changelog and do not consume from the source topic; they only read from the changelog to build a copy and keep listening to it (where the primaries write) to stay up to date with newly processed data.

# which partitions to keep as an extra copy in this instance
PARTITIONS_REPLICA=4,5
# when this is true this node will be a replica only, ignoring all the primary
# partitions and the source topic, being only a read-only copy of the changelog
USE_REPLICA_ONLY=true
# when this is true this node will wait at bootstrap for the replicas to be
# filled up from the changelog. Otherwise only the primaries will block on
# startup and the replicas will build asynchronously in the background.
# Set this to true if you need consistency of replicas immediately after boot,
# as is often the case for nodes that are replica only.
WAIT_FOR_REPLICAS=true
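As a sketch, the three-node scenario described above could be configured roughly like this, one configuration per node (partition numbers follow the example and are illustrative):

# node 1
PARTITIONS=1
PARTITIONS_REPLICA=2,3

# node 2
PARTITIONS=2
PARTITIONS_REPLICA=1,3

# node 3
PARTITIONS=3
PARTITIONS_REPLICA=1,2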

librdkafka

The store uses librdkafka internally via rdkafka. There are 3 Kafka client instances: consumer, producer and changelog. Each of them can be additionally configured with the key/value properties of librdkafka.

The instances are:

  • consumer: used to consume the input topic.
  • producer: used to write into the changelog.
  • changelog: used to consume the changelog at bootstrap.

Example configuration:

KAFKA_CONSUMER_PROPERTIES=receive.message.max.bytes=10000
KAFKA_PRODUCER_PROPERTIES=client.id=myclient,message.max.bytes=100000
KAFKA_CHANGELOG_PROPERTIES=max.in.flight=1000
