| | |
|---|---|
| Crates.io | streex |
| lib.rs | streex |
| version | 0.1.10 |
| created_at | 2025-06-29 14:31:59.241877+00 |
| updated_at | 2025-09-19 14:28:33.894126+00 |
| description | Kafka store runner |
| homepage | https://codeberg.org/raffaeleragni/streex |
| repository | https://codeberg.org/raffaeleragni/streex |
| max_upload_size | |
| id | 1730781 |
| size | 189,692 |
streex
A Kafka store service.
It reads a topic, holds all of its items in memory (each item in its last-updated state), and serves them for fetching by id.
This works similarly to how Kafka stores work: a consumer group on the input topic ingests the elements into memory, while the store also writes into a changelog topic so it can recover from restarts.
The changelog topic is reserved exclusively for this store, and the store also has a unique id; both are specified by the user via configuration.
The in-memory store is eventually consistent between write flush operations; the changelog topic, however, is always kept up to date and is written even if a flush did not happen. The flush operation only affects the in-memory data.
Keys and Values are intended to be `TryFrom<Vec<u8>>` and `Into<Vec<u8>>`. In the case of the standalone web service they will also need to be serde `Serialize` and `Deserialize`.
The store accepts 2 distinct Value types if they are different and need conversion: the value type of the store needs to implement `From<X>`, where X is the value type of the topic. See the types in the docs.
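For illustration, here is a minimal sketch of user-defined Key and Value types satisfying those bounds. The string key, the JSON encoding and the serde_json dependency are arbitrary choices made for this example, not something streex prescribes, and the store may require additional bounds (such as Clone or Send) not shown here.

```rust
use serde::{Deserialize, Serialize};

// Hypothetical key: a plain UTF-8 string identifier.
#[derive(Clone, Serialize, Deserialize)]
struct Key(String);

impl TryFrom<Vec<u8>> for Key {
    type Error = std::string::FromUtf8Error;
    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        Ok(Key(String::from_utf8(bytes)?))
    }
}

impl From<Key> for Vec<u8> {
    // Implementing From<Key> for Vec<u8> provides Into<Vec<u8>> for Key.
    fn from(key: Key) -> Vec<u8> {
        key.0.into_bytes()
    }
}

// Hypothetical value, stored on the topic as JSON bytes.
#[derive(Clone, Serialize, Deserialize)]
struct Value {
    name: String,
    amount: i64,
}

impl TryFrom<Vec<u8>> for Value {
    type Error = serde_json::Error;
    fn try_from(bytes: Vec<u8>) -> Result<Self, Self::Error> {
        serde_json::from_slice(&bytes)
    }
}

impl From<Value> for Vec<u8> {
    fn from(value: Value) -> Vec<u8> {
        serde_json::to_vec(&value).expect("value serializes to JSON")
    }
}
```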
The minimal configuration needed is to specify the store id and the Kafka variables, either by environment variables or a .env file, for example:
STORE_ID="test"
SOURCE_TOPIC="test-input"
CHANGELOG_TOPIC="test-changelog"
KAFKA_BROKERS="localhost:9092"
This will launch a store and the HTTP endpoint for fetching its items.
Assuming the types `Key` and `Value` are defined by the user:
#[tokio::main]
async fn main() {
    streex::SingleStoreApp::<Key, Value>::new()
        .await
        .start()
        .await
        .unwrap();
}
Assuming the configuration shown before, the published endpoints will be:
- GET http://localhost:8080/stores/test/<key> to get the value for a key.
- POST http://localhost:8080/flush-store/test to flush the write queue.

The indexmap is a structure that keeps its buckets contiguous, and the store uses swap_remove operations to keep it that way: the last items of the bucket array are moved back to fill the slots of deleted items. The initial capacity of the map is 0, so numerous allocations will happen at the beginning or when restarting from the changelog.
To avoid that, the map can also be pre-allocated with an initial size:
INITIAL_CAPACITY=5000000
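As an illustration of that swap_remove behavior (this is not streex's internal code, just the indexmap crate used directly):

```rust
use indexmap::IndexMap;

fn main() {
    // Pre-allocating avoids repeated re-allocations while the map grows,
    // which is what INITIAL_CAPACITY achieves for the store.
    let mut map: IndexMap<&str, i32> = IndexMap::with_capacity(4);
    map.insert("a", 1);
    map.insert("b", 2);
    map.insert("c", 3);
    map.insert("d", 4);

    // swap_remove keeps the buckets contiguous: the last entry ("d")
    // is moved into the slot freed by removing "b".
    map.swap_remove("b");
    assert_eq!(map.get_index(1), Some((&"d", &4)));
}
```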
The store internally uses a ChiralMap.
This means the store contains 2 copies of the data, which are swapped when a flush happens. A flush can be requested with the endpoint or configured with the autoflush variables. The values shown here are their defaults:
AUTO_FLUSH_COUNT=1000
AUTO_FLUSH_SECONDS=1
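As a purely conceptual sketch of that two-copy idea (this is not the ChiralMap API and it omits the lock-free reader machinery), writes can be buffered and applied on flush while reads keep being served from the active copy:

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Conceptual illustration only: two copies of the data, with reads served
// from the active copy and buffered writes applied when a flush happens.
struct TwoCopyStore<K, V> {
    copies: [HashMap<K, V>; 2],
    active: usize,          // which copy currently serves reads
    pending: Vec<(K, V)>,   // writes queued until the next flush
}

impl<K: Hash + Eq + Clone, V: Clone> TwoCopyStore<K, V> {
    fn new() -> Self {
        Self { copies: [HashMap::new(), HashMap::new()], active: 0, pending: Vec::new() }
    }

    fn get(&self, key: &K) -> Option<&V> {
        self.copies[self.active].get(key)
    }

    fn put(&mut self, key: K, value: V) {
        self.pending.push((key, value));
    }

    fn flush(&mut self) {
        let inactive = 1 - self.active;
        // Bring the inactive copy up to date, then make it the active one.
        for (k, v) in &self.pending {
            self.copies[inactive].insert(k.clone(), v.clone());
        }
        self.active = inactive;
        // Replay the same writes into the copy that just became inactive,
        // so both copies converge between flushes.
        let old = 1 - self.active;
        for (k, v) in self.pending.drain(..) {
            self.copies[old].insert(k, v);
        }
    }
}

fn main() {
    let mut store = TwoCopyStore::new();
    store.put("k", 1);
    assert_eq!(store.get(&"k"), None); // not visible until a flush
    store.flush();
    assert_eq!(store.get(&"k"), Some(&1));
}
```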
The readers of the map come from a limited pool; this comes down to the nature of mixing them with async/web runtimes. The readers themselves are lock-free and are not affected by the write operations, but they are acquired from a synchronized pool.
The size of the pool determines how many concurrent reads can happen. It defaults to 1000 and can be configured as:
READER_POOL_SIZE=1000
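The checkout mechanism can be pictured roughly like this (a conceptual sketch only, not streex's actual pool or reader types):

```rust
use std::sync::{Arc, Mutex};

// Conceptual sketch: only a fixed number of reader handles exist, so a
// reader has to be checked out of a synchronized pool before each read
// and returned afterwards, which bounds the number of concurrent reads.
struct ReaderPool<R> {
    readers: Arc<Mutex<Vec<R>>>,
}

impl<R> ReaderPool<R> {
    fn new(readers: Vec<R>) -> Self {
        Self { readers: Arc::new(Mutex::new(readers)) }
    }

    // None means every reader is currently checked out.
    fn acquire(&self) -> Option<R> {
        self.readers.lock().unwrap().pop()
    }

    fn release(&self, reader: R) {
        self.readers.lock().unwrap().push(reader);
    }
}

fn main() {
    // A pool of 2 "readers" (plain strings stand in for real reader handles).
    let pool = ReaderPool::new(vec!["r1", "r2"]);
    let a = pool.acquire().unwrap();
    let b = pool.acquire().unwrap();
    assert!(pool.acquire().is_none()); // pool exhausted: no more concurrent reads
    pool.release(a);
    pool.release(b);
}
```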
The store can be configured to listen only to specific partitions of the source topic and, along with that, to manage only those partitions in the changelog. This makes it possible to run multiple instances of the store and split them by their lists of partitions. By default the store listens to all partitions, but a list of them can be configured as:
PARTITIONS=0,1,2,3...
The store can also hold replicas of data, which are read-only and exist for redundancy and high-availability purposes. In an example multi-node scenario with partitions 1, 2, 3, it is possible to set up 3 nodes, each of which has one of those partitions as primary (node 1 > partition 1, etc.) plus a copy of the other partitions, so that if a node goes down or is in the middle of restarting, the data can still be queried from the replica on another node.
These replicated partitions do not write into the changelog and do not consume from the source topic; they only read from the changelog to keep a copy, and they keep listening to the changelog (where the primary partitions will write) to stay up to date with newly processed data.
# which partitions to keep as extra copies in this instance
PARTITIONS_REPLICA=4,5
# when this is true this node will be a replica only, ignoring all the primary
# partitions and the source topic, acting only as a read-only copy of the changelog
USE_REPLICA_ONLY=true
# when this is true this node will wait at bootstrap for the replicas to be
# filled up from the changelog. Otherwise only the primaries will block on
# startup and the replicas will build asynchronously in the background.
# Set this to true if you need consistency of replicas immediately after boot,
# as is often the case for nodes that are replica only.
WAIT_FOR_REPLICAS=true
The store internally uses librdkafka via the rdkafka crate.
There are 3 instances of Kafka clients: `consumer`, `producer` and `changelog`. Each of them can additionally be configured with the key/value properties of librdkafka.
The instances are:
- `consumer`: used to consume the input topic.
- `producer`: used to write into the changelog.
- `changelog`: used to consume the changelog at bootstrap.

Example configuration:
KAFKA_CONSUMER_PROPERTIES=receive.message.max.bytes=10000
KAFKA_PRODUCER_PROPERTIES=client.id=myclient,message.max.bytes=100000
KAFKA_CHANGELOG_PROPERTIES=max.in.flight=1000
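To show how such comma-separated pairs map onto librdkafka settings, here is a small sketch that applies them to an rdkafka ClientConfig. This only illustrates the property format, it is not the crate's internal configuration code, and the apply_properties helper is hypothetical.

```rust
use rdkafka::config::ClientConfig;

// Hypothetical helper: apply "key=value,key=value" pairs, as used by the
// KAFKA_*_PROPERTIES variables above, onto an rdkafka ClientConfig.
fn apply_properties(config: &mut ClientConfig, properties: &str) {
    for pair in properties.split(',').filter(|p| !p.is_empty()) {
        if let Some((key, value)) = pair.split_once('=') {
            // Every pair becomes a plain librdkafka configuration property.
            config.set(key.trim(), value.trim());
        }
    }
}

fn main() {
    let mut config = ClientConfig::new();
    config.set("bootstrap.servers", "localhost:9092");
    apply_properties(&mut config, "client.id=myclient,message.max.bytes=100000");
    // `config` could now be used to create a producer or consumer.
}
```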