pub use self::active_peer::ActivePeer; pub use self::event::Event; pub use self::overlay_connect::OverlayConnect; use common::event_loop::{spawn_event_loop, CoreMsg, El}; use common::read_config; use common::types::PlainTextMsg; use maidsafe_utilities::serialisation::serialise; use maidsafe_utilities::thread::{self, Joiner}; use mio::Token; use p2p::{Config, Handle, RendezvousInfo}; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::{mpsc, Arc, Mutex}; use std::time::Duration; use std::time::Instant; mod active_peer; mod event; mod overlay_connect; #[derive(Serialize, Deserialize)] pub struct FullConfig { pub peer_cfg: PeerConfig, pub p2p_cfg: Config, } #[derive(Serialize, Deserialize)] pub struct PeerConfig { overlay_addr: SocketAddr, } #[derive(Debug)] pub enum PeerState { Discovered, CreatingRendezvousInfo { mediator_token: Token, peer_info: Option, }, AwaitingPeerRendezvous { since: Instant, p2p_handle: Handle, }, AwaitingHolePunchResult, Connected(Token), } impl Default for PeerState { fn default() -> Self { PeerState::Discovered } } const MENU: &str = " ------------------------------------------- | ====== | | Menu | | ====== | 1) Show Online & Connected (*) Peers | 2) Refresh Online Peers List | 3) Connect to | 4) Chat with | 5) Information about inputs to (2) and (3) | 0) Quit/Exit ------------------------------------------- "; pub fn entry_point() { let cfg: FullConfig = read_config("./peer-config"); let el = spawn_event_loop(cfg.p2p_cfg); let peer_cfg = cfg.peer_cfg; println!("Enter Name [Name must be unique (preferably) and cannot contain spaces]:"); let mut name = String::new(); loop { unwrap!(io::stdin().read_line(&mut name)); name = name.trim().to_string(); if name.is_empty() || name.contains(" ") { println!("Invalid Name. Choose a valid Name:"); name.clear(); } else { break; } } let (event_tx, event_rx) = mpsc::channel(); let peers = Arc::new(Mutex::new(Default::default())); { let tx = event_tx.clone(); let name = name.clone(); let peers = peers.clone(); let overlay = peer_cfg.overlay_addr; unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { OverlayConnect::start(core, poll, &overlay, name, peers, tx); }))); } let event = unwrap!(event_rx.recv_timeout(Duration::from_secs(5))); println!("{}", event); let overlay_token = match event { Event::OverlayConnected(t) => t, x => panic!("Unexpected event: {:?}", x), }; let _j = print_events(event_rx); let mut choice = String::new(); loop { println!("\n{}\nChoose an option:", MENU); unwrap!(io::stdin().read_line(&mut choice)); choice = choice.trim().to_string(); if choice == "1" { let mut list = String::new(); unwrap!(peers.lock()) .iter() .for_each(|(ref id, ref peer_state)| { list.push_str(&format!( "{} {}\n", id, if let PeerState::Connected(_) = peer_state { "*" } else { "" } )) }); if list.is_empty() { list = "List is empty. Try refreshing.".to_string(); } println!("List:\n{}", list); } else if choice == "2" { print!("Refreshing... "); unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { if let Some(overlay) = core.peer_state(overlay_token) { let m = unwrap!(serialise(&PlainTextMsg::ReqOnlinePeers)); overlay.borrow_mut().write(core, poll, m); } }))); } else if choice == "3" { println!("Enter peer id. Partial id/name can be given:"); let mut peer_choice = String::new(); unwrap!(io::stdin().read_line(&mut peer_choice)); peer_choice = peer_choice.trim().to_string(); let mut found_peer = None; { let peers_guard = unwrap!(peers.lock()); for (id, peer_state) in &*peers_guard { let peer_fmt = format!("{}", id); if peer_fmt.contains(&peer_choice) { if let PeerState::Discovered = peer_state { if found_peer.is_some() { println!( "Ambiguous, multiple matches found. More qualification needed." ); found_peer = None; break; } else { found_peer = Some(id.clone()); } } else { println!( "Peer is either in the process of being connected or is already \ connected. Check status after sometime and retry if necessary." ); found_peer = None; break; } } } } if let Some(peer_id) = found_peer { unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { let overlay = unwrap!(core.peer_state(overlay_token)); let mut overlay = overlay.borrow_mut(); let overlay_connect = unwrap!(overlay.as_any().downcast_mut::()); overlay_connect.start_connect_with_peer(core, poll, peer_id.clone()); }))); } else { println!("Aborting due to previous errors (if printed) or due to peer not found."); } } else if choice == "4" { println!("Enter peer id. Partial id/name can be given:"); let mut peer_choice = String::new(); unwrap!(io::stdin().read_line(&mut peer_choice)); peer_choice = peer_choice.trim().to_string(); let mut found_peer = None; { let peers_guard = unwrap!(peers.lock()); for (id, peer_state) in &*peers_guard { let peer_fmt = format!("{}", id); if peer_fmt.contains(&peer_choice) { if let PeerState::Connected(token) = peer_state { if found_peer.is_some() { println!( "Ambiguous, multiple matches found. More qualification needed." ); found_peer = None; break; } else { found_peer = Some(*token); } } else { println!( "Peer is not connected. Check status after sometime and retry if \ necessary." ); found_peer = None; break; } } } } if let Some(token) = found_peer { let (tx, rx) = mpsc::channel(); let tx_clone = tx.clone(); unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { let peer = unwrap!(core.peer_state(token)); let mut peer = peer.borrow_mut(); let active_peer = unwrap!(peer.as_any().downcast_mut::()); active_peer.flush_and_stop_buffering(); unwrap!(tx_clone.send(())); }))); unwrap!(rx.recv()); let disconnected = start_chat(&el, token); if !disconnected { unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { let peer = unwrap!(core.peer_state(token)); let mut peer = peer.borrow_mut(); let active_peer = unwrap!(peer.as_any().downcast_mut::()); active_peer.start_buffering(); unwrap!(tx.send(())); }))); unwrap!(rx.recv()); } } else { println!("Aborting due to previous errors (if printed) or due to peer not found."); } } else if choice == "5" { println!( "E.g. if the list has a peer id as \"Blah (1baf3e..)\" you may enter the \ whole thing as is, or just \"Blah\" or just \"lah\" or just \"af3e\" etc. all \ without quotes. However if the list also contains someone else like \"Blah-blah\" \ then \"lah\" will match that too and return error for non unique match. In such \ cases qualify more until you have a unique match, giving the whole ID in the worst \ case." ); } else if choice == "0" { break; } else { println!("Invalid option !"); } choice.clear(); } unwrap!(event_tx.send(Event::Quit)); } fn start_chat(el: &El, peer: Token) -> bool { println!("Enter (without quotes) \"\" to exit this chat."); let mut disconnected = false; let (tx, rx) = mpsc::channel(); loop { let mut input = String::new(); let _ = unwrap!(io::stdin().read_line(&mut input)); input = input.trim().to_owned(); if input == "" { break; } let m = unwrap!(serialise(&PlainTextMsg::Chat(input))); let tx = tx.clone(); unwrap!(el.core_tx.send(CoreMsg::new(move |core, poll| { if let Some(active_peer) = core.peer_state(peer) { active_peer.borrow_mut().write(core, poll, m); unwrap!(tx.send(true)); } else { unwrap!(tx.send(false)); } }))); if !unwrap!(rx.recv()) { println!("Peer is now disconnected. Try reconnecting to chat again."); disconnected = true; break; } } disconnected } fn print_events(rx: mpsc::Receiver) -> Joiner { thread::named("Event-Rx", move || { for event in rx.iter() { match event { Event::Quit => break, e => println!("{}", e), } } }) }