use std::{cell::Cell, fmt};

use ntex_bytes::{BytesVec, PoolRef};
use ntex_util::future::Either;

use crate::IoRef;

// Read (`.0`) and write (`.1`) buffers for a single filter layer.
#[derive(Default)]
pub(crate) struct Buffer(Cell<Option<BytesVec>>, Cell<Option<BytesVec>>);

impl fmt::Debug for Buffer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let b0 = self.0.take();
        let b1 = self.1.take();
        let res = f
            .debug_struct("Buffer")
            .field("0", &b0)
            .field("1", &b1)
            .finish();
        self.0.set(b0);
        self.1.set(b1);
        res
    }
}

#[derive(Debug)]
pub struct Stack {
    len: usize,
    buffers: Either<[Buffer; 3], Vec<Buffer>>,
}

impl Stack {
    pub(crate) fn new() -> Self {
        Self {
            len: 1,
            buffers: Either::Left(Default::default()),
        }
    }

    pub(crate) fn add_layer(&mut self) {
        match &mut self.buffers {
            Either::Left(b) => {
                // move to vec
                if self.len == 3 {
                    let mut vec = vec![Buffer(Cell::new(None), Cell::new(None))];
                    for item in b.iter_mut().take(self.len) {
                        vec.push(Buffer(
                            Cell::new(item.0.take()),
                            Cell::new(item.1.take()),
                        ));
                    }
                    self.len += 1;
                    self.buffers = Either::Right(vec);
                } else {
                    let mut idx = self.len;
                    while idx > 0 {
                        let item = Buffer(
                            Cell::new(b[idx - 1].0.take()),
                            Cell::new(b[idx - 1].1.take()),
                        );
                        b[idx] = item;
                        idx -= 1;
                    }
                    b[0] = Buffer(Cell::new(None), Cell::new(None));
                    self.len += 1;
                }
            }
            Either::Right(vec) => {
                self.len += 1;
                vec.insert(0, Buffer(Cell::new(None), Cell::new(None)));
            }
        }
    }

    // Borrow the buffer pair for layer `idx` together with the next layer.
    // When `idx` is the last layer, its single pair is temporarily split into
    // `curr` (read part) and `next` (write part) and restored afterwards.
    fn get_buffers<F, R>(&self, idx: usize, f: F) -> R
    where
        F: FnOnce(&Buffer, &Buffer) -> R,
    {
        let buffers = match self.buffers {
            Either::Left(ref b) => &b[..],
            Either::Right(ref b) => &b[..],
        };

        let next = idx + 1;
        if self.len > next {
            f(&buffers[idx], &buffers[next])
        } else {
            let curr = Buffer(Cell::new(buffers[idx].0.take()), Cell::new(None));
            let next = Buffer(Cell::new(None), Cell::new(buffers[idx].1.take()));

            let result = f(&curr, &next);
            buffers[idx].0.set(curr.0.take());
            buffers[idx].1.set(next.1.take());
            result
        }
    }

    fn get_first_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[0],
            Either::Right(b) => &b[0],
        }
    }

    fn get_last_level(&self) -> &Buffer {
        match &self.buffers {
            Either::Left(b) => &b[self.len - 1],
            Either::Right(b) => &b[self.len - 1],
        }
    }

    pub(crate) fn read_buf<F, R>(&self, io: &IoRef, idx: usize, nbytes: usize, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'_>) -> R,
    {
        self.get_buffers(idx, |curr, next| {
            let buf = ReadBuf {
                io,
                nbytes,
                curr,
                next,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    pub(crate) fn write_buf<F, R>(&self, io: &IoRef, idx: usize, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'_>) -> R,
    {
        self.get_buffers(idx, |curr, next| {
            let buf = WriteBuf {
                io,
                curr,
                next,
                need_write: Cell::new(false),
            };
            f(&buf)
        })
    }

    pub(crate) fn with_read_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_last_level();
        let mut rb = item.0.take();
        if rb.is_none() {
            rb = Some(io.memory_pool().get_read_buf());
        }

        let result = f(rb.as_mut().unwrap());
        if let Some(b) = rb {
            if b.is_empty() {
                io.memory_pool().release_read_buf(b);
            } else {
                item.0.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn with_read_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut rb = item.0.take();
        if rb.is_none() {
            rb = Some(io.memory_pool().get_read_buf());
        }

        let result = f(rb.as_mut().unwrap());

        // check nested updates
        if item.0.take().is_some() {
            log::error!("Nested read io operation is detected");
            io.force_close();
        }

        if let Some(b) = rb {
            if b.is_empty() {
                io.memory_pool().release_read_buf(b);
            } else {
                item.0.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn with_write_source<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let item = self.get_first_level();
        let mut wb = item.1.take();
        if wb.is_none() {
            wb = Some(io.memory_pool().get_write_buf());
        }

        let result = f(wb.as_mut().unwrap());
        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.1.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn with_write_destination<F, R>(&self, io: &IoRef, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let item = self.get_last_level();
        let mut wb = item.1.take();

        let result = f(&mut wb);

        // check nested updates
        if item.1.take().is_some() {
            log::error!("Nested write io operation is detected");
            io.force_close();
        }

        if let Some(b) = wb {
            if b.is_empty() {
                io.memory_pool().release_write_buf(b);
            } else {
                item.1.set(Some(b));
            }
        }
        result
    }

    pub(crate) fn read_destination_size(&self) -> usize {
        let item = self.get_first_level();
        let rb = item.0.take();
        let size = rb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.0.set(rb);
        size
    }

    pub(crate) fn write_destination_size(&self) -> usize {
        let item = self.get_last_level();
        let wb = item.1.take();
        let size = wb.as_ref().map(|b| b.len()).unwrap_or(0);
        item.1.set(wb);
        size
    }

    pub(crate) fn release(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };

        for item in items {
            if let Some(buf) = item.0.take() {
                pool.release_read_buf(buf);
            }
            if let Some(buf) = item.1.take() {
                pool.release_write_buf(buf);
            }
        }
    }

    pub(crate) fn set_memory_pool(&self, pool: PoolRef) {
        let items = match &self.buffers {
            Either::Left(b) => &b[..],
            Either::Right(b) => &b[..],
        };
        for item in items {
            if let Some(mut b) = item.0.take() {
                pool.move_vec_in(&mut b);
                item.0.set(Some(b));
            }
            if let Some(mut b) = item.1.take() {
                pool.move_vec_in(&mut b);
                item.1.set(Some(b));
            }
        }
    }
}

#[derive(Debug)]
pub struct ReadBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) nbytes: usize,
    pub(crate) need_write: Cell<bool>,
}

impl<'a> ReadBuf<'a> {
    #[inline]
    /// Get number of newly added bytes
    pub fn nbytes(&self) -> usize {
        self.nbytes
    }

    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_read_buf(buf);
    }

    #[inline]
    /// Get reference to source read buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut item = self.next.0.take();
        let result = f(&mut item);
        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.next.0.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination read buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut item = self.curr.0.take();
        if item.is_none() {
            item = Some(self.io.memory_pool().get_read_buf());
        }

        let result = f(item.as_mut().unwrap());
        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
            } else {
                self.curr.0.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Take source read buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.next.0.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_read_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source read buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_read_buf(src);
            } else if let Some(mut buf) = self.next.0.take() {
                buf.extend_from_slice(&src);
                self.next.0.set(Some(buf));
                self.io.memory_pool().release_read_buf(src);
            } else {
                self.next.0.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination read buffer
    pub fn take_dst(&self) -> BytesVec {
        self.curr
            .0
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_read_buf())
    }

    #[inline]
    /// Set destination read buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_read_buf(dst);
            } else if let Some(mut buf) = self.curr.0.take() {
                buf.extend_from_slice(&dst);
                self.curr.0.set(Some(buf));
                self.io.memory_pool().release_read_buf(dst);
            } else {
                self.curr.0.set(Some(dst));
            }
        }
    }

    #[inline]
    pub fn with_write_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&WriteBuf<'b>) -> R,
    {
        let mut buf = WriteBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}

#[derive(Debug)]
pub struct WriteBuf<'a> {
    pub(crate) io: &'a IoRef,
    pub(crate) curr: &'a Buffer,
    pub(crate) next: &'a Buffer,
    pub(crate) need_write: Cell<bool>,
}

impl<'a> WriteBuf<'a> {
    #[inline]
    /// Initiate graceful io stream shutdown
    pub fn want_shutdown(&self) {
        self.io.want_shutdown()
    }

    #[inline]
    /// Make sure buffer has enough free space
    pub fn resize_buf(&self, buf: &mut BytesVec) {
        self.io.memory_pool().resize_write_buf(buf);
    }

    #[inline]
    /// Get reference to source write buffer
    pub fn with_src<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut Option<BytesVec>) -> R,
    {
        let mut item = self.curr.1.take();
        let result = f(&mut item);
        if let Some(b) = item {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
            } else {
                self.curr.1.set(Some(b));
            }
        }
        result
    }

    #[inline]
    /// Get reference to destination write buffer
    pub fn with_dst<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut BytesVec) -> R,
    {
        let mut item = self.next.1.take();
        if item.is_none() {
            item = Some(self.io.memory_pool().get_write_buf());
        }

        let buf = item.as_mut().unwrap();
        let total = buf.len();
        let result = f(buf);
        if buf.is_empty() {
            self.io.memory_pool().release_write_buf(item.unwrap());
        } else {
            self.need_write
                .set(self.need_write.get() | (total != buf.len()));
            self.next.1.set(item);
        }
        result
    }

    #[inline]
    /// Take source write buffer
    pub fn take_src(&self) -> Option<BytesVec> {
        self.curr.1.take().and_then(|b| {
            if b.is_empty() {
                self.io.memory_pool().release_write_buf(b);
                None
            } else {
                Some(b)
            }
        })
    }

    #[inline]
    /// Set source write buffer
    pub fn set_src(&self, src: Option<BytesVec>) {
        if let Some(src) = src {
            if src.is_empty() {
                self.io.memory_pool().release_write_buf(src);
            } else if let Some(mut buf) = self.curr.1.take() {
                buf.extend_from_slice(&src);
                self.curr.1.set(Some(buf));
                self.io.memory_pool().release_write_buf(src);
            } else {
                self.curr.1.set(Some(src));
            }
        }
    }

    #[inline]
    /// Take destination write buffer
    pub fn take_dst(&self) -> BytesVec {
        self.next
            .1
            .take()
            .unwrap_or_else(|| self.io.memory_pool().get_write_buf())
    }

    #[inline]
    /// Set destination write buffer
    pub fn set_dst(&self, dst: Option<BytesVec>) {
        if let Some(dst) = dst {
            if dst.is_empty() {
                self.io.memory_pool().release_write_buf(dst);
            } else {
                self.need_write.set(true);
                if let Some(mut buf) = self.next.1.take() {
                    buf.extend_from_slice(&dst);
                    self.next.1.set(Some(buf));
                    self.io.memory_pool().release_write_buf(dst);
                } else {
                    self.next.1.set(Some(dst));
                }
            }
        }
    }

    #[inline]
    pub fn with_read_buf<'b, F, R>(&'b self, f: F) -> R
    where
        F: FnOnce(&ReadBuf<'b>) -> R,
    {
        let mut buf = ReadBuf {
            io: self.io,
            curr: self.curr,
            next: self.next,
            nbytes: 0,
            need_write: Cell::new(self.need_write.get()),
        };
        let result = f(&mut buf);
        self.need_write.set(buf.need_write.get());
        result
    }
}
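
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch of the layer-storage behaviour, covering only what the
    // code above guarantees: `Stack` keeps up to three layers in the inline
    // array and spills into a `Vec` once a fourth layer is added. Nothing here
    // requires an `IoRef`, so only `new()`, `add_layer()` and the size helpers
    // are exercised.
    #[test]
    fn add_layer_switches_to_vec_storage() {
        let mut stack = Stack::new();
        assert_eq!(stack.len, 1);
        assert!(matches!(stack.buffers, Either::Left(_)));

        // a fresh stack has no buffers allocated yet
        assert_eq!(stack.read_destination_size(), 0);
        assert_eq!(stack.write_destination_size(), 0);

        // the second and third layers still fit into the inline array
        stack.add_layer();
        stack.add_layer();
        assert_eq!(stack.len, 3);
        assert!(matches!(stack.buffers, Either::Left(_)));

        // the fourth layer forces the move to heap-backed storage
        stack.add_layer();
        assert_eq!(stack.len, 4);
        assert!(matches!(stack.buffers, Either::Right(_)));
    }
}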