// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

use super::*;

/// Handles for one user: its configuration plus the sending halves of two
/// mpsc channels.
///
/// NOTE(review): presumably `web` and `route` are the counterparts of the
/// `web` and `routed` receivers consumed by [`run`] -- confirm at the place
/// where the channels are created.
///
/// NOTE(review): in this copy of the file the generic arguments of `Arc`
/// and `mpsc::Sender` are absent; they look like they were lost in
/// extraction and need restoring from the upstream source.
#[derive(Debug)]
pub struct User {
    pub ic: Arc,
    pub web: mpsc::Sender,
    pub route: mpsc::Sender,
}

/// Per-user event loop.
///
/// Each iteration:
///
/// 1. possibly answers one held web request, filling the reply with packets
///    taken from the front of the downbound queue;
/// 2. discards packets from the front of the downbound queue while the
///    queue is over its size limit;
/// 3. waits for whichever comes first: a routed packet (appended to the
///    downbound queue), a new web request (its body is parsed, its packets
///    are routed, and the request is then held for a later reply), or the
///    earliest deadline among the held requests.
///
/// Parameters:
///
/// * `global` -- passed by reference to `route_packet` for every packet
///   received from the client.
/// * `ic` -- this user's configuration; the code reads `link`,
///   `max_batch_down`, `max_batch_up`, `max_requests_outstanding` and, via
///   the local `meta!` macro, `target_requests_outstanding`,
///   `http_timeout` and `mtu`.
/// * `web` -- incoming `WebRequest`s for this user.
/// * `routed` -- incoming routed items; their `.data` field is queued for
///   delivery to the client.
///
/// # Errors
///
/// The loop has no `break` or `return`, so the function only finishes by
/// returning an error: when `routed` yields `None` ("routers shut down!")
/// or when `web` yields `None` ("webservers all shut down!").  A failure
/// while handling one individual request does not end the loop; it is
/// reported to that request's `reply_to` instead.
///
/// NOTE(review): the generic arguments of the parameter types and of the
/// return type are absent in this copy of the file.
pub async fn run(
    global: Arc,
    ic: Arc,
    mut web: mpsc::Receiver,
    mut routed: mpsc::Receiver
) -> Result {
    /// A web request that has been parsed and is being held until there is
    /// something to send back, or until it has to be answered anyway.
    struct Outstanding {
        /// Where the eventual `WebResponse` is sent.
        reply_to: oneshot::Sender,
        oi: OutstandingInner,
    }
    /// Per-request values produced while parsing the request (see the
    /// `meta!` invocations below).
    #[derive(Debug)]
    struct OutstandingInner {
        /// Set to `Instant::now() + http_timeout` once parsing finished.
        deadline: Instant,
        target_requests_outstanding: u32,
        max_batch_down: u32,
    }
    // Held requests, oldest at the front (new ones are `push_back`ed).
    let mut outstanding: VecDeque = default();
    // Packets waiting to go to the client, oldest at the front.
    let mut downbound: PacketQueue = default();

    // Best-effort reply: if the oneshot send fails the response is dropped
    // and the failure is only logged at trace level.
    let try_send_response = |
        reply_to: oneshot::Sender,
        response: WebResponse
    | {
        reply_to.send(response)
            .unwrap_or_else(|_: WebResponse| {
                /* oh dear */
                trace!("unable to send response back to webserver! user={}", &ic.link.client);
            });
    };

    loop {
        // Smallest `max_batch_down` among the held requests, or the
        // configured value when nothing is held.  The result is used as a
        // `usize` below; `.sat()` is defined elsewhere -- presumably a
        // saturating integer conversion.
        let eff_max_batch_down = outstanding
            .iter()
            .map(|o| o.oi.max_batch_down)
            .min()
            .unwrap_or(ic.max_batch_down)
            .sat();
        // `None` when no request is held.  Note this is computed before a
        // request is (possibly) removed just below, so it may still name
        // the deadline of the request answered in this iteration.
        let earliest_deadline = outstanding
            .iter()
            .map(|o| o.oi.deadline)
            .min();

        // Pick at most one request to answer now:
        //  - if packets are queued, the oldest held request (if any);
        //  - otherwise the first held request for which either more
        //    requests are held than its `target_requests_outstanding`,
        //    or its deadline has passed.
        if let Some(req) = {
            let now = Instant::now();
            if ! downbound.is_empty() {
                outstanding.pop_front()
            } else if let Some((i,_)) = outstanding.iter().enumerate().find({
                |(_,o)| {
                    outstanding.len() > o.oi.target_requests_outstanding.sat()
                        || o.oi.deadline < now
                }
            }) {
                // `i` was just produced by enumerating `outstanding`.
                Some(outstanding.remove(i).unwrap())
            } else {
                None
            }
        } {
            let mut build: FrameQueueBuf = default();
            // Move packets from the front of `downbound` into the reply
            // until the queue is empty or the next packet would bring the
            // reply up to `eff_max_batch_down`.
            //
            // NOTE(review): a front packet whose own length is already
            // >= `eff_max_batch_down` is never moved by this loop and
            // stays at the front until the overflow trimming further down
            // removes it -- confirm the configuration rules this out.
            loop {
                let next = if let Some(n) = downbound.peek_front() { n } else { break };
                // Don't add 1 for the ESC since we will strip one
                if build.len() + next.len() >= eff_max_batch_down { break }
                // `peek_front` just returned `Some`.
                build.esc_push(downbound.pop_front().unwrap());
            }
            if ! build.is_empty() {
                // skip leading ESC
                build.advance(1);
            }
            // When the request was chosen because of its deadline or the
            // outstanding-count rule, `downbound` was empty and `build` is
            // therefore empty too.
            let response = WebResponse {
                data: Ok(build),
                warnings: default(),
            };
            try_send_response(req.reply_to, response);
        }

        // Bound the downbound queue at
        // `max_requests_outstanding * eff_max_batch_down + 1`, dropping
        // from the front (the oldest packets) while it is over that.
        let max = usize::saturating_mul(
            ic.max_requests_outstanding.sat(),
            eff_max_batch_down,
        ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);
        while downbound.total_len() > max {
            let _ = downbound.pop_front();
            trace!("{} discarding downbound-queue-full", &ic.link);
        }

        select!{
            // `biased;` is tokio `select!` syntax: the branches are polled
            // in the order written, so routed packets are looked at before
            // web requests, and those before the deadline timer.
            biased;

            // A packet routed to this user: queue it for the client.
            data = routed.recv() =>
            {
                let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
                downbound.push_back(data.data);
            },

            // A new web request from this user's client.
            req = web.recv() =>
            {
                let WebRequest {
                    initial,
                    initial_remaining,
                    length_hint,
                    mut body,
                    boundary_finder,
                    reply_to,
                    conn,
                    mut warnings,
                    may_route,
                } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;

                // The async block confines `?` failures to this one
                // request: they become the `Err` arm of the `match`.
                match async {
                    // Number of leading bytes of `initial` to skip when
                    // parsing resumes below.
                    let initial_used = initial.len() - initial_remaining;

                    let whole_request = read_limited_bytes(
                        ic.max_batch_up.sat(),
                        initial,
                        length_hint,
                        &mut body
                    ).await.context("read request body")?;

                    let (meta, mut comps) =
                        multipart::ComponentIterator::resume_mid_component(
                            &whole_request[initial_used..],
                            boundary_finder
                        ).context("resume parsing body, after auth checks")?;

                    // From here on `meta` is the field iterator; the
                    // `meta!` macro below lives in the macro namespace and
                    // does not clash with it.
                    let mut meta = MetadataFieldIterator::new(&meta);

                    // meta!{ FIELD, ( BADCMP? ), RET, let SERVER, CLIENT CODE... }
                    //
                    // Defines a local named FIELD.  Inside a closure it
                    // binds SERVER to `ic.FIELD`, then runs
                    // `let CLIENT CODE...` (which normally reads the next
                    // metadata field), then -- if BADCMP is given -- fails
                    // with a "mismatch" error when `CLIENT BADCMP SERVER`
                    // holds, and finally yields RET.  Any error gets the
                    // field name attached as context and aborts handling
                    // of this request.
                    macro_rules! meta {
                        {
                            $v:ident,
                            ( $( $badcmp:tt )? ),
                            $ret:expr,
                            let $server:ident,
                            $client:ident $($code:tt)*
                        } => {
                            let $v = (||{
                                let $server = ic.$v;
                                let $client $($code)*
                                $(
                                    if $client $badcmp $server {
                                        throw!(anyhow!("mismatch: client={:?} {} server={:?}", $client, stringify!($badcmp), $server));
                                    }
                                )?
                                Ok::<_,AE>($ret)
                            })().context(stringify!($v))?;
                            //dbg!(&$v);
                        }
                    }

                    // The invocations consume metadata fields from `meta`
                    // in this order.  `need_parse()?` yields the value
                    // directly while `parse()?` yields an `Option` that
                    // falls back to the server's value -- presumably
                    // "required" versus "optional" fields.

                    // Client's value must equal the server's.
                    meta!{
                        target_requests_outstanding, ( != ), client,
                        let server, client: u32 = meta.need_parse()?;
                    }
                    // In seconds; must not exceed the server's.
                    meta!{
                        http_timeout, ( > ), client,
                        let server, client = Duration::from_secs(meta.need_parse()?);
                    }
                    // Must equal the server's.
                    meta!{
                        mtu, ( != ), client,
                        let server, client: u32 = meta.parse()?.unwrap_or(server);
                    }
                    // No mismatch check: the smaller of the two is used.
                    meta!{
                        max_batch_down, (), min(client, server),
                        let server, client: u32 = meta.parse()?.unwrap_or(server);
                    }
                    // Must not exceed the server's.
                    meta!{
                        max_batch_up, ( > ), client,
                        let server, client = meta.parse()?.unwrap_or(server);
                    }
                    let _ = max_batch_up;
                    // we don't use this further

                    // Remaining multipart components carry the packets.
                    while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
                        // NOTE(review): an unexpected part name only adds
                        // a warning; there is no `continue`, so the
                        // payload is still processed below -- confirm
                        // that is intended.
                        if comp.name != PartName::d {
                            warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
                        }
                        slip::processn(Mime2Slip, mtu, comp.payload,
                            // Per-packet header check: reject packets
                            // whose address differs from this client's,
                            // and hand the second extracted address on.
                            //
                            // NOTE(review): the two `ip_packet_addr`
                            // calls are textually identical here; a
                            // generic argument after `::` appears to be
                            // missing in this copy (presumably selecting
                            // source vs destination address).
                            |header| {
                                let saddr = ip_packet_addr::(header)?;
                                if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
                                let daddr = ip_packet_addr::(header)?;
                                Ok(daddr)
                            },
                            // Accepted packets are handed to the router.
                            |(daddr,packet)| route_packet(
                                &global,
                                &conn,
                                Some(&ic.link.client),
                                daddr,
                                packet,
                                may_route.clone(),
                            ).map(Ok),
                            // Per-packet errors are recorded as warnings.
                            |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
                        ).await?;
                    }

                    let deadline = Instant::now() + http_timeout;
                    let oi = OutstandingInner {
                        target_requests_outstanding,
                        max_batch_down,
                        deadline,
                    };
                    Ok::<_,AE>(oi)
                }.await {
                    // Parsed fine: hold the request for a later reply.
                    Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
                    // Failed: answer right away with the error and the
                    // warnings gathered so far.
                    Err(e) => {
                        try_send_response(reply_to, WebResponse {
                            data: Err(e),
                            warnings,
                        });
                    },
                }
            }

            // Wake up at the earliest deadline (never, if nothing is
            // held).  The handler is empty: the next loop iteration
            // re-evaluates which request to answer.
            () = async {if let Some(deadline) = earliest_deadline {
                tokio::time::sleep_until(deadline).await;
            } else {
                future::pending().await
            } } =>
            {
            }
        }
    }
}