/*
* This file is part of Futures ZMQ.
*
* Copyright © 2018 Riley Trautman
*
* Futures ZMQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Futures ZMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Futures ZMQ. If not, see .
*/
extern crate futures;
extern crate futures_zmq;
extern crate tokio;
extern crate tokio_timer;
extern crate zmq;
use std::{
io,
sync::Arc,
time::{Duration, Instant},
};
use futures::{stream::iter_ok, Future, Stream};
use futures_zmq::prelude::*;
use futures_zmq::{Error as ZmqFutError, Push};
use tokio_timer::{Error as TimerError, Interval};
#[derive(Debug)]
enum Error {
ZmqFut(ZmqFutError),
Zmq(zmq::Error),
Io(io::Error),
Timer(TimerError),
}
impl From for Error {
fn from(e: ZmqFutError) -> Self {
Error::ZmqFut(e)
}
}
impl From for Error {
fn from(e: zmq::Error) -> Self {
Error::Zmq(e)
}
}
impl From for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl From for Error {
fn from(e: TimerError) -> Self {
Error::Timer(e)
}
}
fn main() {
let ctx = Arc::new(zmq::Context::new());
let workers_fut = Push::builder(Arc::clone(&ctx)).bind("tcp://*:5557").build();
let sink_fut = Push::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5558")
.build();
let sink2_fut = Push::builder(ctx).connect("tcp://localhost:5558").build();
let start_msg = zmq::Message::from("START").into();
let stop_msg = zmq::Message::from("STOP").into();
let interval = Interval::new(Instant::now(), Duration::from_millis(200));
let process = workers_fut
.join(sink_fut)
.join(sink2_fut)
.from_err()
.and_then(move |((workers, sink), sink2)| {
sink.send(start_msg).map_err(Error::from).and_then(|_| {
iter_ok(0..10)
.zip(interval)
.map_err(Error::from)
.and_then(|(i, _)| {
println!("Sending: {}", i);
let msg = zmq::Message::from(&format!("{}", i));
Ok(msg.into())
})
.forward(workers.sink(25))
.and_then(move |_| sink2.send(stop_msg).map_err(Error::from))
})
});
tokio::run(process.map(|_| ()).or_else(|e| {
println!("Error: {:?}", e);
Ok(())
}));
}