/*
* This file is part of Futures ZMQ.
*
* Copyright © 2018 Riley Trautman
*
* Futures ZMQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Futures ZMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Futures ZMQ. If not, see <http://www.gnu.org/licenses/>.
*/
extern crate env_logger;
extern crate futures;
extern crate futures_zmq;
extern crate log;
extern crate tokio;
extern crate zmq;
use std::{sync::Arc, thread, time::Duration};
use futures::{stream::iter_ok, Future, Sink, Stream};
use futures_zmq::{prelude::*, Error, Multipart, Pub, Rep, Req, Sub};
// On my quad-core i7, if I run with too many threads, the context switching takes too long and
// some messages get dropped. 2 subscribers can properly retrieve 1 million messages each, though.
//
// For nicer results, lower the number of messages and increase the number of subscribers.
/// Number of subscriber threads spawned by `main`, and the number of sync requests the
/// publisher answers before it starts broadcasting.
const SUBSCRIBERS: usize = 10;
/// Number of "Rhubarb" messages the publisher broadcasts before sending the final "END".
const MESSAGES: usize = 1_000;
/// End handler used by the subscriber stream to recognise the publisher's final message.
struct Stop;

impl EndHandler for Stop {
    /// Returns `true` when the first frame of `item` exists, can be viewed as a string
    /// (`as_str` yields `Some`), and that string is exactly `"END"`; `false` otherwise.
    fn should_stop(&mut self, item: &Multipart) -> bool {
        item.get(0)
            .and_then(|frame| frame.as_str())
            .map_or(false, |text| text == "END")
    }
}
/// Runs the publisher side of the example on the current thread.
///
/// Binds a `Pub` socket on port 5561 and a `Rep` socket on port 5562, replies with an empty
/// message to `SUBSCRIBERS` sync requests, then publishes `MESSAGES` "Rhubarb" messages
/// followed by a single "END" message. Any error is printed rather than propagated.
fn publisher_thread() {
    let context = Arc::new(zmq::Context::new());
    let pub_socket_fut = Pub::builder(Arc::clone(&context))
        .bind("tcp://*:5561")
        .build();
    let rep_socket_fut = Rep::builder(context).bind("tcp://*:5562").build();

    println!("Waiting for subscribers");

    let pipeline = pub_socket_fut
        .join(rep_socket_fut)
        .and_then(|(publisher, syncservice)| {
            let (reply_sink, request_stream) = syncservice.sink_stream(25).split();

            // Stage 1: answer one sync request per expected subscriber. Zipping with the
            // bounded counter ends the handshake after `SUBSCRIBERS` requests.
            let handshake = iter_ok(0..SUBSCRIBERS)
                .zip(request_stream)
                .map(|_| zmq::Message::from("").into())
                .forward(reply_sink);

            // Stage 2: once the handshake has completed, push the payload messages
            // through the publisher sink.
            let broadcast = handshake.and_then(move |_| {
                println!("Broadcasting message");
                iter_ok(0..MESSAGES)
                    .map(|_| zmq::Message::from("Rhubarb").into())
                    .forward(publisher.sink(25))
            });

            // Stage 3: send the terminating "END" message on the same sink.
            broadcast.and_then(|(_, sink)| sink.send(zmq::Message::from("END").into()))
        });

    tokio::run(pipeline.map(|_| ()).or_else(|err| {
        println!("Error in publisher: {:?}", err);
        Ok(())
    }));
}
fn subscriber_thread() {
let ctx = Arc::new(zmq::Context::new());
let subscriber_fut = Sub::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5561")
.filter(b"")
.build();
let syncclient_fut = Req::builder(ctx).connect("tcp://localhost:5562").build();
let msg = zmq::Message::from("");
let runner = subscriber_fut
.join(syncclient_fut)
.and_then(|(subscriber, syncclient)| {
syncclient
.send(msg.into())
.and_then(|syncclient| syncclient.recv())
.and_then(move |_| {
subscriber
.stream()
.with_end_handler(Stop)
.fold(0, |counter, _| Ok(counter + 1) as Result)
.and_then(|total| {
println!("Received {} updates", total);
Ok(())
})
})
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("Error in subscriber: {:?}", e);
Ok(())
}));
}
/// Spawns the publisher thread, then `SUBSCRIBERS` subscriber threads (pausing 400 ms
/// before each one), and waits for all of them to finish.
///
/// Panics if any spawned thread panicked.
fn main() {
    let mut handles = Vec::new();
    handles.push(thread::spawn(publisher_thread));

    // `extend` drains the iterator in order, so each sleep still happens right before
    // the corresponding subscriber is spawned.
    handles.extend((0..SUBSCRIBERS).map(|_| {
        thread::sleep(Duration::from_millis(400));
        thread::spawn(subscriber_thread)
    }));

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}