use common::event_loop::{Core, CoreState}; use common::types::{PeerId, PeerMsg, PlainTextMsg}; use maidsafe_utilities::serialisation::{deserialise, serialise}; use mio::{Poll, PollOpt, Ready, Token}; use p2p::{msg_to_read, msg_to_send, Interface, RendezvousInfo}; use socket_collection::TcpSock; use sodium::crypto::box_; use std::any::Any; use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::{Rc, Weak}; use std::{fmt, mem}; pub struct Peer { token: Token, sock: TcpSock, state: CurrentState, peers: Weak>>, } enum CurrentState { AwaitingPeerPk, AwaitingPeerName { pk: box_::PublicKey, key: box_::PrecomputedKey, }, PeerActivated { id: PeerId, key: box_::PrecomputedKey, }, } impl Default for CurrentState { fn default() -> Self { CurrentState::AwaitingPeerPk } } impl fmt::Debug for CurrentState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { CurrentState::AwaitingPeerPk => write!(f, "CurrentState::AwaitingPeerPk"), CurrentState::AwaitingPeerName { .. } => write!(f, "CurrentState::AwaitingPeerName"), CurrentState::PeerActivated { .. } => write!(f, "CurrentState::PeerActivated"), } } } impl Peer { pub fn start( core: &mut Core, poll: &Poll, sock: TcpSock, peers: Weak>>, ) { let token = core.new_token(); unwrap!(poll.register( &sock, token, Ready::readable() | Ready::writable(), PollOpt::edge(), )); let state = Self { token, sock, state: Default::default(), peers, }; if core .insert_peer_state(token, Rc::new(RefCell::new(state))) .is_err() { panic!("Could not start Overlay !"); } } fn read(&mut self, core: &mut Core, poll: &Poll) { loop { match self.sock.read() { Ok(Some(PeerMsg::PubKey(pk))) => { if !self.handle_peer_pk(core, poll, pk) { return self.terminate(core, poll); } } Ok(Some(PeerMsg::CipherText(ct))) => { if !self.handle_ciphertext(core, poll, &ct) { return self.terminate(core, poll); } } Ok(None) => return, Err(e) => { // TODO Make this debug better as such: // debug!("{:?} - Failed to read from sock: {:?}", self.our_id, e); debug!("Failed to read from sock: {:?}", e); return self.terminate(core, poll); } } } } fn write(&mut self, core: &mut Core, poll: &Poll, m: Option) { if let Err(e) = self.sock.write(m.map(|m| (m, 0))) { debug!("Failed to write to sock: {:?}", e); self.terminate(core, poll); } } fn handle_peer_pk(&mut self, core: &mut Core, poll: &Poll, pk: box_::PublicKey) -> bool { match self.state { CurrentState::AwaitingPeerPk => (), ref x => { info!("Message cannot be handled in the current state: {:?}", x); return false; } } self.state = CurrentState::AwaitingPeerName { pk, key: box_::precompute(&pk, core.enc_sk()), }; let overlay_pk = *core.enc_pk(); self.write(core, poll, Some(PeerMsg::PubKey(overlay_pk))); true } fn handle_ciphertext(&mut self, core: &mut Core, poll: &Poll, ciphertext: &[u8]) -> bool { let plaintext_ser = match self.state { CurrentState::AwaitingPeerName { ref key, .. } | CurrentState::PeerActivated { ref key, .. } => match msg_to_read(ciphertext, key) { Ok(pt) => pt, Err(e) => { info!("Error decrypting: {:?}", e); return false; } }, ref x => { info!("Message cannot be handled in the current state: {:?}", x); return false; } }; let plaintext = match deserialise(&plaintext_ser) { Ok(pt) => pt, Err(e) => { info!("Error deserialising: {:?}", e); return false; } }; match plaintext { PlainTextMsg::ReqUpdateName(name) => self.handle_update_name(core, poll, name), PlainTextMsg::ReqOnlinePeers => self.handle_req_online_peers(core, poll), PlainTextMsg::ExchgRendezvousInfo { src_info, dst_peer } => { self.forward_rendezvous_impl(core, poll, src_info, dst_peer) } x => { info!("Invalid PlainTextMsg: {:?}", x); return false; } } } fn handle_update_name(&mut self, core: &mut Core, poll: &Poll, name: String) -> bool { let peers = match self.peers.upgrade() { Some(peers) => peers, None => { warn!("Peer list unexpectedly unavailable !"); return false; } }; match mem::replace(&mut self.state, Default::default()) { CurrentState::AwaitingPeerName { pk, key } => { let id = PeerId::new(name, pk); let ciphertext = if id.name.is_empty() || id.name.contains(" ") || peers.borrow().contains_key(&id) { trace!("Invalid name or identity already taken - choose a different one"); let resp_ser = unwrap!(serialise(&PlainTextMsg::UpdateNameResp(false))); let ciphertext = unwrap!(msg_to_send(&resp_ser, &key)); self.state = CurrentState::AwaitingPeerName { pk, key }; ciphertext } else { let resp_ser = unwrap!(serialise(&PlainTextMsg::UpdateNameResp(true))); let ciphertext = unwrap!(msg_to_send(&resp_ser, &key)); self.state = CurrentState::PeerActivated { id: id.clone(), key, }; if peers.borrow_mut().insert(id, self.token).is_some() { panic!("Logic Error in updating name: id existed and is now displaced !"); } ciphertext }; self.write(core, poll, Some(PeerMsg::CipherText(ciphertext))); true } x => { info!("Message cannot be handled in the current state: {:?}", x); false } } } fn handle_req_online_peers(&mut self, core: &mut Core, poll: &Poll) -> bool { let ciphertext = match self.state { CurrentState::PeerActivated { ref key, .. } => { let peers = match self.peers.upgrade() { Some(peers) => peers, None => { warn!("Peer list unexpectedly unavailable !"); return false; } }; let peers: Vec = peers.borrow().keys().cloned().collect(); let peers_ser = unwrap!(serialise(&PlainTextMsg::OnlinePeersResp(peers))); unwrap!(msg_to_send(&peers_ser, key)) } ref x => { info!("Message cannot be handled in the current state: {:?}", x); return false; } }; self.write(core, poll, Some(PeerMsg::CipherText(ciphertext))); true } fn forward_rendezvous_impl( &mut self, core: &mut Core, poll: &Poll, src_info: RendezvousInfo, dst_peer: PeerId, ) -> bool { let src_peer = match self.state { CurrentState::PeerActivated { ref id, .. } => id.clone(), ref x => { info!("Message cannot be handled in the current state: {:?}", x); return false; } }; let peers = match self.peers.upgrade() { Some(peers) => peers, None => { warn!("Peer list unexpectedly unavailable !"); return false; } }; let dst_token = match peers.borrow().get(&dst_peer) { Some(&t) => t, None => { trace!("Destination Peer is no longer online."); return true; } }; let dst_peer_state = match core.peer_state(dst_token) { Some(ps) => ps, None => { warn!("Destination Peer is online but does not have a peer-state."); return true; } }; let fwd_info = PlainTextMsg::FwdRendezvousInfo { src_info, src_peer }; let fwd_info_ser = unwrap!(serialise(&fwd_info)); dst_peer_state.borrow_mut().write(core, poll, fwd_info_ser); true } } impl CoreState for Peer { fn ready(&mut self, core: &mut Core, poll: &Poll, kind: Ready) { if kind.is_readable() { self.read(core, poll) } else if kind.is_writable() { self.write(core, poll, None) } else { warn!("Unknown kind: {:?}", kind); } } fn write(&mut self, core: &mut Core, poll: &Poll, data: Vec) { let ciphertext = match self.state { CurrentState::PeerActivated { ref key, .. } => unwrap!(msg_to_send(&data, key)), ref x => { info!("Message cannot be handled in the current state: {:?}", x); return; } }; self.write(core, poll, Some(PeerMsg::CipherText(ciphertext))); } fn terminate(&mut self, core: &mut Core, poll: &Poll) { if let Some(peers) = self.peers.upgrade() { if let CurrentState::PeerActivated { ref id, .. } = self.state { let _ = peers.borrow_mut().remove(&id); } } let _ = poll.deregister(&self.sock); let _ = core.remove_peer_state(self.token); } fn as_any(&mut self) -> &mut Any { self } }