#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]

use mio_aio::{AioFsyncMode, SourceApi};
use std::{
    future::Future,
    io, mem,
    os::unix::io::{AsRawFd, RawFd},
    pin::{pin, Pin},
    task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::bsd::{Aio, AioSource};
use tokio_test::assert_pending;

/// Tests that drive single AIO operations (`aio_fsync`) through `Aio`.
mod aio {
    use super::*;

    /// Adapter that lets a `mio_aio` fsync source be used as an `AioSource`.
    // NOTE(review): the type parameter of `mio_aio::Source` is reconstructed
    // from the `mio_aio::Fsync::fsync(..)` constructor used in the `fsync`
    // test below -- confirm against the mio_aio version in Cargo.toml.
    #[derive(Debug)]
    struct TokioSource(mio_aio::Source<nix::sys::aio::AioFsync>);

    impl AioSource for TokioSource {
        /// Forwards the kqueue descriptor and token to the wrapped source.
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }

        /// Forwards deregistration to the wrapped source.
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude implementation of an AIO-based future
    struct FsyncFut(Aio<TokioSource>);

    impl FsyncFut {
        /// Submits the wrapped operation, converting the error value into an
        /// `io::Error` via its raw OS error number.
        pub fn submit(self: Pin<&mut Self>) -> io::Result<()> {
            // SAFETY: this only projects the pin onto the inner source; the
            // projected field is not moved out of here.
            // NOTE(review): assumes the inner source is structurally pinned.
            let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
            p.submit()
                .map_err(|e| io::Error::from_raw_os_error(e as i32))
        }
    }

    impl Future for FsyncFut {
        type Output = io::Result<()>;

        /// Waits for readiness on the `Aio`, then collects the operation's
        /// result with `aio_return`.
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness.  But there's no
                    // point, since we're about to drop the Aio.
                    // SAFETY: same pin projection as in `submit`; nothing is
                    // moved out of the projected field.
                    let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
                    Poll::Ready(
                        p.aio_return()
                            .map_err(|e| io::Error::from_raw_os_error(e as i32)),
                    )
                }
            }
        }
    }

    /// Low-level AIO Source
    ///
    /// An example bypassing mio_aio and Nix to demonstrate how the kevent
    /// registration actually works, under the hood.
    struct LlSource(Pin<Box<libc::aiocb>>);

    impl LlSource {
        /// Submits an `aio_fsync(O_SYNC)` for the wrapped control block and
        /// asserts that the submission returned 0.
        fn fsync(mut self: Pin<&mut Self>) {
            // SAFETY: the control block is only handed to `aio_fsync` as a raw
            // pointer; it is not moved out of its pinned box.
            let r = unsafe {
                let p = self.0.as_mut().get_unchecked_mut();
                libc::aio_fsync(libc::O_SYNC, p)
            };
            assert_eq!(0, r);
        }
    }

    impl AioSource for LlSource {
        /// Stores a `SIGEV_KEVENT` sigevent carrying `kq` and `token` in the
        /// control block's `aio_sigevent` field.
        fn register(&mut self, kq: RawFd, token: usize) {
            // SAFETY: NOTE(review): relies on an all-zero `libc::sigevent`
            // being a valid value -- confirm for the targeted libc.
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            // The kqueue descriptor travels in `sigev_signo`.
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.0.aio_sigevent = sev;
        }

        /// Resets the control block's `aio_sigevent` field to all zeroes.
        fn deregister(&mut self) {
            // SAFETY: see the note in `register` about zeroed `sigevent`.
            unsafe {
                self.0.aio_sigevent = mem::zeroed();
            }
        }
    }

    /// Future wrapping an `Aio<LlSource>`; it clears readiness on completion.
    struct LlFut(Aio<LlSource>);

    impl LlFut {
        /// Submits an fsync on the wrapped low-level source.
        pub fn fsync(self: Pin<&mut Self>) {
            // SAFETY: pin projection onto the source inside the `Aio`; the
            // source is not moved out of here.
            let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
            p.fsync();
        }
    }

    impl Future for LlFut {
        type Output = std::io::Result<usize>;

        /// Waits for readiness, clears it, and returns the `aio_return` value
        /// (or the last OS error when that value is negative).
        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Clearing readiness makes the future non-idempotent; the
                    // caller can't poll it repeatedly after it has already
                    // returned Ready.  But that's ok; most futures behave this
                    // way.
                    self.0.clear_ready(ev);
                    // SAFETY: the control block is only passed to `aio_return`
                    // as a raw pointer; it stays inside its pinned box.
                    let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
                    if r >= 0 {
                        Poll::Ready(Ok(r as usize))
                    } else {
                        Poll::Ready(Err(io::Error::last_os_error()))
                    }
                }
            }
        }
    }

    /// An fsync submitted through mio_aio and awaited via `FsyncFut`
    #[tokio::test]
    async fn fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let mode = AioFsyncMode::O_SYNC;
        let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
        let poll_aio = Aio::new_for_aio(source).unwrap();
        let mut fut = pin!(FsyncFut(poll_aio));
        fut.as_mut().submit().unwrap();
        fut.await.unwrap();
    }

    /// An fsync submitted directly with `libc::aio_fsync` and awaited via
    /// `LlFut`
    #[tokio::test]
    async fn ll_fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        // SAFETY: NOTE(review): relies on an all-zero `libc::aiocb` being a
        // valid starting value -- confirm for the targeted libc.
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        // SAFETY: the control block is passed to the kernel by raw pointer and
        // stays inside its pinned box.
        let r = unsafe {
            let p = poll_aio.0.as_mut().get_unchecked_mut();
            libc::aio_fsync(libc::O_SYNC, p)
        };
        assert_eq!(0, r);
        let fut = LlFut(poll_aio);
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        // SAFETY: see the note in `ll_fsync` about zeroed `aiocb`.
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let poll_aio = Aio::new_for_aio(source).unwrap();

        // Send the operation to the kernel the first time
        let mut fut = LlFut(poll_aio);
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().fsync();
            pfut.as_mut().await.unwrap();
        }

        // Check that readiness was cleared
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(fut.0.poll_ready(&mut ctx));

        // and reuse the future and its Aio object
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().fsync();
            pfut.as_mut().await.unwrap();
        }
    }
}

/// Tests that drive `lio_listio` operations through `Aio`.
mod lio {
    use super::*;

    /// Low-level source based on lio_listio
    ///
    /// An example demonstrating using AIO with `Interest::Lio`.  mio_aio 0.8
    /// doesn't include any bindings for lio_listio, so we've got to go
    /// low-level.
    struct LioSource<'a> {
        aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>,
        sev: libc::sigevent,
    }

    impl<'a> LioSource<'a> {
        /// Wraps a one-element control-block list with a zeroed sigevent.
        fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self {
            LioSource {
                aiocb,
                // SAFETY: NOTE(review): relies on an all-zero `libc::sigevent`
                // being a valid value -- confirm for the targeted libc.
                sev: unsafe { mem::zeroed() },
            }
        }

        /// Submits the list with `lio_listio(LIO_NOWAIT, ..)` and asserts that
        /// the call returned 0.
        fn submit(mut self: Pin<&mut Self>) {
            // The array of `&mut aiocb` is reinterpreted as the
            // `*const *mut aiocb` list that `lio_listio` takes.
            // SAFETY: the array is only exposed as a raw pointer; it is not
            // moved out of its pinned location.
            let p: *const *mut libc::aiocb =
                unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _;
            // SAFETY: NOTE(review): assumes the control blocks and `self.sev`
            // stay alive and in place until the operation completes.
            let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) };
            assert_eq!(r, 0);
        }
    }

    impl<'a> AioSource for LioSource<'a> {
        /// Stores a `SIGEV_KEVENT` sigevent carrying `kq` and `token`; it is
        /// handed to `lio_listio` by `submit`.
        fn register(&mut self, kq: RawFd, token: usize) {
            // SAFETY: see the note in `new` about zeroed `sigevent`.
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            // The kqueue descriptor travels in `sigev_signo`.
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.sev = sev;
        }

        /// Resets the stored sigevent to all zeroes.
        fn deregister(&mut self) {
            // SAFETY: see the note in `new` about zeroed `sigevent`.
            unsafe {
                self.sev = mem::zeroed();
            }
        }
    }

    /// Future wrapping an `Aio<LioSource>`; it clears readiness on completion.
    struct LioFut<'a>(Aio<LioSource<'a>>);

    impl<'a> LioFut<'a> {
        /// Submits the wrapped `lio_listio` list.
        pub fn submit(self: Pin<&mut Self>) {
            // SAFETY: pin projection onto the source inside the `Aio`; the
            // source is not moved out of here.
            let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
            p.submit();
        }
    }

    impl<'a> Future for LioFut<'a> {
        type Output = std::io::Result<usize>;

        /// Waits for readiness, clears it, and returns the `aio_return` value
        /// of the single list element (or the last OS error when negative).
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Clearing readiness makes the future non-idempotent; the
                    // caller can't poll it repeatedly after it has already
                    // returned Ready.  But that's ok; most futures behave this
                    // way.  Clearing readiness is especially useful for
                    // lio_listio, because sometimes some operations will be
                    // ready but not all.
                    self.0.clear_ready(ev);
                    // SAFETY: the pinned data is only reborrowed to reach the
                    // control block; nothing is moved.
                    let r = unsafe {
                        let p1 = self.get_unchecked_mut();
                        let p2: &mut [&mut libc::aiocb; 1] =
                            p1.0.aiocb.as_mut().get_unchecked_mut();
                        let p3: &mut libc::aiocb = p2[0];
                        libc::aio_return(p3)
                    };
                    if r >= 0 {
                        Poll::Ready(Ok(r as usize))
                    } else {
                        Poll::Ready(Err(io::Error::last_os_error()))
                    }
                }
            }
        }
    }

    /// An lio_listio operation with one write element
    #[tokio::test]
    async fn onewrite() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        // SAFETY: NOTE(review): relies on an all-zero `libc::aiocb` being a
        // valid starting value -- confirm for the targeted libc.
        let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
        aiocb.aio_fildes = f.as_raw_fd();
        aiocb.aio_lio_opcode = libc::LIO_WRITE;
        aiocb.aio_nbytes = WBUF.len();
        aiocb.aio_buf = WBUF.as_ptr() as *mut _;
        let aiocb = pin!([&mut aiocb]);
        let source = LioSource::new(aiocb);
        let poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel
        let mut fut = pin!(LioFut(poll_aio));
        fut.as_mut().submit();
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        // SAFETY: see the note in `onewrite` about zeroed `aiocb`.
        let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
        aiocb.aio_fildes = f.as_raw_fd();
        aiocb.aio_lio_opcode = libc::LIO_WRITE;
        aiocb.aio_nbytes = WBUF.len();
        aiocb.aio_buf = WBUF.as_ptr() as *mut _;
        let aiocb = pin!([&mut aiocb]);
        let source = LioSource::new(aiocb);
        let poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel the first time
        let mut fut = LioFut(poll_aio);
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().submit();
            pfut.as_mut().await.unwrap();
        }

        // Check that readiness was cleared
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(fut.0.poll_ready(&mut ctx));

        // and reuse the future and its Aio object
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().submit();
            pfut.as_mut().await.unwrap();
        }
    }
}