diff --git a/Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs b/Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs index 0e48e92be2..cec104de22 100644 --- a/Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs +++ b/Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs @@ -141,9 +141,9 @@ public override void OnNext(TSource value) lock (_gate) ForwardOnNext(group); - var md = new SingleAssignmentDisposable(); - _groupDisposable.Add(md); - md.Disposable = duration.SubscribeSafe(new DurationObserver(this, key, writer, md)); + var durationObserver = new DurationObserver(this, key, writer); + _groupDisposable.Add(durationObserver); + durationObserver.SetResource(duration.SubscribeSafe(durationObserver)); } var element = default(TElement); @@ -178,33 +178,31 @@ public override void OnNext(TSource value) writer.OnNext(element); } - private sealed class DurationObserver : IObserver + private sealed class DurationObserver : SafeObserver { private readonly _ _parent; private readonly TKey _key; private readonly ISubject _writer; - private readonly IDisposable _self; - public DurationObserver(_ parent, TKey key, ISubject writer, IDisposable self) + public DurationObserver(_ parent, TKey key, ISubject writer) { _parent = parent; _key = key; _writer = writer; - _self = self; } - public void OnNext(TDuration value) + public override void OnNext(TDuration value) { OnCompleted(); } - public void OnError(Exception error) + public override void OnError(Exception error) { _parent.Error(error); - _self.Dispose(); + Dispose(); } - public void OnCompleted() + public override void OnCompleted() { if (_key == null) { @@ -225,7 +223,7 @@ public void OnCompleted() } } - _parent._groupDisposable.Remove(_self); + _parent._groupDisposable.Remove(this); } }