Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance AsyncLock to support passing state. #554

Merged
merged 1 commit into from
Jun 6, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace System.Reactive.Concurrency
/// </summary>
public sealed class AsyncLock : IDisposable
{
private object guard = new object();
private Queue<Action> queue;
private bool isAcquired = false;
private bool hasFaulted = false;
private object guard = new object();
private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> queue;

/// <summary>
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
Expand All @@ -28,6 +28,28 @@ public void Wait(Action action)
if (action == null)
throw new ArgumentNullException(nameof(action));

Wait(action, closureAction => closureAction());
}

/// <summary>
/// Queues the action for execution. If the caller acquires the lock and becomes the owner,
/// the queue is processed. If the lock is already owned, the action is queued and will get
/// processed by the owner.
/// </summary>
/// <param name="action">Action to queue for execution.</param>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
/// <remarks>In case TState is a value type, this operation will involve boxing of <paramref name="state"/>.
/// However, this is often an improvement over the allocation of a closure object and a delegate.</remarks>
internal void Wait<TState>(TState state, Action<TState> action)
{
if (action == null)
throw new ArgumentNullException(nameof(action));

Wait(state, action, (actionObject, stateObject) => ((Action<TState>)actionObject)((TState)stateObject));
}

private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
{
// allow one thread to update the state
lock (guard)
{
Expand All @@ -45,11 +67,11 @@ public void Wait(Action action)
var q = queue;
if (q == null)
{
q = new Queue<Action>();
q = new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
queue = q;
}
// enqueue the work
q.Enqueue(action);
q.Enqueue((action, @delegate, state));
return;
}

Expand All @@ -63,7 +85,7 @@ public void Wait(Action action)
{
try
{
action();
action(@delegate, state);
}
catch
{
Expand Down Expand Up @@ -91,7 +113,7 @@ public void Wait(Action action)
}

// get the next work action
action = q.Dequeue();
(action, @delegate, state) = q.Dequeue();
}
// loop back and execute the action
}
Expand Down