mod blockfetch; mod chainsync; mod channel; mod stream; mod txsubmit; pub mod packet; use cardano_sdk::chaininfo::ChainInfo; use cardano_sdk::protocol::{ BlockFetch, ChainSync, DiffusionMode, Point, Protocol, TxSubmit, Version, }; use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::str::FromStr; use stream::StreamStatsShared; use thiserror::*; use tokio::net::TcpStream; use tracing::{debug, info}; use trust_dns_resolver::{config::*, TokioAsyncResolver}; pub use blockfetch::BlockFetcher; pub use chainsync::ChainIntersection; pub use channel::Channel; use packet::rawchan::{ChannelWriter, RawChannel}; pub use packet::{frame::PacketBytes, ProtocolError}; pub use txsubmit::SubmitNext; #[derive(Clone, Debug)] pub struct NetworkDescription { pub anchor_hosts: Vec<(String, u16)>, pub chain_info: ChainInfo, pub net_versions: Vec, pub known_points: Vec<(u64, Point)>, } impl NetworkDescription { pub fn mainnet() -> Self { Self { anchor_hosts: vec![(String::from("relays-new.cardano-mainnet.iohk.io."), 3001)], chain_info: ChainInfo::MAINNET, net_versions: vec![Version::V6, Version::V7, Version::V8], known_points: vec![( 1, Point::from_raw( 0, "f0f7892b5c333cffc4b3c4344de48af4cc63f55e44936196f365a9ef2244134f", ) .unwrap(), )], } } pub fn testnet() -> Self { Self { anchor_hosts: vec![(String::from("relays-new.cardano-testnet.iohkdev.io."), 3001)], chain_info: ChainInfo::TESTNET, net_versions: vec![Version::V6, Version::V7, Version::V8], known_points: vec![ ( 1, Point::from_raw( 1031, "388a82f053603f3552717d61644a353188f2d5500f4c6354cc1ad27a36a7ea91", ) .unwrap(), ), // shelley ( 1597133, Point::from_raw( 1598400, "02b1c561715da9e540411123a6135ee319b02f60b9a11a603d3305556c04329f", ) .unwrap(), ), // after shelley ( 2349176, Point::from_raw( 19742400, "80095b5d08fe5a97581f4ba940cb91410eb88673f581fc61785608718553fab6", ) .unwrap(), ), ], } } pub fn preprod() -> Self { Self { anchor_hosts: vec![(String::from("preprod-node.world.dev.cardano.org."), 30000)], chain_info: ChainInfo::PREPROD, net_versions: vec![Version::V6, Version::V7, Version::V8], known_points: vec![( // shelley 46, Point::from_raw( 86400, "c4a1595c5cc7a31eda9e544986fe9387af4e3491afe0ca9a80714f01951bbd5c", ) .unwrap(), )], } } pub fn preview() -> Self { Self { anchor_hosts: vec![(String::from("preview-node.world.dev.cardano.org."), 30002)], chain_info: ChainInfo::PREVIEW, net_versions: vec![Version::V6, Version::V7, Version::V8], known_points: vec![( // shelley 505, Point::from_raw( 86400, "283b30cedbfc17647564abb43012bd2f17871ec0be748535c38a52088ced6a9f", ) .unwrap(), )], } } pub fn override_hosts(mut self, v: &[(String, u16)]) -> Self { self.anchor_hosts = v.to_owned(); self } } async fn resolve_name(dest: &str) -> Result, ()> { let ip = match Ipv4Addr::from_str(dest) { Ok(addr) => Some(IpAddr::V4(addr)), Err(_) => match Ipv6Addr::from_str(dest) { Ok(addr6) => Some(IpAddr::V6(addr6)), Err(_) => None, }, }; match ip { // possibly a host then None => { let resolver = TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()) .unwrap(); let response = resolver.lookup_ip(dest).await.unwrap(); let addresses = response.iter().collect::>(); Ok(addresses) } Some(ip) => Ok(vec![ip]), } } async fn connect_to( destinations: &[(String, u16)], ) -> Result<(SocketAddr, TcpStream), Vec> { let mut errors = Vec::new(); for (dest, port) in destinations { let ip_addresses = resolve_name(dest).await.unwrap(); // try to connect from (resolved) ip addresses at the expected port for ip_addr in ip_addresses { let addr = SocketAddr::new(ip_addr, *port); debug!("trying to connect to {}:{} ({})", dest, port, ip_addr); match TcpStream::connect(&addr).await { Err(e) => errors.push(e), Ok(stream) => { info!("connected to {}:{} ({})", dest, port, ip_addr); return Ok((addr, stream)); } } } } return Err(errors); } pub struct NetworkHandle { #[allow(dead_code)] handle: tokio::task::JoinHandle<()>, /// the socket address of the opposite side pub sockaddr: SocketAddr, /// chainsync sub protocol pub chainsync: Channel, /// txsubmit sub protocol pub txsubmit: Channel, /// blockfetch sub protocol pub blockfetch: Channel, /// Stats for the stream pub stats: StreamStatsShared, } #[derive(Debug, Error)] pub enum NetworkError { #[error("protocol error {0}")] ProtocolError(#[from] ProtocolError), #[error("handshake error {0}")] HandshakeError(#[from] stream::HandshakeError), #[error("Connect I/O error {0:?}")] ConnectError(Vec), } fn channel( writer: tokio::sync::mpsc::Sender, ) -> (ChannelWriter, Channel

) { let (sender, receiver) = tokio::sync::mpsc::channel(16); let channel = RawChannel::new(P::NUMBER, writer, receiver); (ChannelWriter(sender), Channel::new(channel)) } impl NetworkHandle { /// Connect to the network with the expected parameters, perform the protocol handshake /// and in the background wait for packet to send and receive. pub async fn start( network_description: &NetworkDescription, ) -> Result { // try to connect to one of the known anchors in network_description let (sockaddr, tcp) = connect_to(&network_description.anchor_hosts) .await .map_err(|e| NetworkError::ConnectError(e))?; let (tx_sender, tx_receiver) = tokio::sync::mpsc::channel(64); let (chainsync_w, chainsync) = channel(tx_sender.clone()); let (txsubmit_w, txsubmit) = channel(tx_sender.clone()); let (blockfetch_w, blockfetch) = channel(tx_sender); let channels = HashMap::from([ (ChainSync::NUMBER, chainsync_w), (TxSubmit::NUMBER, txsubmit_w), (BlockFetch::NUMBER, blockfetch_w), ]); let stats = stream::StreamStatsShared::default(); // create a cardano stream, perform the handshake with the connected client // and then create the receiving / sending loop let mut stream = stream::Stream::new(stats.clone(), tcp, tx_receiver, channels); stream .handshake( network_description.chain_info.protocol_magic, DiffusionMode::InitiatorOnly, &network_description.net_versions, ) .await??; let handle = tokio::spawn(async move { stream.process_fragment().await }); Ok(NetworkHandle { sockaddr, handle, chainsync, blockfetch, txsubmit, stats, }) } pub async fn stop(self) { self.handle.abort() } }