Skip to content

Commit

Permalink
4.x: Improve the logic of AsyncLock (#504)
Browse files Browse the repository at this point in the history
* 4.x: Improve the logic of AsyncLock

* Use guard object as the lock
  • Loading branch information
akarnokd authored and Oren Novotny committed May 26, 2018
1 parent 1a4c2f2 commit 9686dfe
Showing 1 changed file with 56 additions and 33 deletions.
89 changes: 56 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace System.Reactive.Concurrency
/// </summary>
public sealed class AsyncLock : IDisposable
{
private readonly Queue<Action> queue = new Queue<Action>();
private object guard = new object();
private Queue<Action> queue;
private bool isAcquired = false;
private bool hasFaulted = false;

Expand All @@ -27,50 +28,72 @@ public void Wait(Action action)
if (action == null)
throw new ArgumentNullException(nameof(action));

var isOwner = false;
lock (queue)
// allow one thread to update the state
lock (guard)
{
if (!hasFaulted)
// if a previous action crashed, ignore any future actions
if (hasFaulted)
{
queue.Enqueue(action);
isOwner = !isAcquired;
isAcquired = true;
return;
}

// if the "lock" is busy, queue up the extra work
// otherwise there is no need to queue up "action"
if (isAcquired)
{
// create the queue if necessary
var q = queue;
if (q == null)
{
q = new Queue<Action>();
queue = q;
}
// enqueue the work
q.Enqueue(action);
return;
}

// indicate there is processing going on
isAcquired = true;
}

if (isOwner)
// if we get here, execute the "action" first

for (; ; )
{
while (true)
try
{
var work = default(Action);
lock (queue)
action();
}
catch
{
// the execution failed, terminate this AsyncLock
lock (guard)
{
if (queue.Count > 0)
{
work = queue.Dequeue();
}
else
{
isAcquired = false;
break;
}
// throw away the queue
queue = null;
// report fault
hasFaulted = true;
}
throw;
}

try
// execution succeeded, let's see if more work has to be done
lock (guard)
{
var q = queue;
// either there is no queue yet or we run out of work
if (q == null || q.Count == 0)
{
work();
// release the lock
isAcquired = false;
return;
}
catch
{
lock (queue)
{
queue.Clear();
hasFaulted = true;
}

throw;
}
// get the next work action
action = q.Dequeue();
}
// loop back and execute the action
}
}

Expand All @@ -79,9 +102,9 @@ public void Wait(Action action)
/// </summary>
public void Dispose()
{
lock (queue)
lock (guard)
{
queue.Clear();
queue = null;
hasFaulted = true;
}
}
Expand Down

0 comments on commit 9686dfe

Please sign in to comment.