use mio::channel::{self, Sender}; use mio::timer::{Timeout, Timer, TimerError}; use mio::{Events, Poll, PollOpt, Ready, Token}; use p2p::{Config, Interface, NatMsg, NatState, NatTimer}; use std::any::Any; // use socket_collection::{EpollLoop, Handle, Notifier}; use sodium::crypto::box_; use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::rc::Rc; use std::thread::{self, JoinHandle}; use std::time::Duration; pub struct Core { nat_states: HashMap>>, peer_states: HashMap>>, core_timer: Timer, nat_timer: Timer, token: usize, config: Config, enc_pk: box_::PublicKey, enc_sk: box_::SecretKey, tx: Sender, // udt_epoll_handle: Handle, } impl Core { #[allow(unused)] pub fn insert_peer_state( &mut self, token: Token, state: Rc>, ) -> Result<(), (Rc>, String)> { if let Entry::Vacant(ve) = self.peer_states.entry(token) { ve.insert(state); Ok(()) } else { Err((state, "Token is already mapped".to_string())) } } pub fn remove_peer_state(&mut self, token: Token) -> Option>> { self.peer_states.remove(&token) } // pub fn udt_epoll_handle(&self) -> Handle { // unreachable!("For later"); // //self.udt_epoll_handle.clone() // } pub fn peer_state(&mut self, token: Token) -> Option>> { self.peer_states.get(&token).cloned() } pub fn set_core_timeout( &mut self, duration: Duration, timer_detail: CoreTimer, ) -> Result { self.core_timer.set_timeout(duration, timer_detail) } pub fn cancel_core_timeout(&mut self, timeout: &Timeout) -> Option { self.core_timer.cancel_timeout(timeout) } fn handle_core_timer(&mut self, poll: &Poll) { while let Some(core_timer) = self.core_timer.poll() { if let Some(peer_state) = self.peer_state(core_timer.associated_peer_state) { peer_state .borrow_mut() .timeout(self, poll, core_timer.timer_id); } } } fn handle_nat_timer(&mut self, poll: &Poll) { while let Some(nat_timer) = self.nat_timer.poll() { if let Some(nat_state) = self.state(nat_timer.associated_nat_state) { nat_state .borrow_mut() .timeout(self, poll, nat_timer.timer_id); } } } fn handle_readiness(&mut self, poll: &Poll, token: Token, kind: Ready) { if let Some(nat_state) = self.state(token) { return nat_state.borrow_mut().ready(self, poll, kind); } if let Some(peer_state) = self.peer_states.get(&token).cloned() { return peer_state.borrow_mut().ready(self, poll, kind); } } } impl Interface for Core { fn insert_state( &mut self, token: Token, state: Rc>, ) -> Result<(), (Rc>, String)> { if let Entry::Vacant(ve) = self.nat_states.entry(token) { ve.insert(state); Ok(()) } else { Err((state, "Token is already mapped".to_string())) } } fn remove_state(&mut self, token: Token) -> Option>> { self.nat_states.remove(&token) } fn state(&mut self, token: Token) -> Option>> { self.nat_states.get(&token).cloned() } fn set_timeout( &mut self, duration: Duration, timer_detail: NatTimer, ) -> Result { self.nat_timer.set_timeout(duration, timer_detail) } fn cancel_timeout(&mut self, timeout: &Timeout) -> Option { self.nat_timer.cancel_timeout(timeout) } fn new_token(&mut self) -> Token { self.token += 1; Token(self.token) } fn config(&self) -> &Config { &self.config } fn enc_pk(&self) -> &box_::PublicKey { &self.enc_pk } fn enc_sk(&self) -> &box_::SecretKey { &self.enc_sk } fn sender(&self) -> &Sender { &self.tx } fn as_any(&mut self) -> &mut Any { self } } pub trait CoreState { fn ready(&mut self, &mut Core, &Poll, Ready); fn write(&mut self, &mut Core, &Poll, Vec) {} fn timeout(&mut self, &mut Core, &Poll, u8) {} fn terminate(&mut self, &mut Core, &Poll); fn as_any(&mut self) -> &mut Any; } pub struct CoreTimer { pub associated_peer_state: Token, pub timer_id: u8, } impl CoreTimer { pub fn new(associated_peer_state: Token, timer_id: u8) -> Self { CoreTimer { associated_peer_state, timer_id, } } } pub struct CoreMsg(Option>); impl CoreMsg { pub fn new(f: F) -> Self { let mut f = Some(f); CoreMsg(Some(Box::new(move |core: &mut Core, poll: &Poll| { if let Some(f) = f.take() { f(core, poll); } }))) } } // pub struct Notify(Sender); // impl Notifier for Notify { // fn notify(&self, event: Event) { // unwrap!(self.0.send(CoreMsg::new(move |core, poll| { // core.handle_readiness(poll, event.token(), event.kind()); // }))); // } // } pub struct El { pub nat_tx: Sender, pub core_tx: Sender, joiner: Option>, } impl Drop for El { fn drop(&mut self) { let _ = self.core_tx.send(CoreMsg(None)); let joiner = unwrap!(self.joiner.take()); unwrap!(joiner.join()); println!("Gracefully shut down mio event loop"); } } pub fn spawn_event_loop(p2p_cfg: Config) -> El { let (core_tx, core_rx) = channel::channel::(); let (nat_tx, nat_rx) = channel::channel(); let nat_tx_cloned = nat_tx.clone(); let core_tx_cloned = core_tx.clone(); let joiner = thread::spawn(move || { const CORE_TIMER_TOKEN: usize = 0; const NAT_TIMER_TOKEN: usize = CORE_TIMER_TOKEN + 1; const CORE_RX_TOKEN: usize = NAT_TIMER_TOKEN + 1; const NAT_RX_TOKEN: usize = CORE_RX_TOKEN + 1; let poll = unwrap!(Poll::new()); let (enc_pk, enc_sk) = box_::gen_keypair(); let core_timer = Timer::default(); let nat_timer = Timer::default(); unwrap!(poll.register( &core_timer, Token(CORE_TIMER_TOKEN), Ready::readable() | Ready::error() | Ready::hup(), PollOpt::edge(), )); unwrap!(poll.register( &nat_timer, Token(NAT_TIMER_TOKEN), Ready::readable() | Ready::error() | Ready::hup(), PollOpt::edge(), )); unwrap!(poll.register( &core_rx, Token(CORE_RX_TOKEN), Ready::readable() | Ready::error() | Ready::hup(), PollOpt::edge(), )); unwrap!(poll.register( &nat_rx, Token(NAT_RX_TOKEN), Ready::readable() | Ready::error() | Ready::hup(), PollOpt::edge(), )); // let notifier = Notify(core_tx); // let epoll_loop = unwrap!(EpollLoop::start_event_loop(notifier)); // let udt_epoll_handle = epoll_loop.handle(); let mut core = Core { nat_states: HashMap::with_capacity(10), peer_states: HashMap::with_capacity(5), core_timer, nat_timer, token: NAT_RX_TOKEN + 1, config: p2p_cfg, enc_pk: enc_pk, enc_sk: enc_sk, tx: nat_tx, // udt_epoll_handle, }; let mut events = Events::with_capacity(1024); 'event_loop: loop { unwrap!(poll.poll(&mut events, None)); for event in events.iter() { match event.token() { Token(t) if t == CORE_TIMER_TOKEN => { assert!(event.kind().is_readable()); core.handle_core_timer(&poll); } Token(t) if t == NAT_TIMER_TOKEN => { assert!(event.kind().is_readable()); core.handle_nat_timer(&poll); } Token(t) if t == CORE_RX_TOKEN => { assert!(event.kind().is_readable()); while let Ok(f) = core_rx.try_recv() { if let Some(mut f) = f.0 { f(&mut core, &poll); } else { break 'event_loop; } } } Token(t) if t == NAT_RX_TOKEN => { assert!(event.kind().is_readable()); while let Ok(f) = nat_rx.try_recv() { f.invoke(&mut core, &poll); } } t => core.handle_readiness(&poll, t, event.kind()), } } } }); El { nat_tx: nat_tx_cloned, core_tx: core_tx_cloned, joiner: Some(joiner), } }