Skip to content

Commit

Permalink
Enhance AsyncLock to support passing state. (#554)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielcweber authored Jun 6, 2018
1 parent 09bf2c8 commit f4f6a62
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace System.Reactive.Concurrency
/// </summary>
public sealed class AsyncLock : IDisposable
{
private object guard = new object();
private Queue<Action> queue;
private bool isAcquired = false;
private bool hasFaulted = false;
private object guard = new object();
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> queue;

/// <summary>
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
Expand All @@ -28,6 +28,28 @@ public void Wait(Action action)
if (action == null)
throw new ArgumentNullException(nameof(action));

Wait(action, closureAction => closureAction());
}

/// <summary>
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
/// the queue is processed. If the lock is already owned, the action is queued and will get
/// processed by the owner.
/// </summary>
/// <param name="action">Action to queue for execution.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
/// <remarks>In case TState is a value type, this operation will involve boxing of <paramref name="state"/>.
/// However, this is often an improvement over the allocation of a closure object and a delegate.</remarks>
internal void Wait<TState>(TState state, Action<TState> action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));

Wait(state, action, (actionObject, stateObject) => ((Action<TState>)actionObject)((TState)stateObject));
}

private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
{
// allow one thread to update the state
lock (guard)
{
Expand All @@ -45,11 +67,11 @@ public void Wait(Action action)
var q = queue;
if (q == null)
{
q = new Queue<Action>();
q = new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
queue = q;
}
// enqueue the work
q.Enqueue(action);
q.Enqueue((action, @delegate, state));
return;
}

Expand All @@ -63,7 +85,7 @@ public void Wait(Action action)
{
try
{
action();
action(@delegate, state);
}
catch
{
Expand Down Expand Up @@ -91,7 +113,7 @@ public void Wait(Action action)
}

// get the next work action
action = q.Dequeue();
(action, @delegate, state) = q.Dequeue();
}
// loop back and execute the action
}
Expand Down

0 comments on commit f4f6a62

Please sign in to comment.