fluvio

Crates.io: fluvio
lib.rs: fluvio
version: 0.24.0
source: src
created_at: 2020-10-02 21:56:19.75164
updated_at: 2024-11-15 06:21:20.040113
description: The official Fluvio driver for Rust
repository: https://github.com/infinyon/fluvio
id: 295563
size: 318,889
fluvio-publisher (github:infinyon:fluvio-publisher)

README

Fluvio

The programmable data streaming platform


What's Fluvio?

Fluvio is a programmable data streaming platform written in Rust. With Fluvio you can create performant real-time applications that scale.

Read more about Fluvio on the official website.

Getting Started

Let's build a simple example with Fluvio. In the following demonstration we will create a topic using the Fluvio CLI and then produce some records to that topic. Finally, these records will be consumed from the topic and printed to stdout.

  1. Install the Fluvio CLI if you haven't already

  2. Create a new topic using the CLI

fluvio topic create "echo-test"
  3. Create a new cargo project and add fluvio, futures and async-std as dependencies
cargo add fluvio
cargo add futures
cargo add async-std --features attributes
  4. Copy and paste the following snippet into your src/main.rs
use std::time::Duration;

use fluvio::{Offset, RecordKey};
use futures::StreamExt;

const TOPIC: &str = "echo-test";
const MAX_RECORDS: u8 = 10;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let producer = fluvio::producer(TOPIC).await?;
    let consumer = fluvio::consumer(TOPIC, 0).await?;
    let mut consumed_records: u8 = 0;

    // Produce exactly as many records as the consumer loop below waits for
    for i in 0..MAX_RECORDS {
        producer.send(RecordKey::NULL, format!("Hello from Fluvio {}!", i)).await?;
        println!("[PRODUCER] sent record {}", i);
        async_std::task::sleep(Duration::from_secs(1)).await;
    }

    // Fluvio batches records by default, call flush() when done producing
    // to ensure all records are sent
    producer.flush().await?;

    let mut stream = consumer.stream(Offset::beginning()).await?;

    while let Some(Ok(record)) = stream.next().await {
        let value_str = record.get_value().as_utf8_lossy_string();

        println!("[CONSUMER] Got record: {}", value_str);
        consumed_records += 1;

        if consumed_records >= MAX_RECORDS {
            break;
        }
    }

    Ok(())
}
  5. Run cargo run and expect the following output
[PRODUCER] sent record 0
[PRODUCER] sent record 1
[PRODUCER] sent record 2
[PRODUCER] sent record 3
[PRODUCER] sent record 4
[PRODUCER] sent record 5
[PRODUCER] sent record 6
[PRODUCER] sent record 7
[PRODUCER] sent record 8
[PRODUCER] sent record 9
[CONSUMER] Got record: Hello from Fluvio 0!
[CONSUMER] Got record: Hello from Fluvio 1!
[CONSUMER] Got record: Hello from Fluvio 2!
[CONSUMER] Got record: Hello from Fluvio 3!
[CONSUMER] Got record: Hello from Fluvio 4!
[CONSUMER] Got record: Hello from Fluvio 5!
[CONSUMER] Got record: Hello from Fluvio 6!
[CONSUMER] Got record: Hello from Fluvio 7!
[CONSUMER] Got record: Hello from Fluvio 8!
[CONSUMER] Got record: Hello from Fluvio 9!
  6. Clean up
fluvio topic delete echo-test
topic "echo-test" deleted
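
The snippet in step 4 notes that Fluvio batches records by default and that flush() should be called when producing is done. The following is a toy model of that idea using only the standard library. It is not Fluvio's implementation: the type BatchingProducer, its batch_size field, and the delivered buffer are hypothetical names invented for this illustration, and the sketch ignores any timer-based flushing a real producer may perform.

```rust
/// Toy stand-in for a batching producer (hypothetical, NOT Fluvio's API).
struct BatchingProducer {
    batch_size: usize,
    /// Records accepted by send() but not yet shipped.
    pending: Vec<String>,
    /// Stands in for records that actually reached the topic.
    delivered: Vec<String>,
}

impl BatchingProducer {
    fn new(batch_size: usize) -> Self {
        Self {
            batch_size,
            pending: Vec::new(),
            delivered: Vec::new(),
        }
    }

    /// Buffers a record and ships the batch only once it is full.
    fn send(&mut self, record: impl Into<String>) {
        self.pending.push(record.into());
        if self.pending.len() >= self.batch_size {
            self.flush();
        }
    }

    /// Ships whatever is still buffered, even if the batch is not full.
    fn flush(&mut self) {
        self.delivered.append(&mut self.pending);
    }
}

fn main() {
    let mut producer = BatchingProducer::new(4);
    for i in 0..10 {
        producer.send(format!("Hello from Fluvio {}!", i));
    }
    // Two full batches have been shipped; the last two records are still buffered.
    println!("delivered before flush: {}", producer.delivered.len());
    producer.flush();
    println!("delivered after flush: {}", producer.delivered.len());
}
```

In this model, skipping the final flush() would leave the last partial batch in the buffer, which is the situation the comment in the snippet warns about.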

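The consumer loop in step 4 uses the pattern while let Some(Ok(record)), which ends the loop quietly on the first error as well as at the end of the stream. The sketch below shows that behaviour with a plain standard-library iterator instead of a Fluvio stream, next to an alternative that returns the error to the caller. The function names and the String error type are assumptions made for the illustration; they are not part of the Fluvio API.

```rust
/// Mirrors the `while let Some(Ok(..))` loop: the first `Err` stops the loop silently.
fn consume_until_error(items: Vec<Result<String, String>>) -> usize {
    let mut iter = items.into_iter();
    let mut count = 0;
    while let Some(Ok(_record)) = iter.next() {
        count += 1;
    }
    count
}

/// Alternative: propagate the first error instead of stopping quietly.
fn consume_or_fail(items: Vec<Result<String, String>>) -> Result<usize, String> {
    let mut count = 0;
    for item in items {
        let _record = item?;
        count += 1;
    }
    Ok(count)
}

/// Hypothetical input: one good record, one error, one more good record.
fn sample() -> Vec<Result<String, String>> {
    vec![
        Ok("first".to_string()),
        Err("connection lost".to_string()),
        Ok("second".to_string()),
    ]
}

fn main() {
    // Only the record before the error is counted; the error itself is dropped.
    println!("silent stop: {}", consume_until_error(sample()));
    // The same input now reports why consumption stopped.
    println!("propagated: {:?}", consume_or_fail(sample()));
}
```

Whether to stop silently or propagate depends on the application; for a short demo the quiet version keeps the code small, while longer-running consumers usually want to see the error.
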
Learn More

  • Read the tutorials to get the most out of Fluvio and InfinyOn Cloud and scale your streaming solution.

  • You can use Fluvio to send or receive records from different sources using Connectors.

  • If you want to filter or transform records on the fly, read more about SmartModules.

Commit count: 2376
