/// <summary>
/// Common part of a cown that is independent of the data the cown is storing.
/// </summary>
/// <remarks>
/// Just contains a pointer to the last behaviour's request for this cown.
/// </remarks>
class CownBase : StableOrder
{
    /// <summary>
    /// Points to the end of the queue of requests for this cown.
    /// </summary>
    /// <remarks>
    /// If it is null, then the cown is not currently in use by
    /// any behaviour.
    /// </remarks>
    internal volatile Request? last = null;
}

/// <summary>
/// Behaviour that captures the content of a when body.
/// </summary>
/// <remarks>
/// It contains all the state required to run the body, and release the
/// cowns when the body has finished.
/// </remarks>
class Behaviour
{
    /// <summary>
    /// The body of the behaviour.
    /// </summary>
    readonly Action thunk;

    /// <summary>
    /// How many requests are outstanding for the behaviour.
    /// </summary>
    int count;

    /// <summary>
    /// The set of requests for this behaviour, one per distinct cown.
    /// </summary>
    /// <remarks>
    /// This is used to release the cowns to the subsequent behaviours.
    /// </remarks>
    readonly Request[] requests;

    /// <summary>
    /// Creates a new Behaviour over the given cowns. Call
    /// <see cref="Schedule"/> to enqueue it.
    /// </summary>
    /// <param name="t">The body to run once every cown is available.</param>
    /// <param name="cowns">
    /// The cowns the body requires. The array is not modified; repeated
    /// references to the same cown are collapsed into a single request.
    /// </param>
    internal Behaviour(Action t, params CownBase[] cowns)
    {
        thunk = t;

        CownBase[] ordered = DistinctSorted(cowns);

        requests = new Request[ordered.Length];
        for (int i = 0; i < ordered.Length; i++)
        {
            requests[i] = new Request(ordered[i]);
        }

        // We add an additional count, so that the 2PL is finished
        // before we start running the thunk. Without this, the calls to
        // Release at the end of the thunk could race with the calls to
        // FinishEnqueue in the 2PL.
        count = requests.Length + 1;
    }

    /// <summary>
    /// Returns a sorted copy of <paramref name="cowns"/> with repeated
    /// references removed.
    /// </summary>
    /// <remarks>
    /// A cown that appeared twice would make the second StartEnqueue spin on
    /// the first request's scheduled flag, which is only set after every
    /// StartEnqueue has returned, so the scheduling thread would never make
    /// progress. The quadratic scan is deliberate: the input is the argument
    /// list of a single when().
    /// </remarks>
    static CownBase[] DistinctSorted(CownBase[] cowns)
    {
        var unique = new CownBase[cowns.Length];
        int used = 0;
        foreach (var candidate in cowns)
        {
            bool seen = false;
            for (int i = 0; i < used; i++)
            {
                if (ReferenceEquals(unique[i], candidate))
                {
                    seen = true;
                    break;
                }
            }

            if (!seen)
            {
                unique[used++] = candidate;
            }
        }

        Array.Resize(ref unique, used);

        // Sort a private copy rather than the caller's array. The ordering
        // comes from the elements' own comparison (presumably supplied by
        // StableOrder, which is declared elsewhere - confirm).
        Array.Sort(unique);
        return unique;
    }

    /// <summary>
    /// Schedules the behaviour.
    /// </summary>
    /// <remarks>
    /// Performs two phase locking (2PL) over the enqueuing of the requests.
    /// This ensures that the overall effect of the enqueue is atomic.
    /// </remarks>
    internal void Schedule()
    {
        // Prevent runtime exiting until this has run. This must happen
        // before any ResolveOne call can start the task, otherwise the
        // task's Terminator.Decrement could run ahead of this Increment.
        // NOTE(review): Terminator is declared elsewhere; this assumes it is
        // a counter of outstanding behaviours - confirm.
        Terminator.Increment();

        // Complete first phase of 2PL enqueuing on all cowns.
        foreach (var r in requests)
        {
            r.StartEnqueue(this);
        }

        // Complete second phase of 2PL enqueuing on all cowns.
        foreach (var r in requests)
        {
            r.FinishEnqueue();
        }

        // Resolve the additional request. [See comment in the Constructor]
        // All the cowns may already be resolved, in which case, this will
        // schedule the task.
        ResolveOne();
    }

    /// <summary>
    /// Resolves a single outstanding request for this behaviour.
    /// </summary>
    /// <remarks>
    /// Called when a request is at the head of the queue for a particular cown.
    /// If this is the last request, then the thunk is scheduled.
    /// </remarks>
    internal void ResolveOne()
    {
        if (Interlocked.Decrement(ref count) != 0)
        {
            return;
        }

        // Last request so schedule the task.
        Task.Run(() =>
        {
            try
            {
                // Run body.
                thunk();
            }
            finally
            {
                // Release all the cowns even if the body threw, so that
                // behaviours queued behind this one are not blocked forever
                // and the Increment made in Schedule is always balanced.
                foreach (var r in requests)
                {
                    r.Release();
                }

                Terminator.Decrement();
            }
        });
    }
}

/// <summary>
/// A request for a cown.
/// </summary>
/// <remarks>
/// This is used to wait for a cown to be available for a particular behaviour.
/// </remarks>
class Request
{
    /// <summary>
    /// Pointer to the next behaviour in the queue.
    /// </summary>
    volatile Behaviour? next = null;

    /// <summary>
    /// Flag to indicate the associated behaviour to this request has been
    /// scheduled.
    /// </summary>
    volatile bool scheduled = false;

    /// <summary>
    /// The cown that this request is for.
    /// </summary>
    readonly CownBase target;

    public Request(CownBase t)
    {
        target = t;
    }

    /// <summary>
    /// Release the cown to the next behaviour.
    /// </summary>
    /// <remarks>
    /// This is called when the associated behaviour has completed, and thus can
    /// allow any waiting behaviour to run.
    ///
    /// If there is no next behaviour, then the cown's `last` pointer is set to null.
    /// </remarks>
    internal void Release()
    {
        // This code is effectively a MCS-style queue lock release.
        Behaviour? successor = next;
        if (successor is null)
        {
            // Nobody is queued behind us if the cown's tail is still this
            // request; clearing it marks the cown as idle.
            if (Interlocked.CompareExchange(ref target.last, null, this) == this)
            {
                return;
            }

            // Wait for the next pointer to be set. The target.last is no longer us
            // so this should not take long.
            var w = new SpinWait();
            successor = next;
            while (successor is null)
            {
                w.SpinOnce();
                successor = next;
            }
        }

        successor.ResolveOne();
    }

    /// <summary>
    /// Start the first phase of the 2PL enqueue operation.
    /// </summary>
    /// <remarks>
    /// This enqueues the request onto the cown. It will only return
    /// once any previous behaviour on this cown has finished enqueueing
    /// on all its required cowns. This ensures that the 2PL is obeyed.
    /// </remarks>
    /// <param name="behaviour">The behaviour this request belongs to.</param>
    internal void StartEnqueue(Behaviour behaviour)
    {
        var prev = Interlocked.Exchange(ref target.last, this);

        if (prev == null)
        {
            // The cown had no queued request, so this behaviour holds it
            // immediately.
            behaviour.ResolveOne();
            return;
        }

        // Link behind the previous request; its Release will resolve us.
        prev.next = behaviour;

        // Spin until the previous request's behaviour has reached
        // FinishEnqueue.
        var w = new SpinWait();
        while (!prev.scheduled)
        {
            w.SpinOnce();
        }
    }

    /// <summary>
    /// Finish the second phase of the 2PL enqueue operation.
    /// </summary>
    /// <remarks>
    /// This will set the scheduled flag, so subsequent behaviours on this
    /// cown can continue the 2PL enqueue.
    /// </remarks>
    internal void FinishEnqueue()
    {
        scheduled = true;
    }
}

/// <summary>
/// Cown that wraps a value. The value should only be accessed inside
/// a when() block.
/// </summary>
/// <typeparam name="T">The type that is wrapped by the cown.</typeparam>
class Cown<T> : CownBase
{
    internal T value;

    public Cown(T v)
    {
        value = v;
    }
}

/// <summary>
/// This class provides the when() function for various arities.
/// </summary>
/// <remarks>
/// Each overload returns a delegate that takes the body; invoking that
/// delegate builds a Behaviour over the given cowns and schedules it.
/// </remarks>
class When
{
    /// <summary>
    /// when() over no cowns.
    /// </summary>
    public static Action<Action> when()
    {
        return f => new Behaviour(f).Schedule();
    }

    /// <summary>
    /// when() over one cown; the body receives the cown's value.
    /// </summary>
    public static Action<Action<T>> when<T>(Cown<T> t)
    {
        return f =>
        {
            Action thunk = () => f(t.value);
            new Behaviour(thunk, t).Schedule();
        };
    }

    /// <summary>
    /// when() over two cowns; the body receives both values in argument order.
    /// </summary>
    public static Action<Action<T1, T2>> when<T1, T2>(Cown<T1> t1, Cown<T2> t2)
    {
        return f =>
        {
            Action thunk = () => f(t1.value, t2.value);
            new Behaviour(thunk, t1, t2).Schedule();
        };
    }

    /// <summary>
    /// when() over three cowns; the body receives all three values in
    /// argument order.
    /// </summary>
    public static Action<Action<T1, T2, T3>> when<T1, T2, T3>(Cown<T1> t1, Cown<T2> t2, Cown<T3> t3)
    {
        return f =>
        {
            Action thunk = () => f(t1.value, t2.value, t3.value);
            new Behaviour(thunk, t1, t2, t3).Schedule();
        };
    }
}