Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Fix Delay hanging due to wrong disposable-chain management #560

Merged
merged 3 commits into from
Jun 1, 2018
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 47 additions & 24 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public _(IObserver<TSource> observer, IDisposable cancel)
internal abstract class S : _
{
protected readonly object _gate = new object();
protected readonly SerialDisposable _cancelable = new SerialDisposable();
protected IDisposable _cancelable;

protected readonly IScheduler _scheduler;

Expand Down Expand Up @@ -74,22 +74,31 @@ public override IDisposable Run(TParent parent)

RunCore(parent);

var sourceSubscription = new SingleAssignmentDisposable();
_sourceSubscription = sourceSubscription;
sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));

return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
return this;
}

protected override void Dispose(bool disposing)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move those two disposables and Dispose into the base class?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is ugly, didn't want to accidentally break it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be good for now, might be an optimization PR for later.

{
if (disposing)
{
Disposable.TryDispose(ref _sourceSubscription);
Disposable.TryDispose(ref _cancelable);
}
base.Dispose(disposing);
}

protected abstract void RunCore(TParent parent);

public override void OnNext(TSource value)
{
var next = _watch.Elapsed.Add(_delay);
var shouldRun = false;

lock (_gate)
{
var next = _watch.Elapsed.Add(_delay);

_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

shouldRun = _ready && !_active;
Expand All @@ -98,13 +107,13 @@ public override void OnNext(TSource value)

if (shouldRun)
{
_cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue));
}
}

public override void OnError(Exception error)
{
_sourceSubscription.Dispose();
Disposable.TryDispose(ref _sourceSubscription);

var shouldRun = false;

Expand All @@ -126,13 +135,14 @@ public override void OnError(Exception error)

public override void OnCompleted()
{
_sourceSubscription.Dispose();
Disposable.TryDispose(ref _sourceSubscription);

var next = _watch.Elapsed.Add(_delay);
var shouldRun = false;

lock (_gate)
{
var next = _watch.Elapsed.Add(_delay);

_completeAt = next;
_hasCompleted = true;

Expand All @@ -142,7 +152,7 @@ public override void OnCompleted()

if (shouldRun)
{
_cancelable.Disposable = _scheduler.Schedule(_delay, DrainQueue);
Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(_delay, DrainQueue));
}
}

Expand Down Expand Up @@ -256,7 +266,7 @@ protected void DrainQueue(Action<TimeSpan> recurse)
protected abstract class L : _
{
protected readonly object _gate = new object();
protected readonly SerialDisposable _cancelable = new SerialDisposable();
protected IDisposable _cancelable;
private readonly SemaphoreSlim _evt = new SemaphoreSlim(0);

private readonly IScheduler _scheduler;
Expand Down Expand Up @@ -291,29 +301,38 @@ public override IDisposable Run(TParent parent)

RunCore(parent);

var sourceSubscription = new SingleAssignmentDisposable();
_sourceSubscription = sourceSubscription;
sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));

return StableCompositeDisposable.Create(_sourceSubscription, _cancelable);
return this;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
Disposable.TryDispose(ref _sourceSubscription);
Disposable.TryDispose(ref _cancelable);
}
base.Dispose(disposing);
}

protected abstract void RunCore(TParent parent);

protected void ScheduleDrain()
{
_stop = new CancellationTokenSource();
_cancelable.Disposable = Disposable.Create(_stop.Cancel);
Disposable.TrySetSerial(ref _cancelable, Disposable.Create(_stop.Cancel));

_scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
}

public override void OnNext(TSource value)
{
var next = _watch.Elapsed.Add(_delay);

lock (_gate)
{
var next = _watch.Elapsed.Add(_delay);

_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, next));

_evt.Release();
Expand All @@ -322,7 +341,7 @@ public override void OnNext(TSource value)

public override void OnError(Exception error)
{
_sourceSubscription.Dispose();
Disposable.TryDispose(ref _sourceSubscription);

lock (_gate)
{
Expand All @@ -337,12 +356,13 @@ public override void OnError(Exception error)

public override void OnCompleted()
{
_sourceSubscription.Dispose();
Disposable.TryDispose(ref _sourceSubscription);

var next = _watch.Elapsed.Add(_delay);

lock (_gate)
{
var next = _watch.Elapsed.Add(_delay);

_completeAt = next;
_hasCompleted = true;

Expand Down Expand Up @@ -473,7 +493,7 @@ protected override void RunCore(Absolute parent)
{
_ready = false;

_cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
}

private void Start()
Expand Down Expand Up @@ -507,7 +527,7 @@ private void Start()

if (shouldRun)
{
_cancelable.Disposable = _scheduler.Schedule(next, DrainQueue);
Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule(next, DrainQueue));
}
}
}
Expand All @@ -521,7 +541,10 @@ public L(Absolute parent, IObserver<TSource> observer, IDisposable cancel)

protected override void RunCore(Absolute parent)
{
_cancelable.Disposable = parent._scheduler.Schedule(parent._dueTime, Start);
// ScheduleDrain might have already set a newer disposable
// using TrySetSerial would cancel it, stopping the emission
// and hang the consumer
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_cancelable was as SerialDisposable before, even if this works, consider TrySetSerial throughout to reveal intention more consistently.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was the bug: _cancelable was SerialDisposable and SerialDisposable always forcefully overwrites and disposes the previous IDisposable it contains.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, alright.

}

private void Start()
Expand Down