#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full", not(miri)))]
//! Integration tests for `tokio::io::unix::AsyncFd`.
//!
//! Most tests drive a non-blocking Unix stream socket pair and check how
//! readiness is reported, cleared and re-armed, and what happens to pending
//! readiness futures when the runtime (and its I/O driver) is dropped.

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{read, write};

use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio::io::Interest;
use tokio_test::{assert_err, assert_pending};

/// A waker that records whether it has been woken.
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

/// Shared state behind [`TestWaker`]: a single "was woken" flag.
#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    /// Creates a waker whose flag starts out unset.
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    /// Returns whether the waker fired since the last call, resetting the flag.
    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    /// Builds a task context backed by this waker.
    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

/// Owned file descriptor with `Read`/`Write` implemented through `nix`.
#[derive(Debug)]
struct FileDescriptor {
    fd: std::os::fd::OwnedFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(&self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

/// Adds `O_NONBLOCK` to the file status flags of `fd`.
///
/// Panics if either `fcntl` call fails.
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

/// Creates a connected pair of non-blocking Unix stream sockets.
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd.as_raw_fd());
    set_nonblocking(fds.1.fd.as_raw_fd());

    fds
}

/// Reads and discards exactly `amt` bytes from `fd`.
///
/// Spins while the descriptor reports `WouldBlock`; panics on EOF or on any
/// other error.
fn drain(mut fd: &FileDescriptor, mut amt: usize) {
    let mut buf = [0u8; 512];

    while amt > 0 {
        // Never ask for more than is still owed, so `amt -= x` cannot underflow.
        let want = amt.min(buf.len());
        match fd.read(&mut buf[..want]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => {}
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(x) => amt -= x,
        }
    }
}

#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}

#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    let mut bytes = 0;
    while let Ok(Ok(amt)) = guard.try_io(|_| afd_a.get_ref().write(&[0; 512][..])) {
        bytes += amt;
    }

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b, bytes);

    let _ = writable.await.unwrap();
}

/// Wrapper exposing the raw fd of a shared inner object, so that dropping
/// the wrapper does not by itself drop the inner object.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    // Ten waiter tasks plus this test body.
    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    }

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    let mut bytes = 0;
    while let Ok(amt) = afd_a.get_ref().write(&[0; 512]) {
        bytes += amt;
    }

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(std::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = std::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref(), bytes);

    // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}

/// Boxes and pins `f`, polls it once with a no-op waker, asserts that it is
/// still pending, and hands the pinned future back to the caller.
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

/// Builds a current-thread runtime with all drivers enabled.
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

/// Awaits read readiness through the `poll_read_ready` API.
async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    std::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

/// Awaits write readiness through the `poll_write_ready` API.
async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    std::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_then_clear_readiness() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let mut write_ready = rt.block_on(afd_a.writable()).unwrap();

    std::mem::drop(rt);

    write_ready.clear_ready();
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}

#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
    use std::net::SocketAddr;

    use tokio::io::Interest;

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let listener = std::net::TcpListener::bind(addr).unwrap();
    let addr = listener.local_addr().unwrap();

    let client = std::net::TcpStream::connect(addr).unwrap();
    let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();

    let (stream, _) = listener.accept().unwrap();

    // Sending out of band data should trigger priority event.
    send_oob_data(&stream, b"hello").unwrap();

    let _ = client.ready(Interest::PRIORITY).await.unwrap();
}

/// Sends `data` on `stream` with the `MSG_OOB` flag and returns the number of
/// bytes reported as sent by `send(2)`.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
    // SAFETY: the pointer/length pair comes from the live `data` slice, and
    // the fd is borrowed from `stream` for the duration of the call.
    unsafe {
        let res = libc::send(
            stream.as_raw_fd(),
            data.as_ptr().cast(),
            data.len(),
            libc::MSG_OOB,
        );
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res as usize)
        }
    }
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn clear_ready_matching_clears_ready() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn clear_ready_matching_clears_ready_mut() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let mut afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready_mut(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
#[cfg(target_os = "linux")]
#[cfg_attr(miri, ignore)]
async fn await_error_readiness_timestamping() {
    use std::net::{Ipv4Addr, SocketAddr};

    use tokio::io::{Interest, Ready};

    let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

    let socket = std::net::UdpSocket::bind(address_a).unwrap();

    socket.set_nonblocking(true).unwrap();

    // configure send timestamps
    configure_timestamping_socket(&socket).unwrap();

    socket.connect(address_b).unwrap();

    let fd = AsyncFd::new(socket).unwrap();

    tokio::select! {
        _ = fd.ready(Interest::ERROR) => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    let buf = b"hello there";
    fd.get_ref().send(buf).unwrap();

    // the send timestamp should now be in the error queue
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

/// Sets the timestamping socket option used by
/// `await_error_readiness_timestamping` and returns the raw `setsockopt`
/// result on success.
#[cfg(target_os = "linux")]
fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
    // enable software timestamping, and specifically software send timestamping
    let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;

    // NOTE(review): the SOF_TIMESTAMPING_* flags are documented for the
    // SO_TIMESTAMPING option, but they are passed to SO_TIMESTAMP here --
    // confirm this is intentional.
    //
    // SAFETY: `options` outlives the call and the length passed is its exact
    // size; the fd is borrowed from `udp_socket` for the duration of the call.
    let res = unsafe {
        libc::setsockopt(
            udp_socket.as_raw_fd(),
            libc::SOL_SOCKET,
            libc::SO_TIMESTAMP,
            &options as *const _ as *const libc::c_void,
            std::mem::size_of_val(&options) as libc::socklen_t,
        )
    };

    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
    }
}

#[tokio::test]
#[cfg(target_os = "linux")]
#[cfg_attr(miri, ignore)]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    // SAFETY: `recv_err` outlives the call and the length passed is its exact
    // size; `socket` is alive for the duration of the call.
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending messages
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context. The OS will notice
        // that nobody is listening on this port. Normally this is ignored (UDP is "fire and forget"),
        // but because IP_RECVERR is enabled, the error will actually be reported to the sending socket
        //
        // SAFETY: `sockaddr_in` is a plain C struct for which all-zero bytes
        // are a valid value.
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        // SAFETY: `msghdr` is a plain C struct for which all-zero bytes are a
        // valid value; the fields that matter are filled in below.
        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        // SAFETY: `msg` points at `dest_addr`, `iov` and `message`, all of
        // which are still alive at this point.
        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            panic!("{:?}", std::io::Error::last_os_error())
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

/// A source whose raw fd is always `-1`, used to exercise the fallible
/// `AsyncFd` constructors.
#[derive(Debug, PartialEq, Eq)]
struct InvalidSource;

impl AsRawFd for InvalidSource {
    fn as_raw_fd(&self) -> RawFd {
        -1
    }
}

#[tokio::test]
async fn try_new() {
    let original = Arc::new(InvalidSource);

    let error = AsyncFd::try_new(original.clone()).unwrap_err();
    let (returned, _cause) = error.into_parts();

    // The error hands back the very same object that was passed in.
    assert!(Arc::ptr_eq(&original, &returned));
}

#[tokio::test]
async fn try_with_interest() {
    let original = Arc::new(InvalidSource);

    let error = AsyncFd::try_with_interest(original.clone(), Interest::READABLE).unwrap_err();
    let (returned, _cause) = error.into_parts();

    // The error hands back the very same object that was passed in.
    assert!(Arc::ptr_eq(&original, &returned));
}