Skip to content

Commit

Permalink
Merge pull request ReactiveX#332 from benjchristensen/issue-329-unit-…
Browse files Browse the repository at this point in the history
…tests

Issue 329: Fix non-deterministic unit tests
  • Loading branch information
benjchristensen committed Aug 31, 2013
2 parents b0d901a + 586be83 commit 6fa51bb
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 52 deletions.
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ public void onNext(String outputMessage) {
// sentEvents will go until 'eventCounter' hits 20 and then unsubscribes
// which means it will also send (but ignore) the 19/20 events for the other group
// It will not however send all 100 events.
assertEquals(39, sentEventCounter.get(), 2);
// gave it a delta of 2 so the threading/unsubscription race has wiggle
assertEquals(39, sentEventCounter.get(), 10);
// gave it a delta of 10 to account for the threading/unsubscription race condition which can vary depending on a machines performance, thread-scheduler, etc
}

private static class Event {
Expand Down
25 changes: 7 additions & 18 deletions rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

Expand Down Expand Up @@ -139,25 +140,13 @@ public void testMaterialize2() {
}

@Test
public void testMultipleSubscribes() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");

Observable<Notification<String>> m = Observable.create(materialize(o1));

TestObserver Observer1 = new TestObserver();
m.subscribe(Observer1);

TestObserver Observer2 = new TestObserver();
m.subscribe(Observer2);
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");

try {
o1.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Observable<Notification<String>> m = Observable.create(materialize(o));

assertEquals(3, Observer1.notifications.size());
assertEquals(3, Observer2.notifications.size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
}

}
Expand Down Expand Up @@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
valuesToReturn = values;
}

Thread t;
volatile Thread t;

@Override
public Subscription subscribe(final Observer<String> observer) {
Expand Down
16 changes: 7 additions & 9 deletions rxjava-core/src/main/java/rx/operators/OperationNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -313,6 +314,8 @@ private static class TestException extends RuntimeException {
@Test
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
final CountDownLatch finished = new CountDownLatch(1);
final int COUNT = 30;
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger count = new AtomicInteger(0);
final Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
Expand All @@ -326,7 +329,7 @@ public void run() {
try {
while (running.get()) {
o.onNext(count.incrementAndGet());
Thread.sleep(0, 100);
timeHasPassed.countDown();
}
o.onCompleted();
} catch (Throwable e) {
Expand All @@ -350,19 +353,14 @@ public void run() {
// we should have a different value
assertTrue("a and b should be different", a != b);

// wait for some time
Thread.sleep(100);
// make sure the counter in the observable has increased beyond b
while (count.get() <= (b + 10)) {
Thread.sleep(100);
}
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);

assertTrue(it.hasNext());
int expectedHigherThan = count.get();
int c = it.next();

assertTrue("c should not just be the next in sequence", c != (b + 1));
assertTrue("expected that c [" + c + "] is higher than " + expectedHigherThan, c > expectedHigherThan);
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);

assertTrue(it.hasNext());

Expand Down
29 changes: 13 additions & 16 deletions rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,33 +153,27 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {

public static class UnitTest {

@After
@Before
public void reset() {
// use private access to reset so we can test different initializations via the public static flow
RxJavaPlugins.getInstance().errorHandler.set(null);
RxJavaPlugins.getInstance().observableExecutionHook.set(null);
}

@Test
public void testErrorHandlerDefaultImpl() {
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaErrorHandler impl = new RxJavaPlugins().getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerDefault);
}

@Test
public void testErrorHandlerViaRegisterMethod() {
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandlerTestImpl());
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaPlugins p = new RxJavaPlugins();
p.registerErrorHandler(new RxJavaErrorHandlerTestImpl());
RxJavaErrorHandler impl = p.getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
}

@Test
public void testErrorHandlerViaProperty() {
try {
RxJavaPlugins p = new RxJavaPlugins();
String fullClass = getFullClassNameForTestClass(RxJavaErrorHandlerTestImpl.class);
System.setProperty("rxjava.plugin.RxJavaErrorHandler.implementation", fullClass);
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaErrorHandler impl = p.getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
} finally {
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");
Expand All @@ -193,23 +187,26 @@ public static class RxJavaErrorHandlerTestImpl extends RxJavaErrorHandler {

@Test
public void testObservableExecutionHookDefaultImpl() {
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaPlugins p = new RxJavaPlugins();
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookDefault);
}

@Test
public void testObservableExecutionHookViaRegisterMethod() {
RxJavaPlugins.getInstance().registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaPlugins p = new RxJavaPlugins();
p.registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
}

@Test
public void testObservableExecutionHookViaProperty() {
try {
RxJavaPlugins p = new RxJavaPlugins();
String fullClass = getFullClassNameForTestClass(RxJavaObservableExecutionHookTestImpl.class);
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", fullClass);
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
} finally {
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");
Expand Down
10 changes: 3 additions & 7 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,7 @@ public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
observer.onNext(42);
latch.countDown();

try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

// this will recursively schedule this task for execution again
scheduler.schedule(cancel, this);

return cancel;
Expand Down Expand Up @@ -353,7 +348,8 @@ public void onNext(Integer args) {
fail("Timed out waiting on completion latch");
}

assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic)
// the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count
assertTrue(count.get() >= 10);
assertTrue(completed.get());
}

Expand Down

0 comments on commit 6fa51bb

Please sign in to comment.