Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Review GroupByUntil operator #656

Merged
merged 2 commits into from
Jun 26, 2018
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ public override void OnNext(TSource value)
lock (_gate)
ForwardOnNext(group);

var md = new SingleAssignmentDisposable();
_groupDisposable.Add(md);
md.Disposable = duration.SubscribeSafe(new DurationObserver(this, key, writer, md));
var durationObserver = new DurationObserver(this, key, writer);
_groupDisposable.Add(durationObserver);
durationObserver.SetResource(duration.SubscribeSafe(durationObserver));
}

var element = default(TElement);
Expand Down Expand Up @@ -178,33 +178,31 @@ public override void OnNext(TSource value)
writer.OnNext(element);
}

private sealed class DurationObserver : IObserver<TDuration>
private sealed class DurationObserver : SafeObserver<TDuration>
{
private readonly _ _parent;
private readonly TKey _key;
private readonly ISubject<TElement> _writer;
private readonly IDisposable _self;

public DurationObserver(_ parent, TKey key, ISubject<TElement> writer, IDisposable self)
public DurationObserver(_ parent, TKey key, ISubject<TElement> writer)
{
_parent = parent;
_key = key;
_writer = writer;
_self = self;
}

public void OnNext(TDuration value)
public override void OnNext(TDuration value)
{
OnCompleted();
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
_parent.Error(error);
_self.Dispose();
Dispose();
}

public void OnCompleted()
public override void OnCompleted()
{
if (_key == null)
{
Expand All @@ -225,7 +223,7 @@ public void OnCompleted()
}
}

_parent._groupDisposable.Remove(_self);
_parent._groupDisposable.Remove(this);
}
}

Expand Down