tmq

Crates.io: tmq
lib.rs: tmq
version: 0.5.0
source: src
created_at: 2018-10-12 01:31:37.932618
updated_at: 2024-10-01 16:08:32.700998
description: ZeroMQ bindings for Tokio
repository: https://github.com/cetra3/tmq
id: 89379
size: 127,893
(cetra3)

TMQ - Rust ZeroMQ bindings for Tokio

This crate bridges Tokio and ZeroMQ to allow for ZeroMQ in the async world.

Crates.io Docs.rs MIT licensed

Changelog

0.5.0 - Extra setters/getters & new rust edition

  • Added setters for curve key encryption #45

  • Remove Redundant Imports #43

  • Add more socket options & bump edition #46

0.4.0 - Bump Deps

Bump Deps & Pin future in RequestSender #39

0.3.1 - Iter Mut for Multipart

Adds an iter_mut() method to Multipart

0.3.0 - Tokio 1.0 Support

0.3.0 adds support for tokio 1.0 thanks to YushiOMOTE!

Currently Implemented Sockets

  • Request/Reply
  • Publish/Subscribe
  • Dealer/Router
  • Push/Pull

Examples

Please see the examples directory for a full set of examples. They are paired together based upon the socket types.

Usage

The API is designed to be simple but opinionated. See the examples for working code; in general, you need to import tokio and tmq::*.

Publish

To publish messages to all connected subscribers, you can use the publish function:

use tmq::{publish, Context, Result};

use futures::SinkExt;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<()> {
    // Bind a publish socket; subscribers connect to this address.
    let mut socket = publish(&Context::new()).bind("tcp://127.0.0.1:7899")?;

    let mut i = 0;

    loop {
        i += 1;

        // Send a two-frame message: the topic first, then the payload.
        socket
            .send(vec!["topic", &format!("Broadcast #{}", i)])
            .await?;

        // Wait one second before the next broadcast.
        sleep(Duration::from_secs(1)).await;
    }
}

Subscribe

A subscribe socket is a Stream that reads values from a publish socket. Specify the filter prefix with the subscribe method; use "" to receive all messages.

use futures::StreamExt;

use tmq::{subscribe, Context, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Connect to the publisher and keep only messages whose first frame
    // starts with "topic".
    let mut socket = subscribe(&Context::new())
        .connect("tcp://127.0.0.1:7899")?
        .subscribe(b"topic")?;

    // Each item is a multipart message; print its frames as text.
    while let Some(msg) = socket.next().await {
        println!(
            "Subscribe: {:?}",
            msg?.iter()
                .map(|item| item.as_str().unwrap_or("invalid text"))
                .collect::<Vec<&str>>()
        );
    }
    Ok(())
}
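
The filter prefix described above can be hard to picture, so here is a minimal standard-library sketch of the matching rule. It assumes the usual ZeroMQ behaviour: the prefix is compared against the leading bytes of the first frame, and an empty prefix matches everything. The helpers `matches_subscription` and `filter_messages` are hypothetical names used only for illustration; they are not part of tmq, and the real filtering happens inside ZeroMQ rather than in your code.

```rust
/// Hypothetical helper (not part of tmq): returns true when a subscription
/// prefix would let a message through. The prefix is compared against the
/// leading bytes of the first frame; an empty prefix matches everything.
fn matches_subscription(prefix: &[u8], first_frame: &[u8]) -> bool {
    first_frame.starts_with(prefix)
}

/// Hypothetical helper: keeps only the messages whose first frame matches.
/// Each message is modelled as a list of frames, like a multipart message.
fn filter_messages<'a>(
    prefix: &[u8],
    messages: &'a [Vec<&'a str>],
) -> Vec<&'a Vec<&'a str>> {
    messages
        .iter()
        .filter(|frames| {
            frames
                .first()
                .map_or(false, |first| matches_subscription(prefix, first.as_bytes()))
        })
        .collect()
}
```

Note that matching is by prefix, not by exact topic: with the prefix b"topic", a message whose first frame is "topical" also passes. If topics can share a prefix, a delimiter at the end of the topic name is one way to keep them apart.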
Commit count: 112
