diff --git a/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs b/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs index 4f6f918cf4..7852bf76b7 100644 --- a/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs +++ b/Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs @@ -11,7 +11,8 @@ namespace System.Reactive.Concurrency /// public sealed class AsyncLock : IDisposable { - private readonly Queue queue = new Queue(); + private object guard = new object(); + private Queue queue; private bool isAcquired = false; private bool hasFaulted = false; @@ -27,50 +28,72 @@ public void Wait(Action action) if (action == null) throw new ArgumentNullException(nameof(action)); - var isOwner = false; - lock (queue) + // allow one thread to update the state + lock (guard) { - if (!hasFaulted) + // if a previous action crashed, ignore any future actions + if (hasFaulted) { - queue.Enqueue(action); - isOwner = !isAcquired; - isAcquired = true; + return; } + + // if the "lock" is busy, queue up the extra work + // otherwise there is no need to queue up "action" + if (isAcquired) + { + // create the queue if necessary + var q = queue; + if (q == null) + { + q = new Queue(); + queue = q; + } + // enqueue the work + q.Enqueue(action); + return; + } + + // indicate there is processing going on + isAcquired = true; } - if (isOwner) + // if we get here, execute the "action" first + + for (; ; ) { - while (true) + try { - var work = default(Action); - lock (queue) + action(); + } + catch + { + // the execution failed, terminate this AsyncLock + lock (guard) { - if (queue.Count > 0) - { - work = queue.Dequeue(); - } - else - { - isAcquired = false; - break; - } + // throw away the queue + queue = null; + // report fault + hasFaulted = true; } + throw; + } - try + // execution succeeded, let's see if more work has to be done + lock (guard) + { + var q = queue; + // either there is no queue yet or we run out of work + if (q == null || q.Count == 0) { - work(); + // release the lock + isAcquired = false; + return; } - catch - { - lock (queue) - { - queue.Clear(); - hasFaulted = true; - } - throw; - } + // get the next work action + action = q.Dequeue(); } + // loop back and execute the action } } @@ -79,9 +102,9 @@ public void Wait(Action action) /// public void Dispose() { - lock (queue) + lock (guard) { - queue.Clear(); + queue = null; hasFaulted = true; } }