Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Improve the logic of AsyncLock #504

Merged
merged 2 commits into from
May 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 56 additions & 33 deletions Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace System.Reactive.Concurrency
/// </summary>
public sealed class AsyncLock : IDisposable
{
private readonly Queue<Action> queue = new Queue<Action>();
private object guard = new object();
private Queue<Action> queue;
private bool isAcquired = false;
private bool hasFaulted = false;

Expand All @@ -27,50 +28,72 @@ public void Wait(Action action)
if (action == null)
throw new ArgumentNullException(nameof(action));

var isOwner = false;
lock (queue)
// allow one thread to update the state
lock (guard)
{
if (!hasFaulted)
// if a previous action crashed, ignore any future actions
if (hasFaulted)
{
queue.Enqueue(action);
isOwner = !isAcquired;
isAcquired = true;
return;
}

// if the "lock" is busy, queue up the extra work
// otherwise there is no need to queue up "action"
if (isAcquired)
{
// create the queue if necessary
var q = queue;
if (q == null)
{
q = new Queue<Action>();
queue = q;
}
// enqueue the work
q.Enqueue(action);
return;
}

// indicate there is processing going on
isAcquired = true;
}

if (isOwner)
// if we get here, execute the "action" first

for (; ; )
{
while (true)
try
{
var work = default(Action);
lock (queue)
action();
}
catch
{
// the execution failed, terminate this AsyncLock
lock (guard)
{
if (queue.Count > 0)
{
work = queue.Dequeue();
}
else
{
isAcquired = false;
break;
}
// throw away the queue
queue = null;
// report fault
hasFaulted = true;
}
throw;
}

try
// execution succeeded, let's see if more work has to be done
lock (guard)
{
var q = queue;
// either there is no queue yet or we run out of work
if (q == null || q.Count == 0)
{
work();
// release the lock
isAcquired = false;
return;
}
catch
{
lock (queue)
{
queue.Clear();
hasFaulted = true;
}

throw;
}
// get the next work action
action = q.Dequeue();
}
// loop back and execute the action
}
}

Expand All @@ -79,9 +102,9 @@ public void Wait(Action action)
/// </summary>
public void Dispose()
{
lock (queue)
lock (guard)
{
queue.Clear();
queue = null;
hasFaulted = true;
}
}
Expand Down