# `NonBlockingMutex`

[github](https://github.com/ivanivanyuk1993/utility.non_blocking_mutex)
[crates.io](https://crates.io/crates/non_blocking_mutex)
[docs.rs](https://docs.rs/non_blocking_mutex/latest/non_blocking_mutex/non_blocking_mutex/struct.NonBlockingMutex.html#nonblockingmutex)

[![Build and test Rust](https://github.com/ivanivanyuk1993/utility.non_blocking_mutex/actions/workflows/rust.yml/badge.svg)](https://github.com/ivanivanyuk1993/utility.non_blocking_mutex/actions/workflows/rust.yml)

## Why you should use `NonBlockingMutex`

`NonBlockingMutex` is currently the fastest way to do expensive calculations under lock,
or to do cheap calculations under lock when concurrency/load/contention is very high.
See the benchmarks in the `benches` directory and run them with

```bash
cargo bench
```

## Installation

```bash
cargo add non_blocking_mutex
```

## Examples

### Optimized for 1 type of `NonBlockingMutexTask`

```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use std::thread::available_parallelism;

// How many threads can physically access [NonBlockingMutex]
// simultaneously, needed for computing `shard_count` of [ShardedQueue],
// used to store queue of tasks
let max_concurrent_thread_count = available_parallelism().unwrap().get();

let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);
// Will infer exact type and size(0) of this [FnOnce] and
// make sized [NonBlockingMutex] which takes only this exact [FnOnce]
// without ever requiring [Box]-ing or dynamic dispatch
non_blocking_mutex.run_if_first_or_schedule_on_first(|mut state: MutexGuard<usize>| {
    *state += 1;
});
```

### Easy to use with any `FnOnce`, but may `Box` tasks and use dynamic dispatch when it can't acquire the lock on the first try

```rust
use non_blocking_mutex::dynamic_non_blocking_mutex::DynamicNonBlockingMutex;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will work with any [FnOnce] and is easy to use,
    // but will [Box] tasks and use dynamic dispatch
    // when can't acquire lock on first try
    let non_blocking_mutex = DynamicNonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_increment) = *state;
                *state += 1;
                *(&mut state_snapshot_after_increment) = *state;
            });
            non_blocking_mutex.run_fn_once_if_first_or_schedule_on_first(|mut state| {
                *(&mut state_snapshot_before_decrement) = *state;
                *state -= 1;
                *(&mut state_snapshot_after_decrement) = *state;
            });
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);
```

### Optimized for multiple known types of `NonBlockingMutexTask` which capture variables

```rust
use non_blocking_mutex::mutex_guard::MutexGuard;
use non_blocking_mutex::non_blocking_mutex::NonBlockingMutex;
use non_blocking_mutex::non_blocking_mutex_task::NonBlockingMutexTask;
use std::thread::{available_parallelism, scope};

let mut state_snapshot_before_increment = 0;
let mut state_snapshot_after_increment = 0;

let mut state_snapshot_before_decrement = 0;
let mut state_snapshot_after_decrement = 0;

{
    // How many threads can physically access [NonBlockingMutex]
    // simultaneously, needed for computing `shard_count` of [ShardedQueue],
    // used to store queue of tasks
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    // Will infer exact type and size of struct [Task] and
    // make sized [NonBlockingMutex] which takes only [Task]
    // without ever requiring [Box]-ing or dynamic dispatch
    let non_blocking_mutex = NonBlockingMutex::new(max_concurrent_thread_count, 0);

    scope(|scope| {
        scope.spawn(|| {
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_increment_and_store_snapshots(
                    &mut state_snapshot_before_increment,
                    &mut state_snapshot_after_increment,
                ),
            );
            non_blocking_mutex.run_if_first_or_schedule_on_first(
                Task::new_decrement_and_store_snapshots(
                    &mut state_snapshot_before_decrement,
                    &mut state_snapshot_after_decrement,
                ),
            );
        });
    });
}

assert_eq!(state_snapshot_before_increment, 0);
assert_eq!(state_snapshot_after_increment, 1);

assert_eq!(state_snapshot_before_decrement, 1);
assert_eq!(state_snapshot_after_decrement, 0);

struct SnapshotsBeforeAndAfterChangeRefs<
    'snapshot_before_change_ref,
    'snapshot_after_change_ref,
> {
    /// Where to write snapshot of `State` before applying function to `State`
    snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
    /// Where to write snapshot of `State` after applying function to `State`
    snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
}

enum TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    IncrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
    DecrementAndStoreSnapshots(
        SnapshotsBeforeAndAfterChangeRefs<
            'snapshot_before_change_ref,
            'snapshot_after_change_ref,
        >,
    ),
}

struct Task<'snapshot_before_change_ref, 'snapshot_after_change_ref> {
    task_type: TaskType<'snapshot_before_change_ref, 'snapshot_after_change_ref>,
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref>
    Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn new_increment_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::IncrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    // Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    // Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }

    fn new_decrement_and_store_snapshots(
        // Where to write snapshot of `State` before applying function to `State`
        snapshot_before_change_ref: &'snapshot_before_change_ref mut usize,
        // Where to write snapshot of `State` after applying function to `State`
        snapshot_after_change_ref: &'snapshot_after_change_ref mut usize,
    ) -> Self {
        Self {
            task_type: TaskType::DecrementAndStoreSnapshots(
                SnapshotsBeforeAndAfterChangeRefs {
                    // Where to write snapshot of `State` before applying function to `State`
                    snapshot_before_change_ref,
                    // Where to write snapshot of `State` after applying function to `State`
                    snapshot_after_change_ref,
                },
            ),
        }
    }
}

impl<'snapshot_before_change_ref, 'snapshot_after_change_ref> NonBlockingMutexTask<usize>
    for Task<'snapshot_before_change_ref, 'snapshot_after_change_ref>
{
    fn run_with_state(self, mut state: MutexGuard<usize>) {
        match self.task_type {
            TaskType::IncrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state += 1;
                *snapshot_after_change_ref = *state;
            }
            TaskType::DecrementAndStoreSnapshots(SnapshotsBeforeAndAfterChangeRefs {
                snapshot_before_change_ref,
                snapshot_after_change_ref,
            }) => {
                *snapshot_before_change_ref = *state;
                *state -= 1;
                *snapshot_after_change_ref = *state;
            }
        }
    }
}
```

## Why you may not want to use `NonBlockingMutex`

- `NonBlockingMutex` forces the first thread that enters the synchronized block to run all tasks, including tasks added while it is running, so it can potentially run forever if tasks keep being added

- It is more difficult to continue execution on the same thread after the synchronized logic has run. To continue after the end of the synchronized logic, you need to schedule a continuation on some scheduler (continuing in a new thread), or introduce other synchronization primitives, like channels, `WaitGroup`-s, or similar

- `NonBlockingMutex` performs worse than `std::sync::Mutex` when concurrency/load/contention is low

- Similar to `std::sync::Mutex`, `NonBlockingMutex` doesn't guarantee order of execution; only atomicity of operations is guaranteed

## Benchmarks

See the benchmark logic in the `benches` directory and reproduce the results by running

```bash
cargo bench
```

### Single fast operation in single thread without contention

`DynamicNonBlockingMutex` is only slightly slower than `Mutex` when there is only 1 thread and 1 operation
(because `DynamicNonBlockingMutex` doesn't `Box` the first operation in the loop or store it in `ShardedQueue`),
while `NonBlockingMutex` outperforms the other synchronization options when there is only 1 thread and 1 operation

| Benchmark name                                  |      Time |
|:------------------------------------------------|----------:|
| increment_once_without_mutex                    |  0.228 ns |
| increment_once_under_non_blocking_mutex_static  |  8.544 ns |
| increment_once_under_non_blocking_mutex_dynamic |  9.445 ns |
| increment_once_under_mutex_blockingly           |  8.851 ns |
| increment_once_under_mutex_spinny               | 10.603 ns |

### Emulating expensive operation by spinning N times under lock with many threads and highest contention

With higher contention (caused by a long time under lock in our case, but it can also be caused by a higher CPU count),
`NonBlockingMutex` starts to perform better than `std::sync::Mutex`

| Benchmark name                                          | Operation count per thread | Spin under lock count | Concurrent thread count | Average time |
|:--------------------------------------------------------|---------------------------:|----------------------:|------------------------:|-------------:|
| increment_under_non_blocking_mutex_concurrently_static  |                      1_000 |                     0 |                      24 |     2.313 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                      1_000 |                     0 |                      24 |     3.408 ms |
| increment_under_mutex_blockingly_concurrently           |                      1_000 |                     0 |                      24 |     1.072 ms |
| increment_under_mutex_spinny_concurrently               |                      1_000 |                     0 |                      24 |     4.376 ms |
| increment_under_non_blocking_mutex_concurrently_static  |                     10_000 |                     0 |                      24 |    23.969 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                     10_000 |                     0 |                      24 |    42.584 ms |
| increment_under_mutex_blockingly_concurrently           |                     10_000 |                     0 |                      24 |    14.960 ms |
| increment_under_mutex_spinny_concurrently               |                     10_000 |                     0 |                      24 |    94.658 ms |
| increment_under_non_blocking_mutex_concurrently_static  |                      1_000 |                    10 |                      24 |     9.457 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                      1_000 |                    10 |                      24 |    12.280 ms |
| increment_under_mutex_blockingly_concurrently           |                      1_000 |                    10 |                      24 |     8.345 ms |
| increment_under_mutex_spinny_concurrently               |                      1_000 |                    10 |                      24 |    34.977 ms |
| increment_under_non_blocking_mutex_concurrently_static  |                     10_000 |                    10 |                      24 |    58.297 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                     10_000 |                    10 |                      24 |    70.013 ms |
| increment_under_mutex_blockingly_concurrently           |                     10_000 |                    10 |                      24 |    84.143 ms |
| increment_under_mutex_spinny_concurrently               |                     10_000 |                    10 |                      24 |   349.070 ms |
| increment_under_non_blocking_mutex_concurrently_static  |                      1_000 |                   100 |                      24 |    39.569 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                      1_000 |                   100 |                      24 |    44.670 ms |
| increment_under_mutex_blockingly_concurrently           |                      1_000 |                   100 |                      24 |    47.335 ms |
| increment_under_mutex_spinny_concurrently               |                      1_000 |                   100 |                      24 |   117.570 ms |
| increment_under_non_blocking_mutex_concurrently_static  |                     10_000 |                   100 |                      24 |   358.480 ms |
| increment_under_non_blocking_mutex_concurrently_dynamic |                     10_000 |                   100 |                      24 |   378.230 ms |
| increment_under_mutex_blockingly_concurrently           |                     10_000 |                   100 |                      24 |   801.090 ms |
| increment_under_mutex_spinny_concurrently               |                     10_000 |                   100 |                      24 |  1200.400 ms |

## Design explanation

Every thread that calls `NonBlockingMutex::run_if_first_or_schedule_on_first` atomically increments `task_count`.
The thread that increments `task_count` from 0 to 1 is the first thread: it immediately executes its own task,
then atomically decrements `task_count` and checks whether `task_count` changed from 1 to 0.
If `task_count` changed from 1 to 0, there are no more tasks and the first thread can finish its execution loop.
Otherwise, the first thread takes the next task from `task_queue`, runs it,
decrements `task_count` again, and repeats the check until there are no tasks left.

Threads that are not first also atomically increment `task_count` and check whether they are first.
Since they are not, they `Box` the task and push the boxed task to `task_queue`.

This design allows us to avoid lock contention, but adds the roughly constant cost of `Box`-ing the task,
putting the boxed task into the concurrent `task_queue`, and incrementing and decrementing `task_count`.
So when lock contention is low, `NonBlockingMutex` performs worse than `std::sync::Mutex`,
but when contention is high (because we have more CPUs or because we want to do expensive calculations under lock),
`NonBlockingMutex` performs better than `std::sync::Mutex`
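
The counting scheme described in the design explanation can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the crate's implementation: `SketchMutex`, `BoxedTask`, and `count_concurrently` are hypothetical names, a `VecDeque` behind `std::sync::Mutex` stands in for `ShardedQueue`, and tasks are plain closures over `&mut State` instead of `MutexGuard`.

```rust
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Boxed task, as stored in the queue by threads that were not first.
type BoxedTask<State> = Box<dyn FnOnce(&mut State) + Send>;

/// Hypothetical illustration type, NOT the crate's `NonBlockingMutex`.
struct SketchMutex<State> {
    /// Number of tasks that are scheduled or currently running.
    task_count: AtomicUsize,
    /// Assumption: a locked `VecDeque` stands in for `ShardedQueue`.
    task_queue: Mutex<VecDeque<BoxedTask<State>>>,
    state: UnsafeCell<State>,
}

// SAFETY: `state` is only accessed by the single thread that moved
// `task_count` from 0 to 1, until that thread moves it back to 0.
unsafe impl<State: Send> Sync for SketchMutex<State> {}

impl<State> SketchMutex<State> {
    fn new(state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: Mutex::new(VecDeque::new()),
            state: UnsafeCell::new(state),
        }
    }

    fn run_if_first_or_schedule_on_first(
        &self,
        task: impl FnOnce(&mut State) + Send + 'static,
    ) {
        // `fetch_add` returns the previous value, so 0 means we are first.
        if self.task_count.fetch_add(1, Ordering::AcqRel) != 0 {
            // Not first: box the task and leave it for the first thread.
            self.task_queue.lock().unwrap().push_back(Box::new(task));
            return;
        }
        // SAFETY: see the `Sync` impl above.
        let state = unsafe { &mut *self.state.get() };
        // First: run our own task immediately, without boxing it.
        task(&mut *state);
        // A previous value of 1 means `task_count` went from 1 to 0,
        // so no tasks are left and the loop ends.
        while self.task_count.fetch_sub(1, Ordering::AcqRel) != 1 {
            // Another thread has incremented `task_count` but may not have
            // pushed its task yet, so spin until the task shows up.
            let next_task = loop {
                if let Some(next_task) = self.task_queue.lock().unwrap().pop_front() {
                    break next_task;
                }
                std::hint::spin_loop();
            };
            next_task(&mut *state);
        }
    }

    fn into_inner(self) -> State {
        self.state.into_inner()
    }
}

/// Increments a shared counter from `thread_count` threads and returns the total.
fn count_concurrently(thread_count: usize, increments_per_thread: usize) -> usize {
    let sketch_mutex = SketchMutex::new(0usize);
    thread::scope(|scope| {
        for _ in 0..thread_count {
            scope.spawn(|| {
                for _ in 0..increments_per_thread {
                    sketch_mutex.run_if_first_or_schedule_on_first(|state| *state += 1);
                }
            });
        }
    });
    sketch_mutex.into_inner()
}

fn main() {
    assert_eq!(count_concurrently(4, 1_000), 4_000);
}
```

No caller ever waits for another caller's task to finish: a thread that is not first only pays for the increment, the `Box`, and the push, which is the roughly constant overhead mentioned above. The locked `VecDeque` reintroduces a short lock around the queue, so this sketch shows the control flow only and says nothing about the crate's performance.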