// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

// NOTE(review): several generic argument lists in this file look truncated
// (for example `Option,`, `Arc>,`, `Vec>,`, `Ok::`, `Err::(` and
// `ip_packet_addr::(header)`).  Each affected spot is flagged below and is
// left exactly as found.  TODO: restore them from the canonical source.

#![allow(clippy::style)]
#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]

use hippotat::prelude::*;
use hippotat_macros::into_crlfs;

// Command-line options for the client binary, parsed with clap's derive.
//
// Plain `//` comments are used for these notes on purpose: clap's derive
// reads `///` doc comments as help text, so adding any here would change
// the program's `--help` output.
#[derive(clap::Parser,Debug)]
pub struct Opts {
  // Logging options, flattened into this option set.
  #[clap(flatten)]
  log: LogOpts,

  // Common configuration options, flattened into this option set.
  #[clap(flatten)]
  config: config::CommonOpts,

  /// Print config item(s), do not actually run
  ///
  /// Argument is (comma-separated) list of config keys;
  /// values will be printed tab-separated.
  /// The key `pretty` dumps the whole config in a pretty debug format.
  ///
  /// One line is output for each association.
  /// Additional pseudo-config-keys are recognised:
  /// `client`: our client virtual IP address;
  /// `server`: server's logical name in the config;
  /// `link`: the link name including the `[ ]`.
  // NOTE(review): the type argument of `Option` appears to be missing.
  #[clap(long)]
  print_config: Option,
}

// Alias for a pinned request future.
// NOTE(review): the right-hand side looks truncated.  Presumably this is the
// element type of the `reqs` vectors used below (a pinned, boxed, `Send`
// future yielding an optional byte buffer) -- TODO confirm.
type OutstandingRequest<'r> = Pin>> + Send + 'r >>;

// `Hcc` is a shorthand for the connector bounds
// `hyper::client::connect::Connect + Clone + Send + Sync + 'static`.
// The impl covers any `T` meeting those same bounds.
// NOTE(review): the impl's generic parameter list (`T` is otherwise
// undeclared) appears to be missing.
impl Hcc for T where
  T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
trait Hcc: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }

// Borrowed per-association state shared by `run_client` and the request
// futures created in `submit_request`.
// NOTE(review): the type arguments of `Arc` and `parking_lot::Mutex` appear
// to be missing.
struct ClientContext<'c,C> {
  // Configuration for this association.
  ic: &'c InstanceConfig,
  // HTTP client used to issue requests (`c.hclient.request(..)`).
  hclient: &'c Arc>,
  // Locked whenever a request outcome is reported (`filter`, `success`).
  reporter: &'c parking_lot::Mutex>,
}

// One frame waiting in `tx_queue` to be sent upstream.
#[derive(Debug)]
struct TxQueued {
  // When this instant is reached, `run_client` drops the entry unsent.
  expires: Instant,
  // The frame bytes, as returned by `slip::process1`.
  data: Box<[u8]>,
}

// Builds one HTTP POST carrying `upbound`, starts it, and pushes the future
// that awaits and post-processes the response onto `reqs`.
//
// - `c`: shared context (config, HTTP client, reporter).
// - `req_num`: request counter; incremented here, and the new value is used
//   to label this request in log messages and in the reporter call.
// - `reqs`: collection of in-flight request futures to append to.
// - `upbound`: frames to send in this request; may be empty.
//
// Errors: fails (via `?`) if the request object cannot be constructed.
// Failures of the request itself are handled inside the pushed future, not
// returned from here.
//
// NOTE(review): the type arguments of `ClientContext` and `Vec` in the
// signature appear to be missing.
#[throws(AE)]
fn submit_request<'r, 'c:'r, C:Hcc>(
  c: &'c ClientContext,
  req_num: &mut ReqNum,
  reqs: &mut Vec>,
  upbound: FramesData,
) {
  // Timeout value included in the request metadata, in whole seconds.
  // Adding 999_999_999ns before `as_secs()` rounds any fractional part up.
  let show_timeout = c.ic.http_timeout
    .saturating_add(Duration::from_nanos(999_999_999))
    .as_secs();

  // Token: hex timestamp, a space, then the base64 encoding of the HMAC
  // computed over that timestamp string with the configured secret.
  let time_t = time_t_now();
  let time_t = format!("{:x}", time_t);
  let hmac = token_hmac(c.ic.secret.0.as_bytes(), time_t.as_bytes());
  //dbg!(DumpHex(&hmac));
  let mut token = time_t;
  write!(token, " ").unwrap();
  BASE64_CONFIG.encode_string(hmac, &mut token);

  // Bump the caller's counter and shadow it with the new value.
  let req_num = { *req_num += 1; *req_num };

  // First multipart part (form field "m"): the metadata values listed below.
  // NOTE(review): the pieces of this literal are separated only by spaces,
  // apart from one line break.  `into_crlfs!` presumably rewrites line
  // endings, so the literal's original line structure may have been lost --
  // TODO confirm against the canonical source.
  let prefix1 = format!(into_crlfs!(
    r#"--b Content-Type: text/plain; 
charset="utf-8" Content-Disposition: form-data; name="m" {} {} {} {} {} {} {}"#),
    &c.ic.link.client,
    token,
    c.ic.target_requests_outstanding,
    show_timeout,
    c.ic.mtu,
    c.ic.max_batch_down,
    c.ic.max_batch_up,
  );

  // Header of the second multipart part (form field "d"), which carries the
  // frame data.  The same NOTE(review) about line structure applies.
  let prefix2 = format!(into_crlfs!(
    r#" --b Content-Type: application/octet-stream Content-Disposition: form-data; name="d" "#),
  );
  // Closing multipart boundary.
  let suffix = format!(into_crlfs!(
    r#" --b-- "#),
  );

  // Yields the body pieces in order: `prefix1`; `prefix2` only if there are
  // frames to send; the frames separated by `SLIP_END_SLICE`; then `suffix`.
  // It is a macro so that the same sequence can be produced twice with
  // different item types and conversion methods (see the two uses below).
  macro_rules! content { {
    $out:ty,
    $iter:ident,
    $into:ident,
  } => {
    itertools::chain![
      IntoIterator::into_iter([
        prefix1.$into(),
        prefix2.$into(),
      ]).take(
        // With no frames, only the metadata part is emitted.
        if upbound.is_empty() { 1 } else { 2 }
      ),
      Itertools::intersperse(
        upbound.$iter().map(|u| { let out: $out = u.$into(); out }),
        SLIP_END_SLICE.$into()
      ),
      [ suffix.$into() ],
    ]
  }}

  // First use: borrow every piece as `&[u8]` to total the body length.
  let body_len: usize = content!(
    &[u8],
    iter,
    as_ref,
  ).map(|b| b.len()).sum();

  trace!("{} #{}: req; tx body_len={} frames={}",
         &c.ic, req_num, body_len, upbound.len());

  // Second use: consume the pieces as `Bytes` and stream them as the body.
  // NOTE(review): the turbofish after `Ok` appears truncated.
  let body = hyper::body::Body::wrap_stream(
    futures::stream::iter(
      content!(
        Bytes,
        into_iter,
        into,
      ).map(Ok::)
    )
  );

  // The body is a stream, so the length computed above is set explicitly.
  let req = hyper::Request::post(&c.ic.url)
    .header("Content-Type", r#"multipart/form-data; boundary="b""#)
    .header("Content-Length", body_len)
    .body(body)
    .context("construct request")?;

  let resp = c.hclient.request(req);
  let fut = Box::pin(async move {
    // The whole exchange (sending, then reading the response body) runs
    // under `effective_http_timeout`.
    let r = async { tokio::time::timeout(
      c.ic.effective_http_timeout,
      async {
        let resp = resp.await.context("make request")?;
        let status = resp.status();
        let mut resp = resp.into_body();
        // The body read is capped at `max_batch_down` plus `MAX_OVERHEAD`.
        let max_body = c.ic.max_batch_down.sat() + MAX_OVERHEAD;
        let resp = read_limited_bytes(
          max_body, default(), default(), &mut resp
        ).await
          .discard_data().context("fetching response body")?;

        // The status is checked only after the body is read, so the body
        // can be quoted in the error.
        if ! status.is_success() {
          throw!(anyhow!("HTTP error status={} body={:?}",
                         &status, String::from_utf8_lossy(&resp)));
        }

        Ok::<_,AE>(resp)
      }
    ).await? }.await;

    // Hand the outcome to the reporter, labelled with this request number.
    // Presumably `filter` reports errors and turns them into `None` --
    // TODO confirm against `Reporter`.
    let r = c.reporter.lock().filter(Some(req_num), r);

    if let Some(r) = &r {
      trace!("{} #{}: rok; rx bytes={}", &c.ic, req_num, r.len());
    } else {
      // Nothing usable came back: wait `http_retry` before this future
      // completes.
      tokio::time::sleep(c.ic.http_retry).await;
    }
    r
  });
  reqs.push(fut);
}

// Runs one client association: moves frames between the local `Ipif` and
// the server using HTTP requests made by `submit_request`.
//
// The main loop contains no `break`, so it ends only when a `?` inside it
// propagates an error.  `ipif.quitting(..)` is then awaited and the loop's
// result is returned.
//
// NOTE(review): the generic parameter list of this function and the type
// arguments of `Arc` and `Result` appear to be missing.
async fn run_client(
  ic: InstanceConfig,
  hclient: Arc>
) -> Result
{
  debug!("{}: config: {:?}", &ic, &ic);

  let reporter = parking_lot::Mutex::new(Reporter::new(&ic));

  let c = ClientContext {
    reporter: &reporter,
    hclient: &hclient,
    ic: &ic,
  };

  let mut ipif = Ipif::start(&ic.ipif, Some(ic.to_string()))?;

  // Request counter; incremented by each `submit_request` call.
  let mut req_num: ReqNum = 0;

  // Frames read from `ipif.tx`, each with an expiry time, waiting to be
  // batched.
  // NOTE(review): the element type of `VecDeque` appears to be missing.
  let mut tx_queue: VecDeque = default();
  // The batch being assembled for the next request.
  let mut upbound = Frames::default();

  // In-flight request futures.
  // NOTE(review): the element type of `Vec` appears to be missing.
  let mut reqs: Vec = Vec::with_capacity(ic.max_requests_outstanding.sat());

  // Received frames waiting to be written to `ipif.rx`.
  let mut rx_queue: FrameQueueBuf = default();

  let trouble = async {
    loop {
      // `Ok(())` while `rx_queue` holds less than `max_batch_down` bytes,
      // otherwise `Err(())`.  It is used purely as a flag (`is_ok` and
      // `is_err`) in the branch conditions below.
      let rx_queue_space =
        if rx_queue.remaining() < ic.max_batch_down.sat() {
          Ok(())
        } else {
          Err(())
        };

      select! {
        // `biased;` asks tokio's `select!` to poll the branches in the
        // order written, so the order below is significant.
        biased;

        // Write pending received data to the interface.
        y = ipif.rx.write_all_buf(&mut rx_queue),
        if ! rx_queue.is_empty() =>
        {
          let () = y.context("write rx data to ipif")?;
        },

        // Drop the oldest queued frame once its expiry time is reached.
        // The lookup sits inside `async { }`, presumably so that
        // `front().unwrap()` runs only when the branch is polled, which
        // the `! tx_queue.is_empty()` condition guards.
        () = async {
          let expires = tx_queue.front().unwrap().expires;
          tokio::time::sleep_until(expires).await
        },
        if ! tx_queue.is_empty() =>
        {
          let _ = tx_queue.pop_front();
        },

        // Read another frame from the interface, but only while
        // `tx_queue` is empty.
        data = Ipif::next_frame(&mut ipif.tx),
        if tx_queue.is_empty() =>
        {
          let data = data?;
          //eprintln!("data={:?}", DumpHex(&data));

          // The check closure rejects packets whose address differs from
          // our client address, as `PE::Src`.
          // NOTE(review): the generic argument of `ip_packet_addr`
          // appears to be missing.
          match slip::process1(Slip2Mime, ic.mtu, &data, |header| {
            let saddr = ip_packet_addr::(header)?;
            if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
            Ok(())
          }) {
            // Accepted: queue it with a deadline of now + `max_queue_time`.
            Ok((data, ())) => tx_queue.push_back(TxQueued {
              data,
              expires: Instant::now() + ic.max_queue_time
            }),
            // `PE::Empty` is ignored without logging.
            Err(PE::Empty) => { },
            // A wrong source address is logged only at debug level.
            Err(e@ PE::Src(_)) => debug!("{}: tx discarding: {}", &ic, e),
            Err(e) => error!("{}: tx discarding: {}", &ic, e),
          };
        },

        // Move queued frames into the upbound batch.  `async { }`
        // completes immediately, so this branch runs whenever its
        // condition holds and no earlier branch completed.
        _ = async { },
        if ! upbound.tried_full() &&
           ! tx_queue.is_empty() =>
        {
          while let Some(TxQueued { data, expires }) = tx_queue.pop_front() {
            match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) {
              // `add` handed the frame back: return it to the front of
              // the queue with its original expiry and stop filling.
              Err(data) => {
                tx_queue.push_front(TxQueued { data: data.into(), expires });
                break;
              }
              Ok(()) => { },
            }
          }
        },

        // Submit a new request when there is rx queue space and either
        // - fewer than `target_requests_outstanding` are in flight, or
        // - fewer than `max_requests_outstanding` are in flight and
        //   there is upbound data to send.
        _ = async { },
        if rx_queue_space.is_ok() &&
          (reqs.len() < ic.target_requests_outstanding.sat() ||
           (reqs.len() < ic.max_requests_outstanding.sat() &&
            ! upbound.is_empty()))
          =>
        {
          // `mem::take` hands over the batch and leaves a default
          // `upbound` in its place.
          submit_request(&c, &mut req_num, &mut reqs,
                         mem::take(&mut upbound).into())?;
        },

        // Wait for any in-flight request to finish.  The `select_all`
        // call sits inside `async { }`, presumably so that it is only
        // made when the branch is polled (`reqs` non-empty).
        (got, goti, _) = async { future::select_all(&mut reqs).await },
          if ! reqs.is_empty() =>
        {
          // This future was Ready and has returned the value,
          // which is in `got`.  We don't want the completed future.
          // NOTE(review): the type annotation below looks truncated.
          let _: Pin>> = reqs.swap_remove(goti);

          if let Some(got) = got {
            //eprintln!("got={:?}", DumpHex(&got));
            // Arguments to `processn`:
            // - the check closure rejects packets whose address differs
            //   from our client address, as `PE::Dst`;
            // - accepted frames are appended to `rx_queue`;
            // - per-frame errors are logged.
            // NOTE(review): `req_num` here is this function's counter,
            // i.e. the most recently submitted request, which is not
            // necessarily the one that just completed -- confirm the
            // log message is meant that way.
            // NOTE(review): the generic argument of `ip_packet_addr`
            // appears to be missing.
            match slip::processn(SlipNoConv,ic.mtu, &got, |header| {
              let addr = ip_packet_addr::(header)?;
              if addr != ic.link.client.0 { throw!(PE::Dst(addr)) }
              Ok(())
            },
            |(o,())| future::ready(Ok({ rx_queue.push_esc(o); })),
            |e| Ok::<_,SlipFramesError>( {
              error!("{} #{}: rx discarding: {}", &ic, req_num, e);
            })).await
            {
              Ok(()) => reporter.lock().success(),
              // Push a placeholder future that sleeps for `http_retry`
              // and then yields `None`.  While pending it counts
              // towards `reqs.len()`.
              Err(SlipFramesError::ErrorOnlyBad) => {
                reqs.push(Box::pin(async {
                  tokio::time::sleep(ic.http_retry).await;
                  None
                }));
              },
              Err(SlipFramesError::Other(v)) => unreachable!("{}", v),
            }
          }
        },

        // While the rx queue is full, report that we are blocked if
        // `effective_http_timeout` passes with no other branch
        // completing.
        // NOTE(review): the turbofish after `Err` appears truncated.
        _ = tokio::time::sleep(c.ic.effective_http_timeout),
        if rx_queue_space.is_err() =>
        {
          reporter.lock().filter(None, Err::(
            anyhow!("rx queue full, blocked")
          ));
        },
      }
    }
  }.await;

  // Reached only once the loop above has ended with an error.
  ipif.quitting(Some(&ic)).await;
  trouble
}

// Entry point: parses the options, loads the configuration, builds one
// shared HTTPS client and spawns one `run_client` task per association.
// When any of those finishes, it logs an error and exits with status 16.
#[tokio::main]
async fn main() {
  // NOTE(review): the type before `::parse()` appears to be missing.
  let opts = ::parse();
  // `config::startup` receives two callbacks:
  // - the first runs the `--print-config` handling (`PrintConfigOpt`);
  // - the second returns the instance configs to run.
  let (ics,) = config::startup(
    "hippotat", LinkEnd::Client,
    &opts.config, &opts.log, |_, ics| {
      PrintConfigOpt(&opts.print_config)
        .implement(ics, )?;
      Ok(())
    }, |_, ics| {
      Ok((ics,))
    });

  let https = HttpsConnector::new();
  let hclient = hyper::Client::builder()
    .http1_preserve_header_case(true)
    .build::<_, hyper::Body>(https);
  // One client, shared by all associations through `Arc`.
  let hclient = Arc::new(hclient);

  info!("starting");
  // `select_all` completes as soon as the first per-association future
  // does.
  let () = future::select_all(
    ics.into_iter().map(|ic| Box::pin(async {
      let assocname = ic.to_string();
      info!("{} starting", &assocname);
      let hclient = hclient.clone();
      // `void_unwrap_err` presumably extracts the error from a result
      // whose success type is uninhabited -- TODO confirm.
      let join = task::spawn(async {
        run_client(ic, hclient).await.void_unwrap_err()
      });
      match join.await {
        // The task returned: `e` is the error from `run_client`.
        Ok(e) => {
          error!("{} failed: {}", &assocname, e);
        },
        // The task panicked: log, then re-raise the panic here.
        Err(je) => {
          error!("{} panicked!", &assocname);
          panic::resume_unwind(je.into_panic());
        },
      }
    }))
  ).await.0;

  error!("quitting because one of your client connections crashed");
  process::exit(16);
}

// Runs the shared command-line definition check.
// NOTE(review): the generic argument of `verify_cli` appears to be missing
// (presumably `Opts`) -- TODO confirm.
#[test]
fn verify_cli() {
  hippotat::utils::verify_cli::();
}