Skip to content

Commit

Permalink
2.x: TestObserver shouldn't clear the upstream disposable on terminated
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 22, 2016
1 parent 37bde8c commit d7e8e4f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 15 deletions.
4 changes: 0 additions & 4 deletions src/main/java/io/reactivex/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ public void onError(Throwable t) {
}

actual.onError(t);

subscription.lazySet(DisposableHelper.DISPOSED);
} finally {
done.countDown();
}
Expand All @@ -194,8 +192,6 @@ public void onComplete() {
completions++;

actual.onComplete();

subscription.lazySet(DisposableHelper.DISPOSED);
} finally {
done.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
package io.reactivex.internal.operators.completable;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import static org.junit.Assert.*;
import org.junit.*;

import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.*;
import io.reactivex.observers.TestObserver;

public class CompletableDoOnTest {
Expand Down Expand Up @@ -52,4 +54,24 @@ public void accept(Throwable e) throws Exception {
TestHelper.assertError(errors, 0, TestException.class, "Outer");
TestHelper.assertError(errors, 1, TestException.class, "Inner");
}

@Test
public void doOnDisposeCalled() {
final AtomicBoolean atomicBoolean = new AtomicBoolean();

assertFalse(atomicBoolean.get());

Completable.complete()
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
atomicBoolean.set(true);
}
})
.test()
.assertResult()
.dispose();

assertTrue(atomicBoolean.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public void testUntilFires() {

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
}
@Test
public void testMainCompletes() {
Expand All @@ -228,7 +229,8 @@ public void testMainCompletes() {

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
}
@Test
public void testDownstreamUnsubscribes() {
Expand All @@ -250,7 +252,8 @@ public void testDownstreamUnsubscribes() {

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public boolean test(Integer t1) {
ts.assertNoErrors();
ts.assertValue(1);

Assert.assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// Assert.assertTrue("Not cancelled!", ts.isCancelled());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ public Observable<String> call() {
TestObserver<Observable<Integer>> ts = new TestObserver<Observable<Integer>>();
source.window(boundary).subscribe(ts);

assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
ts.assertComplete();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ public Observable<Integer> apply(Integer t) {
ts.assertNoErrors();
ts.assertValueCount(1);

assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
assertFalse(open.hasObservers());
assertFalse(close.hasObservers());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ public void testNoDownstreamUnsubscribe() {

source.onComplete();

assertTrue("Not cancelled!", ts.isCancelled());
// 2.0.2 - not anymore
// assertTrue("Not cancelled!", ts.isCancelled());
}


Expand Down
6 changes: 4 additions & 2 deletions src/test/java/io/reactivex/observers/TestObserverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1175,14 +1175,16 @@ public void asyncQueueThrows() {

@Test
public void completedMeansDisposed() {
assertTrue(Observable.just(1)
// 2.0.2 - a terminated TestObserver no longer reports isDisposed
assertFalse(Observable.just(1)
.test()
.assertResult(1).isDisposed());
}

@Test
public void errorMeansDisposed() {
assertTrue(Observable.error(new TestException())
// 2.0.2 - a terminated TestObserver no longer reports isDisposed
assertFalse(Observable.error(new TestException())
.test()
.assertFailure(TestException.class).isDisposed());
}
Expand Down

0 comments on commit d7e8e4f

Please sign in to comment.