Crates.io | fluvio |
lib.rs | fluvio |
version | 0.23.4 |
source | src |
created_at | 2020-10-02 21:56:19.75164 |
updated_at | 2024-10-31 23:10:23.653653 |
description | The official Fluvio driver for Rust |
homepage | |
repository | https://github.com/infinyon/fluvio |
max_upload_size | |
id | 295563 |
size | 317,444 |
Fluvio is a programmable data streaming platform written in Rust. With Fluvio you can create performant real time applications that scale.
Read more about Fluvio in the official website.
Let's write a very simple solution with Fluvio, in the following demostration we will create a topic using the Fluvio CLI and then we wisll produce some records on this topic. Finally these records will be consumed from the topic and printed to the stdout.
Install Fluvio CLI if you havent already
Create a new topic using the CLI
fluvio topic create "echo-test"
fluvio
, futures
and async-std
cargo add fluvio
cargo add futures
cargo add async-std --features attributes
src/main.rs
use std::time::Duration;
use fluvio::{Offset, RecordKey};
use futures::StreamExt;
const TOPIC: &str = "echo-test";
const MAX_RECORDS: u8 = 10;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let producer = fluvio::producer(TOPIC).await?;
let consumer = fluvio::consumer(TOPIC, 0).await?;
let mut consumed_records: u8 = 0;
for i in 0..10 {
producer.send(RecordKey::NULL, format!("Hello from Fluvio {}!", i)).await?;
println!("[PRODUCER] sent record {}", i);
async_std::task::sleep(Duration::from_secs(1)).await;
}
// Fluvio batches records by default, call flush() when done producing
// to ensure all records are sent
producer.flush().await?;
let mut stream = consumer.stream(Offset::beginning()).await?;
while let Some(Ok(record)) = stream.next().await {
let value_str = record.get_value().as_utf8_lossy_string();
println!("[CONSUMER] Got record: {}", value_str);
consumed_records += 1;
if consumed_records >= MAX_RECORDS {
break;
}
}
Ok(())
}
cargo run
and expect the following output[PRODUCER] sent record 0
[PRODUCER] sent record 1
[PRODUCER] sent record 2
[PRODUCER] sent record 3
[PRODUCER] sent record 4
[PRODUCER] sent record 5
[PRODUCER] sent record 6
[PRODUCER] sent record 7
[PRODUCER] sent record 8
[PRODUCER] sent record 9
[CONSUMER] Got record: Hello, Fluvio 0!
[CONSUMER] Got record: Hello, Fluvio 1!
[CONSUMER] Got record: Hello, Fluvio 2!
[CONSUMER] Got record: Hello, Fluvio 3!
[CONSUMER] Got record: Hello, Fluvio 4!
[CONSUMER] Got record: Hello, Fluvio 5!
[CONSUMER] Got record: Hello, Fluvio 6!
[CONSUMER] Got record: Hello, Fluvio 7!
[CONSUMER] Got record: Hello, Fluvio 8!
[CONSUMER] Got record: Hello, Fluvio 9!
fluvio topic delete echo-test
topic "echo-test" deleted
Read on tutorials to get the most from Fluvio and InfinyOn Cloud to scale your streaming solution.
You can use Fluvio to send or receive records from different sources using Connectors.
If you want to filter or transform records on the fly read more about SmartModules.