venta

Crates.io: venta
lib.rs: venta
version: 0.9.0
source: src
created_at: 2020-11-09 09:01:19.880984
updated_at: 2022-03-28 13:47:34.55872
description: Reliable producer interface for Pulsar
homepage: https://github.com/vmalloc/venta
repository: https://github.com/vmalloc/venta
id: 310223
size: 59,403
Rotem Yaari (vmalloc)

documentation: https://docs.rs/venta

README

Overview

Venta is a Rust library implementing a robust, ergonomic and non-blocking producer for Apache Pulsar. It builds upon pulsar-rs, but adds some missing pieces:

  1. Venta publishes messages in the background, meaning that enqueuing a message returns immediately as long as the queue has space. This is useful for applications that do not want to block on the actual publishing operation.

  2. Venta adds retries and timeouts on top of pulsar-rs, allowing it to recover from errors which cause pulsar-rs to get stuck or return with errors.

  3. Message event times are automatically set to the time at which each message is added to the queue, allowing consumers to reason about the original message times.

  4. Venta is more ergonomic for the common use cases (e.g., publishing JSON messages, adding properties, etc.).
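To make the first three points concrete, here is a minimal sketch of the general pattern, using only the Rust standard library. It is not Venta's implementation: `SketchProducer`, `Queued`, `spawn_sketch` and `demo` are hypothetical names, a plain thread stands in for the async runtime, and the `publish` closure stands in for the actual network send. The sketch covers the bounded queue, the event time captured at enqueue time, and retries; it does not model timeouts.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime};

/// A queued message. The event time is captured when the message is enqueued,
/// not when it is eventually published.
struct Queued {
    payload: String,
    event_time: SystemTime,
}

/// Hypothetical producer handle (an illustration, not Venta's type).
#[derive(Clone)]
struct SketchProducer {
    tx: SyncSender<Queued>,
}

impl SketchProducer {
    /// Never blocks: succeeds if the bounded queue has space, fails otherwise.
    fn enqueue(&self, payload: &str) -> Result<(), String> {
        let msg = Queued {
            payload: payload.to_string(),
            event_time: SystemTime::now(),
        };
        self.tx.try_send(msg).map_err(|e| match e {
            TrySendError::Full(_) => "queue full".to_string(),
            TrySendError::Disconnected(_) => "worker stopped".to_string(),
        })
    }
}

/// Spawns a background worker that drains the queue and calls `publish`,
/// retrying each message up to `max_attempts` times. The worker returns the
/// payload and event time of every message it managed to publish.
fn spawn_sketch<F>(
    capacity: usize,
    max_attempts: u32,
    mut publish: F,
) -> (SketchProducer, JoinHandle<Vec<(String, SystemTime)>>)
where
    F: FnMut(&Queued) -> Result<(), String> + Send + 'static,
{
    let (tx, rx): (SyncSender<Queued>, Receiver<Queued>) = sync_channel(capacity);
    let worker = thread::spawn(move || {
        let mut delivered = Vec::new();
        for msg in rx {
            for attempt in 1..=max_attempts {
                if publish(&msg).is_ok() {
                    delivered.push((msg.payload.clone(), msg.event_time));
                    break;
                }
                if attempt < max_attempts {
                    // Simple linear backoff between attempts.
                    thread::sleep(Duration::from_millis(10 * u64::from(attempt)));
                }
            }
        }
        delivered
    });
    (SketchProducer { tx }, worker)
}

/// Demo: every first publish attempt fails and the retry succeeds.
fn demo() -> Vec<String> {
    let mut calls = 0;
    let (producer, worker) = spawn_sketch(8, 3, move |_msg: &Queued| {
        calls += 1;
        if calls % 2 == 1 {
            Err("transient failure".to_string())
        } else {
            Ok(())
        }
    });
    producer.enqueue("a").unwrap();
    producer.enqueue("b").unwrap();
    drop(producer); // closing the channel lets the worker finish
    let delivered = worker.join().unwrap();
    delivered.into_iter().map(|(payload, _event_time)| payload).collect()
}
```

In this sketch the caller only ever pays for a `try_send` into the bounded channel, while failures of the stand-in `publish` call are absorbed by the worker's retry loop.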

Usage

For a simple use case, in which you would like to configure a producer with a topic name and a producer name, you can use spawn_simple:

    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn_simple("pulsar://127.0.0.1", "topic_name", Some("producer_name".into())).await?;
        //...
        Ok(())
    }

For cases in which you would like more fine-grained control over how the producer is built, you can use the spawn constructor, which takes a closure that creates the producer:

    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn(|| async {
            pulsar::Pulsar::builder("pulsar://127.0.0.1", pulsar::TokioExecutor)
              .build()
              .await?
              .producer()
              .with_topic("topic").build()
              .await
        }).await?;
        //...
        Ok(())
    }

See the documentation for pulsar-rs for more information on how to construct the underlying client and producer.

Once a producer is initialized, enqueueing JSON messages is relatively simple:

    use serde_json::json;
    
    async fn f() -> anyhow::Result<()> {
        let producer = venta::BackgroundProducer::spawn_simple("pulsar://127.0.0.1", "topic_name", Some("producer_name".into())).await?;

        producer.produce().json(&json!({
            "message": "here"
        })).enqueue()
    }

Venta producers are Clone, meaning a producer can be cloned and each clone handed to a different part of your code.
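As an illustration of why a cloneable handle is convenient, the following standard-library sketch gives each thread its own clone of a handle that feeds a single shared queue. `Handle` and `fan_in` are hypothetical names, and the sketch assumes nothing about how Venta implements Clone internally.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for a cloneable producer handle.
#[derive(Clone)]
struct Handle {
    tx: Sender<String>,
}

impl Handle {
    fn enqueue(&self, msg: String) {
        // A send error (receiver dropped) is ignored to keep the sketch short.
        let _ = self.tx.send(msg);
    }
}

/// Hands one clone to each of `n` threads and returns everything that
/// arrived on the shared queue, sorted for a deterministic result.
fn fan_in(n: usize) -> Vec<String> {
    let (tx, rx) = channel();
    let handle = Handle { tx };
    let workers: Vec<_> = (0..n)
        .map(|i| {
            let h = handle.clone();
            thread::spawn(move || h.enqueue(format!("from-{}", i)))
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    drop(handle); // drop the last sender so the receiver iterator ends
    let mut got: Vec<String> = rx.iter().collect();
    got.sort();
    got
}
```

Every clone writes to the same underlying queue, so no extra synchronization is needed at the call sites.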
