From 5b5aade918ea3d9cbc4aa993a23a21c1b3beda2f Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 31 Aug 2013 14:14:31 -0700 Subject: [PATCH] Make Materialize.testMultipleSubscribes test deterministic Refactored to use BlockingObservable instead of non-blocking subscribe and waiting on the underlying thread (it was only waiting on one of two threads running). This should resolve one of the issues reported in http://github.com/Netflix/RxJava/issues/329 --- .../rx/operators/OperationMaterialize.java | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java index 1e22154e14..279bff006a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Vector; +import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -139,25 +140,13 @@ public void testMaterialize2() { } @Test - public void testMultipleSubscribes() { - final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three"); - - Observable> m = Observable.create(materialize(o1)); - - TestObserver Observer1 = new TestObserver(); - m.subscribe(Observer1); - - TestObserver Observer2 = new TestObserver(); - m.subscribe(Observer2); + public void testMultipleSubscribes() throws InterruptedException, ExecutionException { + final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three"); - try { - o1.t.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Observable> m = Observable.create(materialize(o)); - assertEquals(3, Observer1.notifications.size()); - assertEquals(3, Observer2.notifications.size()); + assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); + assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); } } @@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable { valuesToReturn = values; } - Thread t; + volatile Thread t; @Override public Subscription subscribe(final Observer observer) {