Skip to content

Commit

Permalink
Use a dedicated implementation of IObserver instead of passing three …
Browse files Browse the repository at this point in the history
…delegates to Subscribe.
  • Loading branch information
danielcweber committed Oct 2, 2018
1 parent ab2e319 commit 33c85e0
Showing 1 changed file with 62 additions and 33 deletions.
95 changes: 62 additions & 33 deletions Rx.NET/Source/src/System.Reactive/EventPatternSourceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,66 @@ namespace System.Reactive
/// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
public abstract class EventPatternSourceBase<TSender, TEventArgs>
{
private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>
{
private bool _isDone;
private bool _isAdded;
private readonly Delegate _handler;
private readonly object _gate = new object();
private readonly Action<TSender, TEventArgs> _invoke;
private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;

public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, Action<TSender, TEventArgs> invoke)
{
_handler = handler;
_invoke = invoke;
_sourceBase = sourceBase;
}

protected override void OnNextCore(EventPattern<TSender, TEventArgs> value)
{
_sourceBase._invokeHandler(_invoke, value);
}

protected override void OnErrorCore(Exception error)
{
Remove();
error.Throw();
}

protected override void OnCompletedCore()
{
Remove();
}

private void Remove()
{
lock (_gate)
{
if (_isAdded)
{
_sourceBase.Remove(_handler);
}
else
{
_isDone = true;
}
}
}

public void SetResource(IDisposable resource)
{
lock (_gate)
{
if (!_isDone)
{
_sourceBase.Add(_handler, resource);
_isAdded = true;
}
}
}
}

private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;
private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;
Expand Down Expand Up @@ -50,42 +110,11 @@ protected void Add(Delegate handler, Action<TSender, TEventArgs> invoke)
throw new ArgumentNullException(nameof(invoke));
}

var gate = new object();
var isAdded = false;
var isDone = false;

var remove = new Action(() =>
{
lock (gate)
{
if (isAdded)
{
Remove(handler);
}
else
{
isDone = true;
}
}
});

var observer = new Observer(this, handler, invoke);
//
// [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
//
var d = _source.Subscribe/*Unsafe*/(
x => _invokeHandler(invoke, /*this,*/ x),
ex => { remove(); ex.Throw(); },
remove
);

lock (gate)
{
if (!isDone)
{
Add(handler, d);
isAdded = true;
}
}
observer.SetResource(_source.Subscribe(observer));
}

private void Add(Delegate handler, IDisposable disposable)
Expand Down

0 comments on commit 33c85e0

Please sign in to comment.