# orx-concurrent-bag

[![orx-concurrent-bag crate](https://img.shields.io/crates/v/orx-concurrent-bag.svg)](https://crates.io/crates/orx-concurrent-bag) [![orx-concurrent-bag documentation](https://docs.rs/orx-concurrent-bag/badge.svg)](https://docs.rs/orx-concurrent-bag)

An efficient, convenient and lightweight grow-only concurrent data structure allowing high performance concurrent collection.

* **convenient**: `ConcurrentBag` can safely be shared among threads simply as a shared reference. It is a [`PinnedConcurrentCol`](https://crates.io/crates/orx-pinned-concurrent-col) with a special concurrent state implementation. The underlying [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) and the concurrent bag can be converted back and forth to each other.
* **efficient**: `ConcurrentBag` is a lock-free structure suitable for concurrent, copy-free and high performance growth. You may see the benchmarks and further performance notes below for details.

## Examples

The underlying `PinnedVec` guarantees make it straightforward to safely grow the bag through a shared reference, which leads to a convenient api as demonstrated below.

```rust
use orx_concurrent_bag::*;

let bag = ConcurrentBag::new();

let (num_threads, num_items_per_thread) = (4, 1_024);

let bag_ref = &bag;
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                // concurrently collect results simply by calling `push`
                bag_ref.push(i * 1000 + j);
            }
        });
    }
});

let mut vec_from_bag = bag.into_inner().to_vec();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads)
    .flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j))
    .collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
```

## Concurrent State and Properties

The concurrent state is modeled simply by an atomic length. The combination of this state and `PinnedConcurrentCol` leads to the following properties:

* Writing to a position of the collection does not block other writes; multiple writes can happen concurrently.
* Each position is written exactly once.
* ⟹ no write & write race condition exists.
* Only one growth can happen at a given time. Growth is copy-free and does not change the memory locations of already pushed elements.
* The underlying pinned vector is always valid and can be taken out at any time by `into_inner(self)`.
* Reading is only possible after converting the bag into the underlying `PinnedVec`.
* ⟹ no read & write race condition exists.
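To see why an atomic length alone is enough to rule out write & write races, the core idea can be sketched with a std-only toy (this `MiniBag` is an illustrative simplification with a fixed capacity and `u64`-sized slots, not the crate's implementation): each `push` reserves a unique index with a single `fetch_add`, so no two threads ever write to the same position.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

// Toy fixed-capacity bag: the entire concurrent state is one atomic length.
struct MiniBag {
    len: AtomicUsize,
    slots: Vec<AtomicU64>,
}

impl MiniBag {
    fn with_capacity(cap: usize) -> Self {
        Self {
            len: AtomicUsize::new(0),
            slots: (0..cap).map(|_| AtomicU64::new(0)).collect(),
        }
    }

    /// Reserve a unique position with a single `fetch_add`, then write to it.
    /// Each position is written exactly once, so writes never race.
    fn push(&self, value: u64) -> usize {
        let idx = self.len.fetch_add(1, Ordering::Relaxed);
        self.slots[idx].store(value, Ordering::Relaxed);
        idx
    }
}

fn main() {
    let (num_threads, num_items_per_thread) = (4u64, 1_000u64);
    let bag = MiniBag::with_capacity((num_threads * num_items_per_thread) as usize);
    std::thread::scope(|s| {
        for i in 0..num_threads {
            let bag = &bag;
            s.spawn(move || {
                for j in 0..num_items_per_thread {
                    bag.push(i * 1000 + j);
                }
            });
        }
    });
    // every one of the 4_000 values 0..4000 landed in exactly one slot
    assert_eq!(bag.len.load(Ordering::Relaxed), 4_000);
    let mut collected: Vec<u64> = bag.slots.iter().map(|s| s.load(Ordering::Relaxed)).collect();
    collected.sort();
    assert_eq!(collected, (0..4_000).collect::<Vec<u64>>());
}
```

The real `ConcurrentBag` additionally handles growth and arbitrary element types; the sketch only shows how an atomic length partitions write positions among threads.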
## Benchmarks

### Performance with `push`

*You may find the details of the benchmarks at [benches/collect_with_push.rs](https://github.com/orxfun/orx-concurrent-bag/blob/main/benches/collect_with_push.rs).*

In the experiment, `rayon`'s parallel iterator, and the push methods of `AppendOnlyVec`, `boxcar::Vec` and `ConcurrentBag` are used to collect results from multiple threads. Further, different underlying pinned vectors of the `ConcurrentBag` are evaluated.

```rust ignore
// reserve and push one position at a time
for j in 0..num_items_per_thread {
    bag.push(i * 1000 + j);
}
```

We observe that `ConcurrentBag` allows for a highly efficient concurrent collection of elements:

* The default `Doubling` growth strategy of the concurrent bag, which is the most flexible as it does not require any prior knowledge, already seems to outperform the alternatives. Hence, it can be used in most situations.
* The `Linear` growth strategy requires one argument determining the uniform fragment capacity of the underlying `SplitVec`. This strategy might be preferred whenever we would like to be more conservative about allocation. Recall that the capacity of `Doubling`, similar to the standard `Vec`, grows exponentially, while, as the name suggests, `Linear` grows linearly.
* Finally, the `Fixed` growth strategy is the least flexible and requires perfect knowledge about the hard-constrained capacity (it will panic if we exceed it). Since it does not outperform `Doubling` or `Linear`, we do not need to use `Fixed` except for the rare cases where we want to allocate exactly the required memory that we know beforehand.

The performance can further be improved by using the `extend` method instead of `push`. You may see the results in the next subsection and the details in the performance notes.
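To make the allocation trade-off between the strategies concrete, the cumulative capacities after a number of fragments can be computed as follows (the fragment sizes used below are illustrative assumptions for the sketch, not the crate's exact defaults):

```rust
// cumulative capacity reached after `n` fragments, per growth strategy

fn doubling_capacity(n: u32, first_fragment: usize) -> usize {
    // fragment capacities: first, 2 * first, 4 * first, ...
    (0..n).map(|k| first_fragment << k).sum()
}

fn linear_capacity(n: u32, fragment: usize) -> usize {
    // every fragment has the same, uniform capacity
    n as usize * fragment
}

fn main() {
    // illustrative fragment sizes; the crate's actual defaults may differ
    assert_eq!(doubling_capacity(5, 4), 4 + 8 + 16 + 32 + 64); // exponential growth
    assert_eq!(linear_capacity(5, 64), 5 * 64); // linear growth
}
```

This is why `Linear` is the more conservative choice when the final number of elements is roughly known: it never over-allocates by more than one fragment, whereas exponential growth may reserve up to twice the needed capacity.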
### Performance with `extend`

*You may find the details of the benchmarks at [benches/collect_with_extend.rs](https://github.com/orxfun/orx-concurrent-bag/blob/main/benches/collect_with_extend.rs).*

The only difference in this follow-up experiment is that we use `extend` rather than `push` with `ConcurrentBag`. The expectation is that this approach will solve the performance degradation due to false sharing in the *small data & little work* situation.

In a perfectly homogeneous scenario, we can evenly share the work among threads as follows.

```rust ignore
// reserve num_items_per_thread positions at a time
// and then push as the iterator yields
let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);
bag.extend(iter);
```

However, we do not need perfect homogeneity or perfect information on the number of items to be pushed per thread to get the benefits of `extend`. We can simply use `step_by` and extend by `batch_size` elements at a time. A `batch_size` large enough that a batch exceeds a cache line is sufficient to prevent the dramatic performance degradation caused by false sharing.

```rust ignore
// reserve batch_size positions at each iteration
// and then push as the iterator yields
for j in (0..num_items_per_thread).step_by(batch_size) {
    let iter = (j..(j + batch_size)).map(|j| i * 100000 + j);
    bag.extend(iter);
}
```

Although concurrent collection via `ConcurrentBag::push` is already highly efficient, collection with `ConcurrentBag::extend` should be considered whenever possible as it changes the scale. As the graph below demonstrates, collecting in batches of only 64 elements while collecting tens of thousands of elements provides orders of magnitude of improvement.
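The mechanics of batched reservation can be sketched with a std-only toy (this fixed-capacity `MiniBag` is an illustrative simplification, not the crate's implementation): `extend` performs one `fetch_add` per batch rather than one per element, and each thread then fills a disjoint range of slots.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Toy fixed-capacity bag; the concurrent state is a single atomic length.
struct MiniBag {
    len: AtomicUsize,
    slots: Vec<AtomicUsize>,
}

impl MiniBag {
    fn with_capacity(cap: usize) -> Self {
        Self {
            len: AtomicUsize::new(0),
            slots: (0..cap).map(|_| AtomicUsize::new(0)).collect(),
        }
    }

    /// Reserve `iter.len()` consecutive positions with one atomic update,
    /// then fill them as the iterator yields. When a batch spans more than
    /// a cache line, neighboring threads rarely write to the same line,
    /// which avoids the false-sharing penalty of element-wise `push`.
    fn extend<I: ExactSizeIterator<Item = usize>>(&self, iter: I) {
        let begin = self.len.fetch_add(iter.len(), Ordering::Relaxed);
        for (k, value) in iter.enumerate() {
            self.slots[begin + k].store(value, Ordering::Relaxed);
        }
    }
}

fn main() {
    let (num_threads, num_items_per_thread, batch_size) = (4usize, 1_024usize, 64usize);
    let bag = MiniBag::with_capacity(num_threads * num_items_per_thread);
    std::thread::scope(|s| {
        for i in 0..num_threads {
            let bag = &bag;
            s.spawn(move || {
                for j in (0..num_items_per_thread).step_by(batch_size) {
                    // one reservation per batch of 64 instead of 64 reservations
                    bag.extend((j..j + batch_size).map(|j| i * 100_000 + j));
                }
            });
        }
    });
    assert_eq!(bag.len.load(Ordering::Relaxed), 4 * 1_024);
}
```

Compared to the element-wise sketch, the number of atomic updates drops by a factor of `batch_size`, and consecutive writes of a thread stay within its own reserved range.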
## Concurrent Friend Collections

||[`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag)|[`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec)|[`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag)|
|---|---|---|---|
| Write | Guarantees that each element is written exactly once via `push` or `extend` methods. | Guarantees that each element is written exactly once via `push` or `extend` methods. | Different in two ways. First, a position can be written multiple times. Second, an arbitrary element of the bag can be written at any time and in any order using the `set_value` and `set_values` methods. This provides great flexibility while moving the safety responsibility to the caller; hence, the set methods are `unsafe`. |
| Read | Mainly a write-only collection. Concurrent reading of already pushed elements is possible through the `unsafe` `get` and `iter` methods. The caller is required to avoid race conditions. | A write-and-read collection. Already pushed elements can safely be read through the `get` and `iter` methods. | Not supported currently. Due to the flexible but unsafe nature of write operations, it is difficult to provide the required safety guarantees as a caller. |
| Ordering of Elements | Since write operations are through adding elements to the end of the pinned vector via `push` and `extend`, two multi-threaded executions of code that collects elements into the bag might result in the elements being collected in different orders. | Since write operations are through adding elements to the end of the pinned vector via `push` and `extend`, two multi-threaded executions of code that collects elements into the vec might result in the elements being collected in different orders. | Collecting elements concurrently and in the correct order is the main goal of this collection. Although this does not seem trivial, it can be achieved almost trivially when `ConcurrentOrderedBag` is used together with a [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). |
| `into_inner` | Once the concurrent collection is completed, the bag can safely and cheaply be converted to its underlying `PinnedVec`. |