/*
* This file is part of Futures ZMQ.
*
* Copyright © 2018 Riley Trautman
*
* Futures ZMQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Futures ZMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Futures ZMQ. If not, see .
*/
extern crate futures;
extern crate futures_zmq;
extern crate tokio;
extern crate tokio_executor;
extern crate zmq;
use std::sync::Arc;
use futures::{Future, Stream};
use futures_zmq::{prelude::*, Multipart, Pub, Pull, Sub};
pub struct Stop;
impl ControlHandler for Stop {
fn should_stop(&mut self, _: Multipart) -> bool {
println!("Got stop signal");
true
}
}
fn main() {
let ctx = Arc::new(zmq::Context::new());
let cmd_fut = Sub::builder(Arc::clone(&ctx))
.connect("tcp://localhost:5559")
.filter(b"")
.build();
let conn_fut = Pull::builder(Arc::clone(&ctx)).bind("tcp://*:5558").build();
let send_cmd_fut = Pub::builder(ctx).bind("tcp://*:5559").build();
let process = cmd_fut
.join(conn_fut)
.join(send_cmd_fut)
.and_then(|((cmd, conn), send_cmd)| {
conn.stream()
.controlled(cmd.stream(), Stop)
.filter_map(|multipart| {
multipart
.into_iter()
.filter_map(|msg| {
let stop = if let Some(s_msg) = msg.as_str() {
println!("msg: '{}'", s_msg);
s_msg == "STOP"
} else {
false
};
if stop {
Some(msg)
} else {
None
}
})
.collect::>()
.pop()
.map(Multipart::from)
})
.forward(send_cmd.sink(25))
});
tokio::run(process.map(|_| ()).or_else(|e| {
println!("Error: {:?}", e);
Ok(())
}));
}