extern crate futures; extern crate comms; use std::collections::VecDeque; use futures::{future, sink, stream, Future, Sink, Stream, Poll, Async, AsyncSink}; use futures::stream::{SplitSink, SplitStream}; use futures::sync::mpsc; use comms::*; enum Msg { Alpha, Beta(bool), Gamma(usize), } type PeerId = u64; type ActorId = u64; type MsgPair = (PeerId, Msg); type FramedTcpSink = sink::SinkFromErr>, IoErrorString>; type FramedTcpStream = stream::FromErr>, IoErrorString>; pub struct LoadBalancePeersForActors { // Currently present peers. peers: Room, FramedTcpStream>, // For adding new peers and assigning them IDs 1..N. peer_id_counter: PeerId, new_peer_rx: Fuse>>, // For adding new actors. new_actor_rx: Fuse)>>, // For adding new messages to send. outgoing_msg_rx: Fuse>, outgoing_msg_queue: VecDeque, // For forwarding messages from peers. actor_peer_counts: HashMap, actor_txs: HashMap>, peer_actor_map: HashMap, } impl LoadBalancePeersForActors { pub fn new(new_peer_rx: mpsc::Receiver, new_actor_rx: mpsc::Receiver<(ActorId, mpsc::Sender)>, outgoing_msg_rx: mpsc::Receiver) -> LoadBalancePeersForActors { LoadBalancePeersForActors { peers: Room::default(), peer_id_counter: 0, new_peer_rx: new_peer_rx.fuse(), new_actor_rx: new_actor_rx.fuse(), outgoing_msg_rx: outgoing_msg_rx.fuse(), outgoing_msg_queue: VecDeque::new(), actor_peer_counts: HashMap::new(), actor_txs: HashMap::new(), peer_actor_map: HashMap::new(), } } fn new_peer_id(&mut self) -> PeerId { let peer_id = self.peer_id_counter + 1; self.peer_id_counter += 1; peer_id } } impl Future for Spectators { type Item = (); type Error = (); fn poll(&mut self) -> Poll<(), ()> { println!("LoadBalancePeersForActors wakeup {:?}", self.peers.ready_ids()); while Ok(Async::Ready(Some(peer))) = self.new_peer_rx.poll() { let (tx, rx) = peer.split(); let peer = Client::new(self.new_peer_id(), ClientTimeout::None, tx, rx); self.peers.insert(peer); // @TODO: Assign to an actor. } while Ok(Async::Ready(Some((actor_id, actor_tx)))) = self.new_actor_rx.poll() { self.actor_peer_counts.insert(actor_id, 0); self.actor_txs.insert(actor_id, actor_tx); } while Ok(Async::Ready(Some(msg))) = self.outgoing_msg_rx.poll() { self.outgoing_msg_queue.push_back(msg); } // This is bottlenecked by the batched nature of `Room::start_send`. Perhaps that // should be a separate type, e.g., `RoomTransmit`. // It's pretty clear we'd want a form of Sink that takes `(I, T::SinkItem)` pairs, // but *also* one that takes `HashMap`. // It might be easiest to have `Room` and `UnmanagedRoom`? `EasyRoom` and `Room`? loop { if let Some((id, msg)) = self.outgoing_msg_queue.pop_front() { let h = HashMap::new(); h.insert(id, msg.clone()); match self.peers.start_send(h) { Ok(AsyncSink::Ready) => {} Ok(AsyncSink::NotReady(h)) => { let msg = h.remove(id); self.outgoing_msg_queue.push_front(msg); } Err(_) => break, } } match self.peers.poll_complete() { Ok(Async::Ready(())) => continue, Ok(Async::NotReady) | Err(_) => break, } } loop { if let Ok(Async::Ready(Some(msgs))) = self.peers.poll() { for (id, msg) in msgs { let actor_id = match self.peer_actor_map.get(&id) { Some(actor_id) => actor_id, None => { println!("Peer {} had not Actor set.", peer_id); continue; } }; match self.actor_txs.get_mut(actor_id) { Some(actor_tx) => { match actor_tx.start_send((id, msg)) { Ok(AsyncSink::Ready) => {} Ok(AsyncSink::NotReady(h)) => { let msg = h.remove(id); self.outgoing_msg_queue.push_front(msg); } // Drop messages on error? Err(_) => break, } } } } } } Ok(Async::NotReady) } }