Skip to content

Commit

Permalink
Merge pull request #820 from danielcweber/ReviewEventPatternSourceBase
Browse files Browse the repository at this point in the history
Review EventPatternSourceBase
  • Loading branch information
danielcweber authored Oct 2, 2018
2 parents 9a6e8b2 + 33c85e0 commit 4d72bb1
Showing 1 changed file with 64 additions and 37 deletions.
101 changes: 64 additions & 37 deletions Rx.NET/Source/src/System.Reactive/EventPatternSourceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,66 @@ namespace System.Reactive
/// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
public abstract class EventPatternSourceBase<TSender, TEventArgs>
{
private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>
{
private bool _isDone;
private bool _isAdded;
private readonly Delegate _handler;
private readonly object _gate = new object();
private readonly Action<TSender, TEventArgs> _invoke;
private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;

public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, Action<TSender, TEventArgs> invoke)
{
_handler = handler;
_invoke = invoke;
_sourceBase = sourceBase;
}

protected override void OnNextCore(EventPattern<TSender, TEventArgs> value)
{
_sourceBase._invokeHandler(_invoke, value);
}

protected override void OnErrorCore(Exception error)
{
Remove();
error.Throw();
}

protected override void OnCompletedCore()
{
Remove();
}

private void Remove()
{
lock (_gate)
{
if (_isAdded)
{
_sourceBase.Remove(_handler);
}
else
{
_isDone = true;
}
}
}

public void SetResource(IDisposable resource)
{
lock (_gate)
{
if (!_isDone)
{
_sourceBase.Add(_handler, resource);
_isAdded = true;
}
}
}
}

private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;
private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;
Expand Down Expand Up @@ -50,50 +110,18 @@ protected void Add(Delegate handler, Action<TSender, TEventArgs> invoke)
throw new ArgumentNullException(nameof(invoke));
}

var gate = new object();
var isAdded = false;
var isDone = false;

var remove = new Action(() =>
{
lock (gate)
{
if (isAdded)
{
Remove(handler);
}
else
{
isDone = true;
}
}
});

var observer = new Observer(this, handler, invoke);
//
// [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
//
var d = _source.Subscribe/*Unsafe*/(
x => _invokeHandler(invoke, /*this,*/ x),
ex => { remove(); ex.Throw(); },
remove
);

lock (gate)
{
if (!isDone)
{
Add(handler, d);
isAdded = true;
}
}
observer.SetResource(_source.Subscribe(observer));
}

private void Add(Delegate handler, IDisposable disposable)
{
lock (_subscriptions)
{
var l = new Stack<IDisposable>();
if (!_subscriptions.TryGetValue(handler, out l))
if (!_subscriptions.TryGetValue(handler, out var l))
{
_subscriptions[handler] = l = new Stack<IDisposable>();
}
Expand All @@ -118,8 +146,7 @@ protected void Remove(Delegate handler)

lock (_subscriptions)
{
var l = new Stack<IDisposable>();
if (_subscriptions.TryGetValue(handler, out l))
if (_subscriptions.TryGetValue(handler, out var l))
{
d = l.Pop();

Expand Down

0 comments on commit 4d72bb1

Please sign in to comment.