// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

#![allow(clippy::style)]
#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]

use hippotat::prelude::*;

mod daemon;
mod suser;
mod slocal;
mod sweb;

pub use daemon::Daemoniser;
pub use sweb::{WebRequest, WebResponse, WebResponseBody};
pub use suser::User;

#[derive(clap::Parser,Debug)]
pub struct Opts {
  #[clap(flatten)]
  pub log: LogOpts,

  #[clap(flatten)]
  pub config: config::CommonOpts,

  /// Daemonise
  #[clap(long)]
  daemon: bool,

  /// Write our pid to this file
  #[clap(long)]
  pidfile: Option<PathBuf>,

  /// Print config item(s), do not actually run
  ///
  /// Argument is a (comma-separated) list of config keys;
  /// values will be printed tab-separated.
  /// The key `pretty` dumps the whole config in a pretty debug format.
  ///
  /// If none of the specified config keys are client-specific,
  /// only one line will be printed.  Otherwise the output will
  /// have one line per client association.
  ///
  /// Additional pseudo-config-keys are recognised:
  /// `client`: our client virtual IP address;
  /// `server`: server's logical name in the config;
  /// `link`: the link name including the `[ ]`.
  #[clap(long)]
  print_config: Option<String>,
}

pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;

// ----- Backpressure discussion -----

// Sends to these two kinds of channel are blocking, so the task
// which calls route_packet can get at most this far ahead before a
// context switch to the receiving task is forced.
pub const MAXQUEUE_ROUTE2USER: usize = 15;
pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;

// This channel is sent to with try_send, ie non-blockingly.  If the
// user task becomes overloaded, requests will start to be rejected.
// (Both disciplines are illustrated by the sketch test below.)
pub const MAXQUEUE_WEBREQ2USER: usize = 5;

// The user task prioritises 1. returning requests or discarding data,
// 2. handling data routed to it.  Ie it prefers to drain queues.
//
// The slocal task prioritises handling routed data and writing it
// (synchronously) to the local kernel.  So if the local kernel starts
// blocking, all tasks may end up blocked waiting for things to drain.

#[derive(Debug)]
pub struct Global {
  config: config::InstanceConfigGlobal,
  local_rx: mpsc::Sender<RoutedPacket>,   // queue towards the slocal task
  all_clients: HashMap<ClientName, User>, // keyed by client virtual address
}

pub struct RoutedPacket {
  pub data: RoutedPacketData,
//  pub source: Option<ClientName>, // for error handling, tracing, etc.
}

// Not MIME data; valid SLIP (checked).
pub type RoutedPacketData = Box<[u8]>;

// Loop prevention.
// We don't decrement the TTL (naughty), but loops cannot arise
// because only the server has any routing code, and the server
// has no internal loops, so the worst case is
//   client if -> client -> server -> client' -> client if'
// and the ifs will decrement the TTL.
mod may_route {
  #[derive(Clone,Debug)]
  pub struct MayRoute(());
  impl MayRoute {
    pub fn came_from_outside_hippotatd() -> Self { Self(()) }
  }
}
pub use may_route::MayRoute;
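// ----- A sketch of the two queueing disciplines -----
//
// Test-only illustration (not used by the daemon) of the backpressure
// behaviour discussed above: on a bounded tokio channel, a blocking
// send parks the sender once the queue is full, whereas try_send (as
// used for the web-request channel, MAXQUEUE_WEBREQ2USER) rejects
// outright.  The channel, payload type and values are invented for
// this sketch.
#[cfg(test)]
mod backpressure_sketch {
  use tokio::sync::mpsc;

  #[tokio::test]
  async fn try_send_rejects_when_full() {
    let (tx, mut rx) = mpsc::channel::<u32>(super::MAXQUEUE_WEBREQ2USER);

    // Fill the queue; with no receiver draining it, exactly
    // MAXQUEUE_WEBREQ2USER non-blocking sends succeed.
    for i in 0..super::MAXQUEUE_WEBREQ2USER as u32 {
      tx.try_send(i).expect("queue not yet full");
    }

    // Queue full: a non-blocking send is rejected immediately.
    // This is how an overloaded user task sheds web requests.
    assert!(matches!(tx.try_send(99),
                     Err(mpsc::error::TrySendError::Full(99))));

    // A blocking send would instead park this task until the receiver
    // drains an element -- the backpressure that bounds route_packet.
    rx.recv().await.unwrap();
    tx.send(99).await.unwrap();
  }
}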
pub async fn route_packet(global: &Global,
                          transport_conn: &str, source: Option<&ClientName>,
                          packet: RoutedPacketData, daddr: IpAddr,
                          _may_route: MayRoute)
{
  let c = &global.config;
  let len = packet.len();
  let trace = |how: &str, why: &str| {
    trace!("{} to={:?} came={} user={} len={} {}",
           how, daddr, transport_conn,
           match source {
             Some(s) => s as &dyn Display,
             None => &"local",
           },
           len, why);
  };

  let (dest, why) =
        if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
    (Some(&global.local_rx), "via=local")
  } else if daddr == c.vrelay {
    (None, "vrelay")
  } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
    (Some(&client.route), "via=client")
  } else {
    (None, "no-client")
  };

  let dest = if let Some(d) = dest { d } else {
    trace("discard", why);
    return;
  };

  let packet = RoutedPacket {
    data: packet,
//    source: source.cloned(),
  };
  match dest.send(packet).await {
    Ok(()) => trace("forward", why),
    Err(_) => trace("task-crashed!", why),
  }
}

fn main() {
  let opts = <Opts as clap::Parser>::parse();

  // Daemonise before starting the tokio runtime, so the runtime's
  // threads are created in the surviving (child) process.
  let daemon = if opts.daemon && opts.print_config.is_none() {
    Some(Daemoniser::phase1())
  } else {
    None
  };

  async_main(opts, daemon);
}
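// ----- A sketch of the routing decision -----
//
// Test-only restatement (not used by the daemon) of the destination
// selection in route_packet above, so the three-way outcome is easy
// to see.  The `Dest` and `decide` names, the closures standing in
// for the real vnetwork list and client table, and all addresses are
// invented for this sketch.
#[cfg(test)]
mod route_decision_sketch {
  use std::net::IpAddr;

  #[derive(Debug, PartialEq)]
  enum Dest { Local, Client, Discard }

  // Mirrors the (dest, why) selection in route_packet.
  fn decide(vaddr: IpAddr, vrelay: IpAddr,
            in_vnetwork: impl Fn(&IpAddr) -> bool,
            have_client: impl Fn(&IpAddr) -> bool,
            daddr: IpAddr) -> Dest {
    if daddr == vaddr || ! in_vnetwork(&daddr) { Dest::Local } // via=local
    else if daddr == vrelay { Dest::Discard }                  // vrelay
    else if have_client(&daddr) { Dest::Client }               // via=client
    else { Dest::Discard }                                     // no-client
  }

  #[test]
  fn decisions() {
    let ip = |s: &str| s.parse::<IpAddr>().unwrap();
    let (vaddr, vrelay) = (ip("172.24.230.1"), ip("172.24.230.2"));
    // Pretend vnetwork is 172.24.230.0/24 and there is one client.
    let in_vnet = |a: &IpAddr| match a {
      IpAddr::V4(v4) => v4.octets()[..3] == [172, 24, 230],
      _ => false,
    };
    let have_client = |a: &IpAddr| *a == ip("172.24.230.10");

    // Outside the vnetwork, and the server's own vaddr: local kernel.
    assert_eq!(decide(vaddr, vrelay, &in_vnet, &have_client,
                      ip("8.8.8.8")), Dest::Local);
    assert_eq!(decide(vaddr, vrelay, &in_vnet, &have_client,
                      vaddr), Dest::Local);
    // A configured client's virtual address: routed to that client.
    assert_eq!(decide(vaddr, vrelay, &in_vnet, &have_client,
                      ip("172.24.230.10")), Dest::Client);
    // vrelay, or in-vnetwork with no such client: discarded.
    assert_eq!(decide(vaddr, vrelay, &in_vnet, &have_client,
                      vrelay), Dest::Discard);
    assert_eq!(decide(vaddr, vrelay, &in_vnet, &have_client,
                      ip("172.24.230.99")), Dest::Discard);
  }
}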
#[tokio::main]
async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
  let mut tasks: Vec<(
    JoinHandle<AE>,
    String,
  )> = vec![];

  config::startup(
    "hippotatd", LinkEnd::Server,
    &opts.config, &opts.log, |server_name, ics| {

      let server_name = server_name.expect("LinkEnd::Server didn't do its job");
      let pc = PrintConfigOpt(&opts.print_config);

      if ics.is_empty() {
        pc.implement(ics)?;
        return Ok(None);
      }

      let global_config = config::InstanceConfigGlobal::from(&ics);
      let gc = (&server_name, &global_config);
      if pc.keys().all(|k| gc.inspect_key(k).is_some()) {
        pc.implement([&gc])?;
      } else {
        pc.implement(ics)?;
      }
      Ok(Some(global_config))

    }, |global_config, ics| {

      let global_config = global_config.expect("some instances");

      if let Some(pidfile_path) = opts.pidfile.as_ref() {
        (||{
          let mut pidfile = fs::File::create(pidfile_path).context("create")?;
          writeln!(pidfile, "{}", process::id()).context("write")?;
          pidfile.flush().context("write (flush)")?;
          Ok::<_,AE>(())
        })().with_context(|| format!("pidfile {:?}", pidfile_path))?;
      }

      let ipif = Ipif::start(&global_config.ipif, None)?;

      // One pair of bounded channels per configured client: web
      // requests (try_send) and routed packets (blocking send).
      let ics = ics.into_iter().map(Arc::new).collect_vec();
      let (client_handles_send, client_handles_recv) = ics.iter()
        .map(|_ic| {
          let (web_send,   web_recv  ) = mpsc::channel(MAXQUEUE_WEBREQ2USER);
          let (route_send, route_recv) = mpsc::channel(MAXQUEUE_ROUTE2USER);
          ((web_send, route_send), (web_recv, route_recv))
        }).unzip::<_,_,Vec<_>,Vec<_>>();

      let all_clients = izip!(
        &ics,
        client_handles_send,
      ).map(|(ic, (web_send, route_send))| {
        (ic.link.client,
         User {
           ic: ic.clone(),
           web: web_send,
           route: route_send,
         })
      }).collect();

      let (local_rx_send, local_tx_recv) = mpsc::channel(MAXQUEUE_ROUTE2LOCAL);

      let global = Arc::new(Global {
        config: global_config,
        local_rx: local_rx_send,
        all_clients,
      });

      // One user task per configured client.
      for (ic, (web_recv, route_recv)) in izip!(
        ics,
        client_handles_recv,
      ) {
        let global_ = global.clone();
        let ic_ = ic.clone();
        tasks.push((tokio::spawn(async move {
          suser::run(global_, ic_, web_recv, route_recv)
            .await.void_unwrap_err()
        }), format!("client {}", &ic)));
      }

      // One HTTP listener task per configured address.
      for addr in &global.config.addrs {
        let global_ = global.clone();
        let make_service = hyper::service::make_service_fn(
          move |conn: &hyper::server::conn::AddrStream| {
            let global_ = global_.clone();
            let conn = Arc::new(format!("[{}]", conn.remote_addr()));
            async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
              AssertUnwindSafe(
                sweb::handle(conn.clone(), global_.clone(), req)
              )
                .catch_unwind()
                .map(|r| r.unwrap_or_else(|_|{
                  crash(Err("panicked".into()), "webserver request task")
                }))
            }) ) }
          }
        );

        let addr = SocketAddr::new(*addr, global.config.port);
        let server = hyper::Server::try_bind(&addr)
          .context("bind")?
          .http1_preserve_header_case(true)
          .serve(make_service);
        info!("listening on {}", &addr);
        let task = tokio::task::spawn(async move {
          match server.await {
            Ok(()) => anyhow!("shut down?!"),
            Err(e) => e.into(),
          }
        });
        tasks.push((task, format!("http server {}", addr)));
      }

      // The task which shovels packets to and from the local kernel.
      #[allow(clippy::redundant_clone)]
      let global_ = global.clone();
      let ipif = tokio::task::spawn(async move {
        slocal::run(global_, local_tx_recv, ipif).await
          .void_unwrap_err()
      });
      tasks.push((ipif, format!("ipif")));

      Ok(())
    });

  if let Some(daemon) = daemon {
    daemon.complete();
  }

  // Wait for the first task to finish, for any reason; that is
  // always fatal to the whole daemon.
  let (output, died_i, _) = future::select_all(
    tasks.iter_mut().map(|e| &mut e.0)
  ).await;

  let task = &tasks[died_i].1;
  let output = output.map_err(|je| je.to_string());

  crash(output, task);
}

pub fn crash(what_happened: Result<AE /* task failed */,
                                   String /* task panicked */>,
             task: &str) -> ! {
  match what_happened {
    Err(je) => error!("task crashed! {}: {}", task, &je),
    Ok(e)   => error!("task failed! {}: {}",  task, &e ),
  }
  process::exit(12);
}

#[test]
fn verify_cli() {
  hippotat::utils::verify_cli::<Opts>();
}
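// ----- A sketch of the supervision pattern -----
//
// Test-only illustration (not part of the daemon) of how async_main
// supervises its tasks: every long-running task is pushed onto
// `tasks`, future::select_all resolves as soon as the first one
// finishes for any reason, and the daemon treats that as fatal.
// The task bodies and names here are invented.
#[cfg(test)]
mod supervision_sketch {
  use futures::future;

  #[tokio::test]
  async fn first_completion_wins() {
    let mut tasks = vec![
      (tokio::spawn(std::future::pending::<()>()), "runs forever"),
      (tokio::spawn(async { /* returns at once */ }), "finishes"),
    ];

    // As in async_main: wait for whichever task completes first.
    let (output, died_i, _) = future::select_all(
      tasks.iter_mut().map(|e| &mut e.0)
    ).await;

    assert_eq!(tasks[died_i].1, "finishes");
    assert!(output.is_ok()); // a panicking task would surface as Err here
  }
}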