use shuttle::scheduler::PctScheduler; use shuttle::sync::{mpsc::channel, RwLock}; use shuttle::{check, check_dfs, check_random, thread, Runner}; use std::collections::HashSet; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, TryLockError}; use test_log::test; #[test] fn reader_concurrency() { let saw_concurrent_reads = Arc::new(AtomicBool::new(false)); { let saw_concurrent_reads = Arc::clone(&saw_concurrent_reads); check_random( move || { let rwlock = Arc::new(RwLock::new(0usize)); let readers = Arc::new(AtomicUsize::new(0)); { let rwlock = Arc::clone(&rwlock); let readers = Arc::clone(&readers); let saw_concurrent_reads = Arc::clone(&saw_concurrent_reads); thread::spawn(move || { let counter = rwlock.read().unwrap(); assert_eq!(*counter, 0); readers.fetch_add(1, Ordering::SeqCst); thread::yield_now(); if readers.load(Ordering::SeqCst) == 2 { saw_concurrent_reads.store(true, Ordering::SeqCst); } readers.fetch_sub(1, Ordering::SeqCst); }); } let counter = rwlock.read().unwrap(); assert_eq!(*counter, 0); readers.fetch_add(1, Ordering::SeqCst); thread::yield_now(); if readers.load(Ordering::SeqCst) == 2 { saw_concurrent_reads.store(true, Ordering::SeqCst); } readers.fetch_sub(1, Ordering::SeqCst); }, 100, ); } assert!(saw_concurrent_reads.load(Ordering::SeqCst)); } fn deadlock() { let lock1 = Arc::new(RwLock::new(0usize)); let lock2 = Arc::new(RwLock::new(0usize)); let lock1_clone = Arc::clone(&lock1); let lock2_clone = Arc::clone(&lock2); thread::spawn(move || { let _l1 = lock1_clone.read().unwrap(); let _l2 = lock2_clone.read().unwrap(); }); let _l2 = lock2.write().unwrap(); let _l1 = lock1.write().unwrap(); } #[test] #[should_panic(expected = "deadlock")] fn deadlock_default() { // Round-robin should always fail this deadlock test check(deadlock); } #[test] #[should_panic(expected = "deadlock")] fn deadlock_random() { // 200 tries should be enough to find a deadlocking execution check_random(deadlock, 200); } #[test] #[should_panic(expected = "deadlock")] fn deadlock_pct() { // 200 tries should be enough to find a deadlocking execution let scheduler = PctScheduler::new(2, 100); let runner = Runner::new(scheduler, Default::default()); runner.run(deadlock); } // Test case for a bug we found in Loom: https://github.com/tokio-rs/loom/pull/135 #[test] fn rwlock_two_writers() { check_random( || { let lock = Arc::new(RwLock::new(1)); let lock2 = lock.clone(); thread::spawn(move || { let mut w = lock.write().unwrap(); *w += 1; thread::yield_now(); }); thread::spawn(move || { let mut w = lock2.write().unwrap(); *w += 1; thread::yield_now(); }); }, 100, ); } // Check that multiple readers are allowed to read at a time // This test should never deadlock. #[test] fn rwlock_allows_multiple_readers() { check_dfs( || { let lock1 = Arc::new(RwLock::new(1)); let lock2 = lock1.clone(); let (s1, r1) = channel::(); let (s2, r2) = channel::(); thread::spawn(move || { let w = lock1.read().unwrap(); s1.send(*w).unwrap(); // Send value to other thread let r = r2.recv().unwrap(); // Wait for value from other thread assert_eq!(r, 1); }); thread::spawn(move || { let w = lock2.read().unwrap(); s2.send(*w).unwrap(); let r = r1.recv().unwrap(); assert_eq!(r, 1); }); }, None, ); } // Ensure that a writer is never woken up when a reader is using the lock fn two_readers_and_one_writer() { let lock1 = Arc::new(RwLock::new(1)); // Spawn two readers, each tries to acquire both locks for _ in 0..2 { let rlock1 = lock1.clone(); thread::spawn(move || { let r1 = rlock1.read().unwrap(); thread::yield_now(); assert!(*r1 > 0); }); } // Writer runs on the main thread let mut w = lock1.write().unwrap(); *w += 1; } #[test] fn rwlock_two_readers_and_one_writer_exhaustive() { check_dfs(two_readers_and_one_writer, None); } #[test] fn rwlock_default() { struct Point(u32, u32); impl Default for Point { fn default() -> Self { Self(21, 42) } } check_dfs( || { let point: RwLock = Default::default(); let r = point.read().unwrap(); assert_eq!(r.0, 21); assert_eq!(r.1, 42); }, None, ); } #[test] fn rwlock_into_inner() { check_dfs( || { let lock = Arc::new(RwLock::new(0u64)); let threads = (0..2) .map(|_| { let lock = lock.clone(); thread::spawn(move || { *lock.write().unwrap() += 1; }) }) .collect::>(); for thread in threads { thread.join().unwrap(); } let lock = Arc::try_unwrap(lock).unwrap(); assert_eq!(lock.into_inner().unwrap(), 2); }, None, ) } /// Two concurrent threads trying to do an atomic increment using `try_write`. /// One `try_write` must succeed, while the other may or may not succeed. /// Thus we expect to see final values 1 and 2. #[test] fn concurrent_try_increment() { let observed_values = Arc::new(std::sync::Mutex::new(HashSet::new())); let observed_values_clone = Arc::clone(&observed_values); check_dfs( move || { let lock = Arc::new(RwLock::new(0usize)); let threads = (0..2) .map(|_| { let lock = Arc::clone(&lock); thread::spawn(move || { match lock.try_write() { Ok(mut guard) => { *guard += 1; } Err(TryLockError::WouldBlock) => (), Err(_) => panic!("unexpected TryLockError"), }; }) }) .collect::>(); for thd in threads { thd.join().unwrap(); } let value = Arc::try_unwrap(lock).unwrap().into_inner().unwrap(); observed_values_clone.lock().unwrap().insert(value); }, None, ); let observed_values = Arc::try_unwrap(observed_values).unwrap().into_inner().unwrap(); assert_eq!(observed_values, HashSet::from([1, 2])); } /// Run some threads that try to acquire the lock in both read and write modes, and check that any /// execution allowed by our `RwLock` implementation is allowed by `std`. #[test] fn try_lock_implies_std() { check_random( move || { let lock = Arc::new(RwLock::new(())); let reference_lock = Arc::new(std::sync::RwLock::new(())); let threads = (0..3) .map(|_| { let lock = Arc::clone(&lock); let reference_lock = Arc::clone(&reference_lock); thread::spawn(move || { for _ in 0..3 { { let _r = lock.try_read(); if _r.is_ok() { assert!(reference_lock.try_read().is_ok()); } } { let _w = lock.try_write(); if _w.is_ok() { assert!(reference_lock.try_write().is_ok()); } } } }) }) .collect::>(); for thd in threads { thd.join().unwrap(); } }, 5000, ); } /// Run some threads that try to acquire the lock in both read and write modes, and check that any /// execution allowed by `std` is allowed by our `RwLock` implementation. (This implication isn't /// true in general -- see `double_try_read` -- but is true for this test). #[test] fn try_lock_implied_by_std() { check_random( move || { let lock = Arc::new(RwLock::new(())); let reference_lock = Arc::new(std::sync::RwLock::new(())); let threads = (0..3) .map(|_| { let lock = Arc::clone(&lock); let reference_lock = Arc::clone(&reference_lock); thread::spawn(move || { for _ in 0..5 { { let _r = reference_lock.try_read(); if _r.is_ok() { assert!(lock.try_read().is_ok()); } } { let _w = reference_lock.try_write(); if _w.is_ok() { assert!(lock.try_write().is_ok()); } } } }) }) .collect::>(); for thd in threads { thd.join().unwrap(); } }, 5000, ); } /// Three concurrent threads, one doing an atomic increment by 1 using `write`, one trying to do an /// atomic increment by 1 followed by trying to do an atomic increment by 2 using `try_write`, and a /// third that peeks at the value using `try_read.` The `write` must succeed, while each `try_write` /// may or may not succeed. #[test] fn concurrent_write_try_write_try_read() { let observed_values = Arc::new(std::sync::Mutex::new(HashSet::new())); let observed_values_clone = Arc::clone(&observed_values); check_dfs( move || { let lock = Arc::new(RwLock::new(0usize)); let write_thread = { let lock = Arc::clone(&lock); thread::spawn(move || { *lock.write().unwrap() += 1; }) }; let try_write_thread = { let lock = Arc::clone(&lock); thread::spawn(move || { for n in 1..3 { match lock.try_write() { Ok(mut guard) => { *guard += n; } Err(TryLockError::WouldBlock) => (), Err(_) => panic!("unexpected TryLockError"), }; } }) }; let read_value = match lock.try_read() { Ok(guard) => Some(*guard), Err(TryLockError::WouldBlock) => None, Err(_) => panic!("unexpected TryLockError"), }; write_thread.join().unwrap(); try_write_thread.join().unwrap(); let final_value = Arc::try_unwrap(lock).unwrap().into_inner().unwrap(); observed_values_clone.lock().unwrap().insert((final_value, read_value)); }, None, ); let observed_values = Arc::try_unwrap(observed_values).unwrap().into_inner().unwrap(); // The idea here is that the `try_read` can interleave anywhere between the (successful) writes, // but can also just fail. let expected_values = HashSet::from([ // Both `try_write`s fail (1, None), (1, Some(0)), (1, Some(1)), // Second `try_write` fails (2, None), (2, Some(0)), (2, Some(1)), (2, Some(2)), // First `try_write` fails (3, None), (3, Some(0)), (3, Some(1)), // (3, Some(2)), // If first `try_write` failed, the value of the lock is never 2 (3, Some(3)), // Both `try_write`s succeed (4, None), (4, Some(0)), (4, Some(1)), (4, Some(2)), (4, Some(3)), (4, Some(4)), ]); assert_eq!(observed_values, expected_values); } /// This behavior is _sometimes_ allowed in `std`, but advised against, as it can lead to deadlocks /// on some platforms if the second `read` races with another thread's `write`. We conservatively /// rule it out in all cases to better detect potential deadlocks. #[test] #[should_panic(expected = "tried to acquire a RwLock it already holds")] fn double_read() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.read().unwrap(); let _guard_2 = rwlock.read(); }, None, ) } #[test] #[should_panic(expected = "tried to acquire a RwLock it already holds")] fn double_write() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.write().unwrap(); let _guard_2 = rwlock.write(); }, None, ) } #[test] #[should_panic(expected = "tried to acquire a RwLock it already holds")] fn read_upgrade() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.read().unwrap(); let _guard_2 = rwlock.write(); }, None, ) } #[test] #[should_panic(expected = "tried to acquire a RwLock it already holds")] fn write_downgrade() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.write().unwrap(); let _guard_2 = rwlock.read(); }, None, ) } /// This behavior isn't consistent with `std`, which seems to suggest that `try_read` succeeds if /// the current thread already holds a read lock. We assume it always fails so that we can more /// easily diagnose potential deadlocks, especially with async tasks that might migrate across /// threads in real implementations. #[test] fn double_try_read() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.try_read().unwrap(); assert!(matches!(rwlock.try_read(), Err(TryLockError::WouldBlock))); }, None, ) } /// As with `double_try_read`, this isn't consistent with `std`. #[test] fn read_try_read() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.read().unwrap(); assert!(matches!(rwlock.try_read(), Err(TryLockError::WouldBlock))); }, None, ) } #[test] fn double_try_write() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.try_write().unwrap(); assert!(matches!(rwlock.try_write(), Err(TryLockError::WouldBlock))); }, None, ) } #[test] fn write_try_write() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.write().unwrap(); assert!(matches!(rwlock.try_write(), Err(TryLockError::WouldBlock))); }, None, ) } #[test] fn try_read_upgrade() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.try_read().unwrap(); assert!(matches!(rwlock.try_write(), Err(TryLockError::WouldBlock))); }, None, ) } #[test] fn try_write_downgrade() { check_dfs( || { let rwlock = RwLock::new(()); let _guard_1 = rwlock.try_write().unwrap(); assert!(matches!(rwlock.try_read(), Err(TryLockError::WouldBlock))); }, None, ) }