use std::{env::args, io::stderr, sync::mpsc, thread}; use ip::{traits::PrefixSet as _, Any, Prefix, PrefixSet}; use irrc::{Error, IrrClient, Query, ResponseItem}; use rpsl::names::{AsSet, AutNum}; struct Collector { sender: Option>>, join_handle: thread::JoinHandle>, } impl Collector { fn spawn() -> Self { let (tx, rx) = mpsc::channel(); let sender = Some(tx); tracing::debug!("starting collector thread"); let join_handle = thread::spawn(move || { rx.iter() .inspect(|prefix| tracing::trace!("adding prefix {} to prefix set", prefix)) .collect::>() }); Self { sender, join_handle, } } fn sender(&mut self) -> Option { self.sender.take().map(Sender) } fn print(self) { tracing::debug!("trying to join collector thread"); match self.join_handle.join() { Ok(set) => set.ranges().for_each(|range| println!("{}", range)), Err(err) => tracing::error!("failed to join set builder thread: {:?}", err), } } } #[derive(Clone)] struct Sender(mpsc::Sender>); impl Sender { fn collect(&self, item: ResponseItem>) { let prefix = item.into_content(); tracing::trace!("sending prefix {prefix} to collector"); if let Err(err) = self.0.send(prefix) { tracing::warn!("failed to send prefix to collector: {}", err); } } } struct QueryThread(thread::JoinHandle>); impl QueryThread { fn spawn(host: String, object: AsSet, collector: &mut Collector) -> Self { let sender = collector .sender() .expect("failed to take collector send handle"); let join_handle = thread::spawn(move || -> Result<(), Error> { IrrClient::new(host) .connect()? .pipeline_from_initial(Query::AsSetMembersRecursive(object), |item| { item.map(into_routes_queries).map_err(log_warning).ok() })? .responses() .filter_map(|item| item.map_err(log_warning).ok()) .for_each(|item| sender.collect(item)); Ok(()) }); Self(join_handle) } fn join(self) -> Result<(), Error> { self.0.join().expect("failed to join query thread") } } fn log_warning(err: E) -> E { tracing::warn!("failed to parse item: {}", err); err } fn into_routes_queries(item: ResponseItem) -> [Query; 2] { let autnum = item.into_content(); [Query::Ipv4Routes(autnum), Query::Ipv6Routes(autnum)] } fn main() -> Result<(), Box> { tracing_subscriber::fmt() .with_max_level(tracing::Level::WARN) .with_writer(stderr) .try_init()?; let args: Vec = args().collect(); let host = format!("{}:43", args[1]); let object = args[2].parse().unwrap(); let mut collector = Collector::spawn(); let query_thread = QueryThread::spawn(host, object, &mut collector); query_thread.join()?; collector.print(); Ok(()) }