use std::{ collections::HashMap, fmt, net::{Ipv4Addr, SocketAddrV4}, str::FromStr, }; use anyhow::{bail, Context, Result}; use bytes::Bytes; use clap::Parser; use ed25519_dalek::Signature; use futures_lite::StreamExt; use iroh_base::base32; use iroh_gossip::{ net::{Event, Gossip, GossipEvent, GossipReceiver, GOSSIP_ALPN}, proto::TopicId, }; use iroh_net::{ key::{PublicKey, SecretKey}, relay::{RelayMap, RelayMode, RelayUrl}, Endpoint, NodeAddr, }; use serde::{Deserialize, Serialize}; use tracing::warn; /// Chat over iroh-gossip /// /// This broadcasts signed messages over iroh-gossip and verifies signatures /// on received messages. /// /// By default a new node id is created when starting the example. To reuse your identity, /// set the `--secret-key` flag with the secret key printed on a previous invocation. /// /// By default, the relay server run by n0 is used. To use a local relay server, run /// cargo run --bin iroh-relay --features iroh-relay -- --dev /// in another terminal and then set the `-d http://localhost:3340` flag on this example. #[derive(Parser, Debug)] struct Args { /// secret key to derive our node id from. #[clap(long)] secret_key: Option, /// Set a custom relay server. By default, the relay server hosted by n0 will be used. #[clap(short, long)] relay: Option, /// Disable relay completely. #[clap(long)] no_relay: bool, /// Set your nickname. #[clap(short, long)] name: Option, /// Set the bind port for our socket. By default, a random port will be used. #[clap(short, long, default_value = "0")] bind_port: u16, #[clap(subcommand)] command: Command, } #[derive(Parser, Debug)] enum Command { /// Open a chat room for a topic and print a ticket for others to join. /// /// If no topic is provided, a new topic will be created. Open { /// Optionally set the topic id (32 bytes, as base32 string). topic: Option, }, /// Join a chat room from a ticket. Join { /// The ticket, as base32 string. ticket: String, }, } #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let args = Args::parse(); // parse the cli command let (topic, peers) = match &args.command { Command::Open { topic } => { let topic = topic.unwrap_or_else(|| TopicId::from_bytes(rand::random())); println!("> opening chat room for topic {topic}"); (topic, vec![]) } Command::Join { ticket } => { let Ticket { topic, peers } = Ticket::from_str(ticket)?; println!("> joining chat room for topic {topic}"); (topic, peers) } }; // parse or generate our secret key let secret_key = match args.secret_key { None => SecretKey::generate(), Some(key) => key.parse()?, }; println!("> our secret key: {secret_key}"); // configure our relay map let relay_mode = match (args.no_relay, args.relay) { (false, None) => RelayMode::Default, (false, Some(url)) => RelayMode::Custom(RelayMap::from_url(url)), (true, None) => RelayMode::Disabled, (true, Some(_)) => bail!("You cannot set --no-relay and --relay at the same time"), }; println!("> using relay servers: {}", fmt_relay_mode(&relay_mode)); // build our magic endpoint let endpoint = Endpoint::builder() .secret_key(secret_key) .alpns(vec![GOSSIP_ALPN.to_vec()]) .relay_mode(relay_mode) .bind_addr_v4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, args.bind_port)) .bind() .await?; println!("> our node id: {}", endpoint.node_id()); let my_addr = endpoint.node_addr().await?; // create the gossip protocol let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &my_addr.info); // print a ticket that includes our own node id and endpoint addresses let ticket = { let me = endpoint.node_addr().await?; let peers = peers.iter().cloned().chain([me]).collect(); Ticket { topic, peers } }; println!("> ticket to join us: {ticket}"); // spawn our endpoint loop that forwards incoming connections to the gossiper tokio::spawn(endpoint_loop(endpoint.clone(), gossip.clone())); // join the gossip topic by connecting to known peers, if any let peer_ids = peers.iter().map(|p| p.node_id).collect(); if peers.is_empty() { println!("> waiting for peers to join us..."); } else { println!("> trying to connect to {} peers...", peers.len()); // add the peer addrs from the ticket to our endpoint's addressbook so that they can be dialed for peer in peers.into_iter() { endpoint.add_node_addr(peer)?; } }; let (sender, receiver) = gossip.join(topic, peer_ids).await?.split(); println!("> connected!"); // broadcast our name, if set if let Some(name) = args.name { let message = Message::AboutMe { name }; let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; sender.broadcast(encoded_message).await?; } // subscribe and print loop tokio::spawn(subscribe_loop(receiver)); // spawn an input thread that reads stdin // not using tokio here because they recommend this for "technical reasons" let (line_tx, mut line_rx) = tokio::sync::mpsc::channel(1); std::thread::spawn(move || input_loop(line_tx)); // broadcast each line we type println!("> type a message and hit enter to broadcast..."); while let Some(text) = line_rx.recv().await { let message = Message::Message { text: text.clone() }; let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; sender.broadcast(encoded_message).await?; println!("> sent: {text}"); } Ok(()) } async fn subscribe_loop(mut receiver: GossipReceiver) -> Result<()> { // init a peerid -> name hashmap let mut names = HashMap::new(); while let Some(event) = receiver.try_next().await? { if let Event::Gossip(GossipEvent::Received(msg)) = event { let (from, message) = SignedMessage::verify_and_decode(&msg.content)?; match message { Message::AboutMe { name } => { names.insert(from, name.clone()); println!("> {} is now known as {}", from.fmt_short(), name); } Message::Message { text } => { let name = names .get(&from) .map_or_else(|| from.fmt_short(), String::to_string); println!("{}: {}", name, text); } } } } Ok(()) } async fn endpoint_loop(endpoint: Endpoint, gossip: Gossip) { while let Some(incoming) = endpoint.accept().await { let conn = match incoming.accept() { Ok(conn) => conn, Err(err) => { warn!("incoming connection failed: {err:#}"); // we can carry on in these cases: // this can be caused by retransmitted datagrams continue; } }; let gossip = gossip.clone(); tokio::spawn(async move { if let Err(err) = handle_connection(conn, gossip).await { println!("> connection closed: {err}"); } }); } } async fn handle_connection( mut conn: iroh_net::endpoint::Connecting, gossip: Gossip, ) -> anyhow::Result<()> { let alpn = conn.alpn().await?; let conn = conn.await?; let peer_id = iroh_net::endpoint::get_remote_node_id(&conn)?; match alpn.as_ref() { GOSSIP_ALPN => gossip.handle_connection(conn).await.context(format!( "connection to {peer_id} with ALPN {} failed", String::from_utf8_lossy(&alpn) ))?, _ => println!("> ignoring connection from {peer_id}: unsupported ALPN protocol"), } Ok(()) } fn input_loop(line_tx: tokio::sync::mpsc::Sender) -> Result<()> { let mut buffer = String::new(); let stdin = std::io::stdin(); // We get `Stdin` here. loop { stdin.read_line(&mut buffer)?; line_tx.blocking_send(buffer.clone())?; buffer.clear(); } } #[derive(Debug, Serialize, Deserialize)] struct SignedMessage { from: PublicKey, data: Bytes, signature: Signature, } impl SignedMessage { pub fn verify_and_decode(bytes: &[u8]) -> Result<(PublicKey, Message)> { let signed_message: Self = postcard::from_bytes(bytes)?; let key: PublicKey = signed_message.from; key.verify(&signed_message.data, &signed_message.signature)?; let message: Message = postcard::from_bytes(&signed_message.data)?; Ok((signed_message.from, message)) } pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> Result { let data: Bytes = postcard::to_stdvec(&message)?.into(); let signature = secret_key.sign(&data); let from: PublicKey = secret_key.public(); let signed_message = Self { from, data, signature, }; let encoded = postcard::to_stdvec(&signed_message)?; Ok(encoded.into()) } } #[derive(Debug, Serialize, Deserialize)] enum Message { AboutMe { name: String }, Message { text: String }, } #[derive(Debug, Serialize, Deserialize)] struct Ticket { topic: TopicId, peers: Vec, } impl Ticket { /// Deserializes from bytes. fn from_bytes(bytes: &[u8]) -> Result { postcard::from_bytes(bytes).map_err(Into::into) } /// Serializes to bytes. pub fn to_bytes(&self) -> Vec { postcard::to_stdvec(self).expect("postcard::to_stdvec is infallible") } } /// Serializes to base32. impl fmt::Display for Ticket { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", base32::fmt(self.to_bytes())) } } /// Deserializes from base32. impl FromStr for Ticket { type Err = anyhow::Error; fn from_str(s: &str) -> Result { Self::from_bytes(&base32::parse_vec(s)?) } } // helpers fn fmt_relay_mode(relay_mode: &RelayMode) -> String { match relay_mode { RelayMode::Disabled => "None".to_string(), RelayMode::Default => "Default Relay (production) servers".to_string(), RelayMode::Staging => "Default Relay (staging) servers".to_string(), RelayMode::Custom(map) => map .urls() .map(|url| url.to_string()) .collect::>() .join(", "), } }