Skip to content

Commit

Permalink
Fix order of observer and resource disposal of the Using and Finally …
Browse files Browse the repository at this point in the history
…operator, reported in #829
  • Loading branch information
quinmars committed Oct 17, 2018
1 parent 338741d commit df7763e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
18 changes: 8 additions & 10 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ internal sealed class _ : IdentitySink<TSource>
{
private readonly Action _finallyAction;

private IDisposable _sourceDisposable;

public _(Action finallyAction, IObserver<TSource> observer)
: base(observer)
{
Expand All @@ -35,19 +33,19 @@ public _(Action finallyAction, IObserver<TSource> observer)

public override void Run(IObservable<TSource> source)
{
Disposable.SetSingle(ref _sourceDisposable, source.SubscribeSafe(this));
}
var subscription = source.SubscribeSafe(this);

protected override void Dispose(bool disposing)
{
if (disposing)
SetUpstream(Disposable.Create(() =>

This comment has been minimized.

Copy link
@akarnokd

akarnokd Oct 18, 2018

Collaborator

You have to implement the swap yourself to avoid allocation:

var d = Interlocked.Exchange(ref _sourceDisposable, BooleanDisposable.True);
if (d != BooleanDisposable.True) {
    try {
        d?.Dispose();
    } finally {
        _finallyAction();
    }
}
{
if (Disposable.TryDispose(ref _sourceDisposable))
try
{
subscription.Dispose();
}
finally
{
_finallyAction();
}
}
base.Dispose(disposing);
}));
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,37 @@ public _(IObserver<TSource> observer)
public void Run(Using<TSource, TResource> parent)
{
var source = default(IObservable<TSource>);
var disposable = Disposable.Empty;
try
{
var resource = parent._resourceFactory();
if (resource != null)
{
Disposable.SetSingle(ref _disposable, resource);
disposable = resource;
}

source = parent._observableFactory(resource);
}
catch (Exception exception)
{
SetUpstream(Observable.Throw<TSource>(exception).SubscribeSafe(this));

return;
source = Observable.Throw<TSource>(exception);
}

// It is important to set the disposable resource after
// Run(). In the synchronous case this would else dispose
// the the resource before the source subscription.
Run(source);
Disposable.SetSingle(ref _disposable, disposable);
}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
Disposable.TryDispose(ref _disposable);
}
base.Dispose(disposing);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using ReactiveTests.Dummies;
Expand Down Expand Up @@ -295,5 +297,35 @@ public void Using_ThrowResourceUsage()
);
}

[Fact]
public void Using_NestedCompleted()
{
var order = "";

Observable.Using(() => Disposable.Create(() => order += "3"),
_ => Observable.Using(() => Disposable.Create(() => order += "2"),
__ => Observable.Using(() => Disposable.Create(() => order += "1"),
___ => Observable.Return(Unit.Default))))
.Finally(() => order += "4")
.Subscribe();

Assert.Equal("1234", order);
}

[Fact]
public void Using_NestedDisposed()
{
var order = "";

Observable.Using(() => Disposable.Create(() => order += "3"),
_ => Observable.Using(() => Disposable.Create(() => order += "2"),
__ => Observable.Using(() => Disposable.Create(() => order += "1"),
___ => Observable.Never<Unit>())))
.Finally(() => order += "4")
.Subscribe()
.Dispose();

Assert.Equal("1234", order);
}
}
}

0 comments on commit df7763e

Please sign in to comment.