using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace VW { /// /// Interface to model simple bag. /// /// public interface IBag { /// /// Try add to this bag. /// /// The item to add. /// True if succesful, false otherwise. bool TryAdd(T item); /// /// Remove and return one item from this bag. /// /// The item removed from the bag or default(T) if there is no item available. T Remove(); /// /// Remove and return all items from this bag. /// /// The items removed from the gag. IEnumerable RemoveAll(); /// /// The number of items this bag contains. /// int Count { get; } } /// /// Factory class for various bag implementations. /// public static class Bag { /// /// Creates a simple bound or unbound, not thread-safe bag object. /// /// The type of the items. /// The maximum number of items this bag should hold. /// A new bag instance. public static IBag Create(int max = int.MaxValue) { return max == int.MaxValue ? (IBag)new BagImpl() : new BoundedBagImpl(max); } /// /// Creates an unbound thread-safe, lock free bag. /// /// The type of the items. /// A new bag instance. public static IBag CreateLockFree(int max = int.MaxValue) { return new LockFreeBagImpl(max); } private abstract class BaseBagImpl { protected readonly Stack stack; internal BaseBagImpl() { this.stack = new Stack(); } public T Remove() { return this.stack.Count == 0 ? default(T) : this.stack.Pop(); } public IEnumerable RemoveAll() { var ret = this.stack.ToArray(); this.stack.Clear(); return ret; } public int Count { get { return this.stack.Count; } } } private sealed class BagImpl : BaseBagImpl, IBag { public bool TryAdd(T item) { this.stack.Push(item); return true; } } private sealed class BoundedBagImpl : BaseBagImpl, IBag { private readonly int max; internal BoundedBagImpl(int max) { this.max = max; } public bool TryAdd(T item) { if (this.stack.Count >= this.max) return false; this.stack.Push(item); return true; } } /// /// This is a good read on performance: http://msdn.microsoft.com/en-us/concurrency/ee851578.aspx /// For streaming training we are seeking good performance for a single producer and multiple consumers. /// /// private sealed class LockFreeBagImpl : IBag { private readonly int max; private readonly ConcurrentQueue queue; private int count; internal LockFreeBagImpl(int max) { this.queue = new ConcurrentQueue(); this.max = max; } public bool TryAdd(T item) { if (this.count < this.max) { this.queue.Enqueue(item); Interlocked.Increment(ref this.count); return true; } return false; } public T Remove() { T result; if (this.queue.TryDequeue(out result)) { Interlocked.Decrement(ref this.count); return result; } return default(T); } public IEnumerable RemoveAll() { // TODO: violates the lock constraint. though this is just used at disposable time return this.queue; } public int Count { get { return this.count; } } } } }