use retty_io::event::Evented; use retty_io::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; use std::time::Duration; #[test] fn smoke() { let poll = Poll::new().unwrap(); let mut events = Events::with_capacity(128); let (r, set) = Registration::new2(); r.register(&poll, Token(0), Ready::readable(), PollOpt::edge()) .unwrap(); let n = poll .poll(&mut events, Some(Duration::from_millis(0))) .unwrap(); assert_eq!(n, 0); set.set_readiness(Ready::readable()).unwrap(); let n = poll .poll(&mut events, Some(Duration::from_millis(0))) .unwrap(); assert_eq!(n, 1); assert_eq!(events.get(0).unwrap().token(), Token(0)); } #[test] fn set_readiness_before_register() { use std::sync::{Arc, Barrier}; use std::thread; let poll = Poll::new().unwrap(); let mut events = Events::with_capacity(128); for _ in 0..5_000 { let (r, set) = Registration::new2(); let b1 = Arc::new(Barrier::new(2)); let b2 = b1.clone(); let th = thread::spawn(move || { // set readiness before register set.set_readiness(Ready::readable()).unwrap(); // run into barrier so both can pass b2.wait(); }); // wait for readiness b1.wait(); // now register poll.register(&r, Token(123), Ready::readable(), PollOpt::edge()) .unwrap(); loop { let n = poll.poll(&mut events, None).unwrap(); if n == 0 { continue; } assert_eq!(n, 1); assert_eq!(events.get(0).unwrap().token(), Token(123)); break; } th.join().unwrap(); } } #[cfg(any(target_os = "linux", target_os = "macos", target_os = "windows"))] mod stress { use retty_io::event::Evented; use retty_io::{Events, Poll, PollOpt, Ready, Registration, SetReadiness, Token}; use std::time::Duration; #[test] fn single_threaded_poll() { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::Arc; use std::thread; const NUM_ATTEMPTS: usize = 30; const NUM_ITERS: usize = 500; const NUM_THREADS: usize = 4; const NUM_REGISTRATIONS: usize = 128; for _ in 0..NUM_ATTEMPTS { let poll = Poll::new().unwrap(); let mut events = Events::with_capacity(NUM_REGISTRATIONS); let registrations: Vec<_> = (0..NUM_REGISTRATIONS) .map(|i| { let (r, s) = Registration::new2(); r.register(&poll, Token(i), Ready::readable(), PollOpt::edge()) .unwrap(); (r, s) }) .collect(); let mut ready: Vec<_> = (0..NUM_REGISTRATIONS).map(|_| Ready::empty()).collect(); let remaining = Arc::new(AtomicUsize::new(NUM_THREADS)); for _ in 0..NUM_THREADS { let remaining = remaining.clone(); let set_readiness: Vec = registrations.iter().map(|r| r.1.clone()).collect(); thread::spawn(move || { for _ in 0..NUM_ITERS { for i in 0..NUM_REGISTRATIONS { set_readiness[i].set_readiness(Ready::readable()).unwrap(); set_readiness[i].set_readiness(Ready::empty()).unwrap(); set_readiness[i].set_readiness(Ready::writable()).unwrap(); set_readiness[i] .set_readiness(Ready::readable() | Ready::writable()) .unwrap(); set_readiness[i].set_readiness(Ready::empty()).unwrap(); } } for i in 0..NUM_REGISTRATIONS { set_readiness[i].set_readiness(Ready::readable()).unwrap(); } remaining.fetch_sub(1, Release); }); } while remaining.load(Acquire) > 0 { // Set interest for (i, &(ref r, _)) in registrations.iter().enumerate() { r.reregister(&poll, Token(i), Ready::writable(), PollOpt::edge()) .unwrap(); } poll.poll(&mut events, Some(Duration::from_millis(0))) .unwrap(); for event in &events { ready[event.token().0] = event.readiness(); } // Update registration // Set interest for (i, &(ref r, _)) in registrations.iter().enumerate() { r.reregister(&poll, Token(i), Ready::readable(), PollOpt::edge()) .unwrap(); } } // Finall polls, repeat until readiness-queue empty loop { // Might not read all events from custom-event-queue at once, implementation dependend poll.poll(&mut events, Some(Duration::from_millis(0))) .unwrap(); if events.is_empty() { // no more events in readiness queue pending break; } for event in &events { ready[event.token().0] = event.readiness(); } } // Everything should be flagged as readable for ready in ready { assert_eq!(ready, Ready::readable()); } } } #[test] fn multi_threaded_poll() { use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; use std::sync::{Arc, Barrier}; use std::thread; const ENTRIES: usize = 10_000; const PER_ENTRY: usize = 16; const THREADS: usize = 4; const NUM: usize = ENTRIES * PER_ENTRY; struct Entry { #[allow(dead_code)] registration: Registration, set_readiness: SetReadiness, num: AtomicUsize, } impl Entry { fn fire(&self) { self.set_readiness.set_readiness(Ready::readable()).unwrap(); } } let poll = Arc::new(Poll::new().unwrap()); let mut entries = vec![]; // Create entries for i in 0..ENTRIES { let (registration, set_readiness) = Registration::new2(); registration .register(&poll, Token(i), Ready::readable(), PollOpt::edge()) .unwrap(); entries.push(Entry { registration, set_readiness, num: AtomicUsize::new(0), }); } let total = Arc::new(AtomicUsize::new(0)); let entries = Arc::new(entries); let barrier = Arc::new(Barrier::new(THREADS)); let mut threads = vec![]; for th in 0..THREADS { let poll = poll.clone(); let total = total.clone(); let entries = entries.clone(); let barrier = barrier.clone(); threads.push(thread::spawn(move || { let mut events = Events::with_capacity(128); barrier.wait(); // Prime all the registrations let mut i = th; while i < ENTRIES { entries[i].fire(); i += THREADS; } let mut n = 0; while total.load(SeqCst) < NUM { // A poll timeout is necessary here because there may be more // than one threads blocked in `poll` when the final wakeup // notification arrives (and only notifies one thread). n += poll .poll(&mut events, Some(Duration::from_millis(100))) .unwrap(); let mut num_this_tick = 0; for event in &events { let e = &entries[event.token().0]; let mut num = e.num.load(Relaxed); loop { if num < PER_ENTRY { let actual = e.num.compare_and_swap(num, num + 1, Relaxed); if actual == num { num_this_tick += 1; e.fire(); break; } num = actual; } else { break; } } } total.fetch_add(num_this_tick, SeqCst); } n })); } let _: Vec<_> = threads.into_iter().map(|th| th.join().unwrap()).collect(); for entry in entries.iter() { assert_eq!(PER_ENTRY, entry.num.load(Relaxed)); } } #[test] fn with_small_events_collection() { const N: usize = 8; const ITER: usize = 1_000; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{Acquire, Release}; use std::sync::{Arc, Barrier}; use std::thread; let poll = Poll::new().unwrap(); let mut registrations = vec![]; let barrier = Arc::new(Barrier::new(N + 1)); let done = Arc::new(AtomicBool::new(false)); for i in 0..N { let (registration, set_readiness) = Registration::new2(); poll.register(®istration, Token(i), Ready::readable(), PollOpt::edge()) .unwrap(); registrations.push(registration); let barrier = barrier.clone(); let done = done.clone(); thread::spawn(move || { barrier.wait(); while !done.load(Acquire) { set_readiness.set_readiness(Ready::readable()).unwrap(); } // Set one last time set_readiness.set_readiness(Ready::readable()).unwrap(); }); } let mut events = Events::with_capacity(4); barrier.wait(); for _ in 0..ITER { poll.poll(&mut events, None).unwrap(); } done.store(true, Release); let mut final_ready = vec![false; N]; for _ in 0..5 { poll.poll(&mut events, None).unwrap(); for event in &events { final_ready[event.token().0] = true; } if final_ready.iter().all(|v| *v) { return; } thread::sleep(Duration::from_millis(10)); } panic!("dead lock?"); } } #[test] fn drop_registration_from_non_main_thread() { use std::sync::mpsc::channel; use std::thread; const THREADS: usize = 8; const ITERS: usize = 50_000; let poll = Poll::new().unwrap(); let mut events = Events::with_capacity(1024); let mut senders = Vec::with_capacity(THREADS); let mut token_index = 0; // spawn threads, which will send messages to single receiver for _ in 0..THREADS { let (tx, rx) = channel::<(Registration, SetReadiness)>(); senders.push(tx); thread::spawn(move || { for (registration, set_readiness) in rx { let _ = set_readiness.set_readiness(Ready::readable()); drop(registration); drop(set_readiness); } }); } let mut index: usize = 0; for _ in 0..ITERS { let (registration, set_readiness) = Registration::new2(); registration .register( &poll, Token(token_index), Ready::readable(), PollOpt::edge(), ) .unwrap(); let _ = senders[index].send((registration, set_readiness)); token_index += 1; index += 1; if index == THREADS { index = 0; let (registration, set_readiness) = Registration::new2(); registration .register( &poll, Token(token_index), Ready::readable(), PollOpt::edge(), ) .unwrap(); let _ = set_readiness.set_readiness(Ready::readable()); drop(registration); drop(set_readiness); token_index += 1; thread::park_timeout(Duration::from_millis(0)); let _ = poll.poll(&mut events, None).unwrap(); } } }