/*
* This file is part of Futures ZMQ.
*
* Copyright © 2018 Riley Trautman
*
* Futures ZMQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Futures ZMQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Futures ZMQ. If not, see <http://www.gnu.org/licenses/>.
*/
extern crate env_logger;
extern crate futures;
extern crate futures_zmq;
extern crate log;
extern crate rand;
extern crate tokio;
extern crate zmq;
use std::{env, fmt, sync::Arc, thread, time::Duration};
use futures::{sync::mpsc, Future, Sink, Stream};
use futures_zmq::{prelude::*, Multipart, Pub, Req, Router, Sub};
use rand::RngCore;
/// Number of client threads `main` spawns when clients are enabled.
const NUM_CLIENTS: usize = 1000;
/// Number of worker threads `main` spawns when every role runs in one process.
const NUM_WORKERS: usize = 5;
/// `main` sleeps briefly before spawning each client whose number is a
/// multiple of this value (its log line says this avoids too many open files).
const BATCH_SIZE: usize = 10;
/* ----------------------------------Error----------------------------------- */
/// Errors that the client, worker and broker tasks in this file can produce.
#[derive(Debug)]
enum Error {
/// An error reported by the `zmq` crate.
Zmq(zmq::Error),
/// An error reported by the `futures_zmq` crate.
TokioZmq(futures_zmq::Error),
/// Sending a worker id into the broker's internal channel failed.
WorkerSend,
/// Receiving a worker id from the broker's internal channel failed.
WorkerRecv,
/// A multipart message had fewer frames than expected.
NotEnoughMessages,
/// A multipart message had frames left over after the expected ones.
TooManyMessages,
/// A frame that is expected to be empty contained data.
MsgNotEmpty,
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Zmq(ref e) => write!(f, "Error in ZeroMQ: {}", e),
Error::TokioZmq(ref e) => write!(f, "Error in Futures ZMQ: {}", e),
Error::WorkerSend => write!(f, "Error sending to worker"),
Error::WorkerRecv => write!(f, "Error receiving from worker"),
Error::NotEnoughMessages => write!(f, "Not enough messages"),
Error::TooManyMessages => write!(f, "Too many messages"),
Error::MsgNotEmpty => write!(f, "Message not empty"),
}
}
}
impl From for Error {
fn from(e: futures_zmq::Error) -> Self {
Error::TokioZmq(e)
}
}
impl From for Error {
fn from(e: zmq::Error) -> Self {
Error::Zmq(e)
}
}
/* --------------------------------Envelope---------------------------------- */
/// A three-frame message as parsed by `Envelope::from_multipart`.
struct Envelope {
/// First frame of the multipart message.
addr: zmq::Message,
/// Second frame; `from_multipart` rejects the message unless it is empty.
empty: zmq::Message,
/// Third and last frame.
request: zmq::Message,
}
impl Envelope {
fn addr(&self) -> &zmq::Message {
&self.addr
}
fn request(&self) -> &zmq::Message {
&self.request
}
fn set_request(&mut self, msg: zmq::Message) {
self.request = msg;
}
fn from_multipart(mut m: Multipart) -> Result {
let addr = m.pop_front().ok_or(Error::NotEnoughMessages)?;
let empty = m.pop_front().ok_or(Error::NotEnoughMessages)?;
if !empty.is_empty() {
return Err(Error::MsgNotEmpty);
}
let request = m.pop_front().ok_or(Error::NotEnoughMessages)?;
if !m.is_empty() {
return Err(Error::TooManyMessages);
}
Ok(Envelope {
addr,
empty,
request,
})
}
}
impl From for Multipart {
fn from(e: Envelope) -> Self {
let mut multipart = Multipart::new();
multipart.push_back(e.addr);
multipart.push_back(e.empty);
multipart.push_back(e.request);
multipart
}
}
/* -----------------------------------Stop----------------------------------- */
/// Control handler passed to `controlled(..)` by the worker and broker tasks.
///
/// The two fields (a label and a number) are only used in the log line
/// printed when a control message arrives.
struct Stop(&'static str, usize);

impl ControlHandler for Stop {
    /// Logs the stop signal and answers `true` for every control message,
    /// whatever its content.
    fn should_stop(&mut self, _: Multipart) -> bool {
        let Stop(label, index) = *self;
        println!("Received stop signal! {}/{}", label, index);
        true
    }
}
/* ----------------------------------client---------------------------------- */
/// Runs one client to completion on the current thread.
///
/// Builds a REQ socket with identity `c<client_num>` connected to
/// `tcp://localhost:5672`, sends a single `HELLO`, waits for one reply and
/// prints its first frame. Any error is printed rather than propagated.
///
/// Returns `client_num` so the caller can identify the joined thread.
fn client_task(client_num: usize) -> usize {
    let context = Arc::new(zmq::Context::new());
    let client_fut = Req::builder(context)
        .identity(format!("c{}", client_num).as_bytes())
        .connect("tcp://localhost:5672")
        .build();
    let msg = zmq::Message::from("HELLO");
    let fut = client_fut.and_then(move |client| {
        client
            .send(msg.into())
            .from_err()
            .and_then(|client| client.recv())
            .and_then(move |(multipart, _)| {
                if let Some(msg) = multipart.get(0) {
                    // Decode lossily: a reply that is not valid UTF-8 must
                    // not panic the client thread.
                    println!("Client {}: {}", client_num, String::from_utf8_lossy(msg));
                }
                Ok(())
            })
    });
    tokio::run(fut.map(|_| ()).or_else(|e| {
        println!("Error in client: {}, {:?}", e, e);
        Ok(())
    }));
    client_num
}
/* ----------------------------------worker---------------------------------- */
/// Runs one worker on the current thread until its stream ends.
///
/// Builds a SUB control socket (connected to `tcp://localhost:5674`, empty
/// filter) and a REQ socket with identity `w<worker_num>` connected to
/// `tcp://localhost:5673`. After sending `READY`, every incoming multipart is
/// parsed as an `Envelope`, logged, and sent back with its payload replaced
/// by `OK`. Any error is printed rather than propagated.
///
/// Returns `worker_num` so the caller can identify the joined thread.
fn worker_task(worker_num: usize) -> usize {
    let context = Arc::new(zmq::Context::new());
    let control_fut = Sub::builder(Arc::clone(&context))
        .connect("tcp://localhost:5674")
        .filter(b"")
        .build();
    let worker_fut = Req::builder(context)
        .identity(format!("w{}", worker_num).as_bytes())
        .connect("tcp://localhost:5673")
        .build();
    let msg = zmq::Message::from("READY");
    let fut = worker_fut
        .join(control_fut)
        .from_err()
        .and_then(move |(worker, control)| {
            worker
                .send(msg.into())
                .map_err(Error::from)
                .and_then(move |worker| {
                    let (sink, stream) = worker.sink_stream(25).split();
                    stream
                        .controlled(control.stream(), Stop("worker", worker_num))
                        .map_err(Error::from)
                        .and_then(move |multipart| {
                            let mut envelope: Envelope = Envelope::from_multipart(multipart)?;
                            // Decode lossily: frames that are not valid UTF-8
                            // must not panic the worker thread.
                            println!(
                                "Worker {}: {} from {}",
                                worker_num,
                                String::from_utf8_lossy(envelope.request()),
                                String::from_utf8_lossy(envelope.addr())
                            );
                            let msg = zmq::Message::from("OK");
                            envelope.set_request(msg);
                            Ok(envelope.into())
                        })
                        .forward(sink)
                })
        });
    tokio::run(fut.map(|_| ()).or_else(|e| {
        println!("Error in worker: {}, {:?}", e, e);
        Ok(())
    }));
    println!("Worker {} is done", worker_num);
    worker_num
}
/* ----------------------------------broker---------------------------------- */
fn broker_task() {
let context = Arc::new(zmq::Context::new());
let frontend_fut = Router::builder(Arc::clone(&context))
.bind("tcp://*:5672")
.build();
let control0_fut = Sub::builder(Arc::clone(&context))
.connect("tcp://localhost:5674")
.filter(b"")
.build();
let control1_fut = Sub::builder(Arc::clone(&context))
.connect("tcp://localhost:5674")
.filter(b"")
.build();
let backend_fut = Router::builder(context).bind("tcp://*:5673").build();
let runner = frontend_fut
.join(backend_fut)
.join(control0_fut.join(control1_fut))
.from_err()
.and_then(|((frontend, backend), (control0, control1))| {
let (worker_send, worker_recv) = mpsc::channel::(10);
let (frontend_sink, frontend_stream) = frontend.sink_stream(25).split();
let (backend_sink, backend_stream) = backend.sink_stream(25).split();
let back2front = backend_stream
.controlled(control0.stream(), Stop("broker", 0))
.map_err(Error::from)
.and_then(|mut multipart| {
let worker_id = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
Ok((multipart, worker_id))
})
.and_then(move |(multipart, worker_id)| {
worker_send
.clone()
.send(worker_id)
.map(|_| multipart)
.map_err(|_| Error::WorkerSend)
})
.filter_map(|mut multipart| {
let empty = multipart.pop_front().unwrap();
assert!(empty.is_empty());
let client_id = multipart.pop_front().unwrap();
if &*client_id == b"READY" {
None
} else {
Some((multipart, client_id))
}
})
.and_then(|(mut multipart, client_id)| {
let empty = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
assert!(empty.is_empty());
let reply = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
let mut response = Multipart::new();
response.push_back(client_id);
response.push_back(empty);
response.push_back(reply);
Ok(response)
})
.forward(frontend_sink);
let front2back = frontend_stream
.controlled(control1.stream(), Stop("broker", 1))
.map_err(Error::from)
.zip(worker_recv.map_err(|_| Error::WorkerRecv))
.and_then(|(mut multipart, worker_id)| {
let client_id = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
let empty = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
assert!(empty.is_empty());
let request = multipart.pop_front().ok_or(Error::NotEnoughMessages)?;
let mut response = Multipart::new();
response.push_back(worker_id);
response.push_back(empty);
response.push_back(client_id);
response.push_back(zmq::Message::new());
response.push_back(request);
Ok(response)
})
.forward(backend_sink);
front2back.join(back2front)
});
tokio::run(runner.map(|_| ()).or_else(|e| {
println!("Error in broker: {}, {:?}", e, e);
Ok(())
}));
println!("Broker is done");
}
/* --------------------------------use_broker-------------------------------- */
/// Which roles this process runs; `main` selects it from the `PROCESS_KIND`
/// environment variable.
#[derive(Clone, Debug, Eq, PartialEq)]
enum ProcessKind {
/// Run only the broker thread.
Broker,
/// Run a single worker thread.
Worker,
/// Run the client threads, then publish the stop signal.
Client,
/// Run broker, workers and clients in this one process.
All,
}
/* -----------------------------------main----------------------------------- */
/// Entry point: starts the roles selected by the `PROCESS_KIND` environment
/// variable (`broker`, `worker`, `client`; anything else, or unset, runs all).
///
/// When clients are enabled, `main` waits for every client thread, then binds
/// a PUB socket on `tcp://*:5674` and publishes one empty message as the stop
/// signal. Finally it joins the worker threads and the broker thread.
fn main() {
    env_logger::init();
    let process_kind = match env::var("PROCESS_KIND")
        .unwrap_or_else(|_| "all".to_owned())
        .as_str()
    {
        "broker" => ProcessKind::Broker,
        "worker" => ProcessKind::Worker,
        "client" => ProcessKind::Client,
        _ => ProcessKind::All,
    };
    let broker_thread = match process_kind {
        ProcessKind::Broker | ProcessKind::All => Some(thread::spawn(broker_task)),
        ProcessKind::Worker | ProcessKind::Client => None,
    };
    let workers = match process_kind {
        // A stand-alone worker takes a random number as its identity.
        ProcessKind::Worker => vec![thread::spawn(move || {
            worker_task(rand::thread_rng().next_u32() as usize)
        })],
        ProcessKind::All => (0..NUM_WORKERS)
            .map(|worker_num| thread::spawn(move || worker_task(worker_num)))
            .collect::<Vec<_>>(),
        ProcessKind::Broker | ProcessKind::Client => Vec::new(),
    };
    match process_kind {
        ProcessKind::Client | ProcessKind::All => {
            let clients = (0..NUM_CLIENTS)
                .map(|client_num| {
                    if client_num % BATCH_SIZE == 0 {
                        println!("Sleeping to avoid too many open files");
                        thread::sleep(Duration::from_millis(50));
                    }
                    thread::spawn(move || client_task(client_num))
                })
                .collect::<Vec<_>>();
            // Wait for clients to finish
            for client in clients {
                client.join().unwrap();
            }
            // Set up control socket
            let context = Arc::new(zmq::Context::new());
            let control_fut = Pub::builder(context).bind("tcp://*:5674").build();
            // Pause between creating the control socket future and running it.
            thread::sleep(Duration::from_secs(1));
            // Signal end when all clients have joined
            tokio::run(
                control_fut
                    .and_then(|control| control.send(zmq::Message::new().into()))
                    .map(|_| ())
                    .or_else(|e| {
                        println!("Error in main loop {}, {:?}", e, e);
                        Ok(())
                    }),
            );
        }
        ProcessKind::Broker | ProcessKind::Worker => (),
    };
    for worker in workers {
        let worker_num = worker.join().unwrap();
        println!("Joined Worker {}", worker_num);
    }
    if let Some(broker_thread) = broker_thread {
        broker_thread.join().unwrap();
        println!("Joined Broker");
    }
}