use std::collections::HashSet;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{self as chan, select};

use datachannel::{
    ConnectionState, DataChannelHandler, DataChannelInfo, GatheringState, IceCandidate,
    PeerConnectionHandler, RtcConfig, RtcDataChannel, RtcPeerConnection, SessionDescription,
};

#[cfg(feature = "log")]
use log as logger;
#[cfg(feature = "tracing")]
use tracing as logger;

/// Messages delivered to the thread that drives a peer connection.
///
/// The first two variants carry signaling data produced by the *other* peer's
/// [`LocalConn`] callbacks; `Stop` is sent by the test itself to end the loop.
enum ConnectionMsg {
    /// A session description to apply via `set_remote_description`.
    RemoteDescription { sess_desc: SessionDescription },
    /// An ICE candidate to apply via `add_remote_candidate`.
    RemoteCandidate { cand: IceCandidate },
    /// Ask the receiving thread to leave its message loop.
    Stop,
}

/// Handler for the data channel created by the initiating peer.
struct Ping {
    /// Receives every message delivered to this channel, decoded as text.
    output: chan::Sender<String>,
    /// Receives a `()` once the channel reports that it is open.
    ready: chan::Sender<()>,
}

impl Ping {
    /// Builds a handler that reports to the given `output` and `ready` channels.
    fn new(output: chan::Sender<String>, ready: chan::Sender<()>) -> Self {
        Ping { output, ready }
    }
}

impl DataChannelHandler for Ping {
    /// Signals readiness on `ready`; a send failure is deliberately ignored.
    fn on_open(&mut self) {
        logger::info!("DataChannel PING: Open");
        self.ready.send(()).ok();
    }

    /// Decodes the payload lossily as UTF-8 and forwards it to `output`.
    fn on_message(&mut self, msg: &[u8]) {
        let msg = String::from_utf8_lossy(msg).into_owned();
        logger::info!("DataChannel PING: Received message: {}", &msg);
        self.output.send(msg).ok();
    }
}

/// Handler attached to data channels that a [`LocalConn`] receives.
///
/// It is `Clone` because `LocalConn::data_channel_handler` hands out a copy
/// each time it is called.
#[derive(Clone)]
struct Pong {
    /// Receives every message delivered to this channel, decoded as text.
    output: chan::Sender<String>,
}

impl Pong {
    /// Builds a handler that forwards received messages to `output`.
    fn new(output: chan::Sender<String>) -> Self {
        Pong { output }
    }
}

impl DataChannelHandler for Pong {
    /// Decodes the payload lossily as UTF-8 and forwards it to `output`.
    fn on_message(&mut self, msg: &[u8]) {
        let msg = String::from_utf8_lossy(msg).into_owned();
        logger::info!("DataChannel PONG: Received message: {}", &msg);
        self.output.send(msg).ok();
    }
}

/// Peer-connection handler that relays signaling over an in-process channel.
struct LocalConn {
    /// Identifier used in log lines and in the "PONG from …" payload.
    id: usize,
    /// Channel into which local descriptions and candidates are pushed.
    signaling: chan::Sender<ConnectionMsg>,
    /// Template handler cloned for each incoming data channel.
    pong: Pong,
    /// Most recently received data channel, if any.
    // NOTE(review): presumably stored so the channel is not dropped when
    // `on_data_channel` returns — confirm against the datachannel crate docs.
    dc: Option<Box<RtcDataChannel<Pong>>>,
}

impl LocalConn {
    /// Creates a handler with no data channel stored yet.
    fn new(id: usize, pong: Pong, signaling: chan::Sender<ConnectionMsg>) -> Self {
        LocalConn {
            id,
            signaling,
            pong,
            dc: None,
        }
    }
}

impl PeerConnectionHandler for LocalConn {
    type DCH = Pong;

    /// Returns a clone of the stored [`Pong`] handler.
    fn data_channel_handler(&mut self, _info: DataChannelInfo) -> Pong {
        self.pong.clone()
    }

    /// Logs the description and pushes it into the signaling channel.
    fn on_description(&mut self, sess_desc: SessionDescription) {
        logger::info!("Description {}: {:?}", self.id, &sess_desc);
        self.signaling
            .send(ConnectionMsg::RemoteDescription { sess_desc })
            .ok();
    }

    /// Logs the candidate and pushes it into the signaling channel.
    fn on_candidate(&mut self, cand: IceCandidate) {
        logger::info!("Candidate {}: \n{} {}", self.id, &cand.candidate, &cand.mid);
        self.signaling
            .send(ConnectionMsg::RemoteCandidate { cand })
            .ok();
    }

    /// Logs the new connection state.
    fn on_connection_state_change(&mut self, state: ConnectionState) {
        logger::info!("State {}: {:?}", self.id, state);
    }

    /// Logs the new gathering state.
    fn on_gathering_state_change(&mut self, state: GatheringState) {
        logger::info!("Gathering state {}: {:?}", self.id, state);
    }

    /// Logs the incoming channel, sends `"PONG from <id>"` on it and stores it.
    fn on_data_channel(&mut self, mut dc: Box<RtcDataChannel<Pong>>) {
        logger::info!(
            "PeerConnection {}: Received DataChannel with label={}, protocol={:?}, reliability={:?}",
            self.id,
            dc.label(),
            dc.protocol(),
            dc.reliability()
        );

        dc.send(format!("PONG from {}", self.id).as_bytes()).ok();
        self.dc.replace(dc);
    }
}

/// Connects two in-process peers and checks that one PING and one PONG arrive.
///
/// Peer 1 creates the "ping-pong" data channel and sends `"PING from 1"` once
/// it opens; peer 2 answers with `"PONG from 2"` when it receives the channel.
/// Both payloads end up on the shared result channel, in either order.
#[test]
fn test_connectivity() {
    #[cfg(feature = "tracing")]
    {
        // Errors are ignored: a global subscriber may already be installed.
        tracing::subscriber::set_global_default(
            tracing_subscriber::FmtSubscriber::builder()
                .with_max_level(tracing::Level::INFO)
                .finish(),
        )
        .ok();
        datachannel::configure_logging(tracing::Level::INFO);
    }

    #[cfg(feature = "log")]
    {
        std::env::set_var("RUST_LOG", "info");
        let _ = env_logger::try_init();
    }

    // Every message received on any data channel is collected here.
    let (tx_res, rx_res) = chan::unbounded::<String>();
    // One inbox per peer: `rx_peerN` is drained by the thread that owns `pcN`.
    let (tx_peer1, rx_peer1) = chan::unbounded::<ConnectionMsg>();
    let (tx_peer2, rx_peer2) = chan::unbounded::<ConnectionMsg>();

    let id1 = 1;
    let id2 = 2;

    let pong1 = Pong::new(tx_res.clone());
    let pong2 = Pong::new(tx_res.clone());

    // Signaling is crossed over: what peer 1 produces goes to peer 2's inbox
    // and vice versa.
    let conn1 = LocalConn::new(id1, pong1, tx_peer2.clone());
    let conn2 = LocalConn::new(id2, pong2, tx_peer1.clone());

    let ice_servers = vec!["stun:stun.l.google.com:19302"];
    let conf = RtcConfig::new(&ice_servers);

    let mut pc1 = RtcPeerConnection::new(&conf, conn1).unwrap();
    let mut pc2 = RtcPeerConnection::new(&conf, conn2).unwrap();

    // Peer 2: apply incoming signaling until told to stop or the inbox closes.
    let t2 = thread::spawn(move || {
        while let Ok(msg) = rx_peer2.recv() {
            match msg {
                ConnectionMsg::RemoteDescription { sess_desc } => {
                    pc2.set_remote_description(&sess_desc).ok();
                }
                ConnectionMsg::RemoteCandidate { cand } => {
                    pc2.add_remote_candidate(&cand).ok();
                }
                ConnectionMsg::Stop => break,
            }
        }
    });

    // Peer 1: create the data channel, then multiplex signaling with the
    // "channel is open" notification.
    let t1 = thread::spawn(move || {
        let (tx_ready, rx_ready) = chan::unbounded();
        let ping = Ping::new(tx_res.clone(), tx_ready);
        let mut dc = pc1.create_data_channel("ping-pong", ping).unwrap();

        loop {
            select! {
                recv(rx_peer1) -> msg => {
                    match msg.unwrap() {
                        ConnectionMsg::RemoteDescription { sess_desc } => {
                            pc1.set_remote_description(&sess_desc).ok();
                        }
                        ConnectionMsg::RemoteCandidate { cand } => {
                            pc1.add_remote_candidate(&cand).ok();
                        },
                        ConnectionMsg::Stop => break,
                    }
                },
                recv(rx_ready) -> _ => {
                    dc.send(format!("PING from {}", id1).as_bytes()).ok();
                }
            }
        }
    });

    let mut expected = HashSet::new();
    expected.insert("PING from 1".to_string());
    expected.insert("PONG from 2".to_string());

    // Sets are compared because the two messages may arrive in either order.
    let mut res = HashSet::new();
    res.insert(rx_res.recv_timeout(Duration::from_secs(10)).unwrap());
    res.insert(rx_res.recv_timeout(Duration::from_secs(10)).unwrap());

    assert_eq!(expected, res);

    tx_peer1.send(ConnectionMsg::Stop).unwrap();
    tx_peer2.send(ConnectionMsg::Stop).unwrap();

    t2.join().unwrap();
    t1.join().unwrap();
}