// Copyright 2020-2021 Ian Jackson and contributors to Otter // SPDX-License-Identifier: AGPL-3.0-or-later // There is NO WARRANTY. #![allow(clippy::while_let_loop)] #![allow(clippy::blocks_in_if_conditions)] use otter::prelude::*; use super::*; // ---------- basic definitions ---------- const UPDATE_READER_SIZE: usize = 1024*32; const UPDATE_KEEPALIVE: Duration = Duration::from_secs(14); const UPDATE_EXPIRE: Duration = Duration::from_secs(66); struct UpdateReaderWN { player: PlayerId, client: ClientId, to_send: UpdateId, } #[derive(Deref)] // no DerefMut struct UpdateReader { #[deref] wn: UpdateReaderWN, overflow: Option>>, gref: InstanceRef, keepalives: Wrapping, ending_send: Option>>, init_confirmation_send: iter::Once<()>, } impl UpdateReaderWN { #[throws(io::Error)] fn write_next(&mut self, mut buf: &mut U, tz: &Timezone, next: &PreparedUpdate) where U: Write { let tu = next.for_transmit(tz, self.player, self.client); write!(buf, "data: ")?; serde_json::to_writer(&mut buf, &tu)?; write!(buf, "\n\ id: {}\n\n", self.to_send)?; debug!("sending to {:?} {:?}: #{} {:?}", &self.player, &self.client, self.to_send, &tu); self.to_send.try_increment().unwrap(); } fn trouble(&self, m: &'static str, info: T) -> io::Error { error!("update sending error: {}: {} {}: {:?}", m, &self.player, &self.client, &info); io::Error::new(io::ErrorKind::Other, anyhow!("internal error")) } } type BufForSend = Vec; #[derive(Debug, Default)] struct BufForRead { buf: Vec, } impl Write for BufForRead { #[throws(io::Error)] fn write(&mut self, d: &[u8]) -> usize { self.buf.write(d)? } #[throws(io::Error)] fn flush(&mut self) { self.buf.flush()? } } impl BufForRead { fn reset_to_start(&mut self) { self.buf.truncate(0) } fn at_start(&self) -> bool { self.buf.len() == 0 } fn len(&self) -> usize { self.buf.len() } fn copy_from(&mut self, mut read: R) { let rbuf = read.fill_buf().unwrap(); let did = self.write(rbuf).unwrap(); read.consume(did); } fn just_copy_from(mut self, read: R) -> BufForSend { self.copy_from(read); self.finish() } fn finish(self) -> BufForSend { assert!(! self.buf.is_empty()); self.buf } fn finish_eof() -> BufForSend { vec![] } } trait InfallibleBufRead: BufRead { } impl InfallibleBufRead for io::Cursor where io::Cursor: BufRead { } impl InfallibleBufRead for &mut T where T: InfallibleBufRead { } #[derive(Error,Debug)] pub enum SSEUpdateGenerationError { ImpossibleIoWriteError(#[from] io::Error), // write of Vec failed GameBeingDestroyed(#[from] GameBeingDestroyed), } display_as_debug!{SSEUpdateGenerationError} impl UpdateReader { #[throws(SSEUpdateGenerationError)] async fn read(&mut self) -> BufForSend { let mut buf = BufForRead::default(); if let Some(ref mut ending) = self.ending_send { return buf.just_copy_from(ending); } let mut ig = self.gref.lock()?; if self.init_confirmation_send.next().is_some() { write!(buf, "event: commsworking\n\ data: init {} {} G{}\n\n", self.player, self.client, ig.gs.gen)?; } let g = &mut *ig; let iplayer = &mut match g.iplayers.get_mut(self.player) { Some(x) => x, None => { // Ideally this would be handled by us throwing, and // the content unfold handling the error. let data = format!("event: player-gone\n\ data: No longer in the game\n\n") .into_bytes().into_boxed_slice(); assert_eq!(self.ending_send, None); let ending = self.ending_send.get_or_insert(io::Cursor::new(data)); buf.reset_to_start(); return buf.just_copy_from(ending); }, }; let pu = &mut iplayer.u; loop { if let Some(ref mut overflow) = self.overflow { buf.copy_from(&mut *overflow); if usize::try_from(overflow.position()).unwrap() == overflow.get_ref().len() { self.overflow = None } debug!("read from overflow {} {}", &self.player, &self.client); } let next = match pu.read_log().get(self.to_send) { Some(next) => next, None => { if self.to_send < pu.read_log().front_index() && buf.at_start() { write!(buf, "event: updates-expired\ndata: {}\n\n", self.to_send) .map_err(|e| self.wn.trouble("notify updates expired", &e))?; debug!("updates expired for {} {}, telling client (#{})", &self.wn.player, &self.wn.client, self.to_send); self.wn.to_send = UpdateId::max_value(); // ^ just stops us spewing, hopefully client will notice } break } }; self.wn.write_next(&mut buf, &iplayer.ipl.tz, next) .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?; if buf.len() >= UPDATE_READER_SIZE { return buf.finish() } let before = next.when - UPDATE_EXPIRE; pu.expire_upto(before); } let cv = pu.get_cv(); if buf.len() > 0 { return buf.finish(); } if (||{ (*ig).gs.players.get(self.player)?; let client = ig.clients.get_mut(self.client)?; client.lastseen = Instant::now(); Some(()) })() == None { return BufForRead::finish_eof() } let was_gen = ig.gs.gen; match tokio::time::timeout( UPDATE_KEEPALIVE, cv.wait_no_relock(ig.c) ).await { Err(_elapsed) => { }, Ok(baton) => baton.dispose(), }; write!(buf, "event: commsworking\n\ data: online {} {} G{}\n\n", self.player, self.client, was_gen)?; self.keepalives += Wrapping(1); return buf.finish(); } } // ---------- entrypoint for dribbling the http response ---------- #[throws(Fatal)] pub fn content(iad: InstanceAccessDetails, gen: Generation) -> Pin>>> { let client = iad.ident; let update_reader = { let mut g = iad.gref.lock()?; let _g = &mut g.gs; let cl = g.clients.byid(client)?; let player = cl.player; trace!("updates content iad={:?} player={:?} cl={:?}", &iad, &player, &cl); let gref = iad.gref.clone(); let log = &g.iplayers.byid(player)?.u.read_log(); let to_send = match log.into_iter().rev() .find(|(_,update)| update.gen <= gen) { None => UpdateId::min_value(), Some((mut i,_)) => { i.try_increment(); i }, }; UpdateReader { keepalives: Wrapping(0), overflow: None, gref, ending_send: default(), init_confirmation_send: iter::once(()), wn: UpdateReaderWN { player, client, to_send, }, } }; Box::pin(futures::stream::try_unfold( update_reader, |mut update_reader| async { let got = update_reader.read().await?; Ok(if got.len() > 0 { Some((Bytes::from(got), update_reader)) } else { None }) })) as _ }