Skip to content

Commit

Permalink
Make Materialize.testMultipleSubscribes test deterministic
Browse files Browse the repository at this point in the history
Refactored to use BlockingObservable instead of non-blocking subscribe and waiting on the underlying thread (it was only waiting on one of two threads running).
This should resolve one of the issues reported in http://github.com/Netflix/RxJava/issues/329
  • Loading branch information
benjchristensen committed Aug 31, 2013
1 parent a222d1c commit 586be83
Showing 1 changed file with 7 additions and 18 deletions.
25 changes: 7 additions & 18 deletions rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

Expand Down Expand Up @@ -139,25 +140,13 @@ public void testMaterialize2() {
}

@Test
public void testMultipleSubscribes() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");

Observable<Notification<String>> m = Observable.create(materialize(o1));

TestObserver Observer1 = new TestObserver();
m.subscribe(Observer1);

TestObserver Observer2 = new TestObserver();
m.subscribe(Observer2);
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");

try {
o1.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Observable<Notification<String>> m = Observable.create(materialize(o));

assertEquals(3, Observer1.notifications.size());
assertEquals(3, Observer2.notifications.size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
}

}
Expand Down Expand Up @@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
valuesToReturn = values;
}

Thread t;
volatile Thread t;

@Override
public Subscription subscribe(final Observer<String> observer) {
Expand Down

0 comments on commit 586be83

Please sign in to comment.