Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 329: Fix non-deterministic unit tests #332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,8 +548,8 @@ public void onNext(String outputMessage) {
// sentEvents will go until 'eventCounter' hits 20 and then unsubscribes
// which means it will also send (but ignore) the 19/20 events for the other group
// It will not however send all 100 events.
assertEquals(39, sentEventCounter.get(), 2);
// gave it a delta of 2 so the threading/unsubscription race has wiggle
assertEquals(39, sentEventCounter.get(), 10);
// gave it a delta of 10 to account for the threading/unsubscription race condition which can vary depending on a machines performance, thread-scheduler, etc
}

private static class Event {
Expand Down
25 changes: 7 additions & 18 deletions rxjava-core/src/main/java/rx/operators/OperationMaterialize.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

Expand Down Expand Up @@ -139,25 +140,13 @@ public void testMaterialize2() {
}

@Test
public void testMultipleSubscribes() {
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");

Observable<Notification<String>> m = Observable.create(materialize(o1));

TestObserver Observer1 = new TestObserver();
m.subscribe(Observer1);

TestObserver Observer2 = new TestObserver();
m.subscribe(Observer2);
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");

try {
o1.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Observable<Notification<String>> m = Observable.create(materialize(o));

assertEquals(3, Observer1.notifications.size());
assertEquals(3, Observer2.notifications.size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
}

}
Expand Down Expand Up @@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
valuesToReturn = values;
}

Thread t;
volatile Thread t;

@Override
public Subscription subscribe(final Observer<String> observer) {
Expand Down
16 changes: 7 additions & 9 deletions rxjava-core/src/main/java/rx/operators/OperationNext.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -313,6 +314,8 @@ private static class TestException extends RuntimeException {
@Test
public void testNoBufferingOrBlockingOfSequence() throws Throwable {
final CountDownLatch finished = new CountDownLatch(1);
final int COUNT = 30;
final CountDownLatch timeHasPassed = new CountDownLatch(COUNT);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger count = new AtomicInteger(0);
final Observable<Integer> obs = Observable.create(new Func1<Observer<Integer>, Subscription>() {
Expand All @@ -326,7 +329,7 @@ public void run() {
try {
while (running.get()) {
o.onNext(count.incrementAndGet());
Thread.sleep(0, 100);
timeHasPassed.countDown();
}
o.onCompleted();
} catch (Throwable e) {
Expand All @@ -350,19 +353,14 @@ public void run() {
// we should have a different value
assertTrue("a and b should be different", a != b);

// wait for some time
Thread.sleep(100);
// make sure the counter in the observable has increased beyond b
while (count.get() <= (b + 10)) {
Thread.sleep(100);
}
// wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines)
timeHasPassed.await(8000, TimeUnit.MILLISECONDS);

assertTrue(it.hasNext());
int expectedHigherThan = count.get();
int c = it.next();

assertTrue("c should not just be the next in sequence", c != (b + 1));
assertTrue("expected that c [" + c + "] is higher than " + expectedHigherThan, c > expectedHigherThan);
assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT);

assertTrue(it.hasNext());

Expand Down
29 changes: 13 additions & 16 deletions rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,33 +153,27 @@ private static Object getPluginImplementationViaProperty(Class<?> pluginClass) {

public static class UnitTest {

@After
@Before
public void reset() {
// use private access to reset so we can test different initializations via the public static flow
RxJavaPlugins.getInstance().errorHandler.set(null);
RxJavaPlugins.getInstance().observableExecutionHook.set(null);
}

@Test
public void testErrorHandlerDefaultImpl() {
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaErrorHandler impl = new RxJavaPlugins().getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerDefault);
}

@Test
public void testErrorHandlerViaRegisterMethod() {
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandlerTestImpl());
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaPlugins p = new RxJavaPlugins();
p.registerErrorHandler(new RxJavaErrorHandlerTestImpl());
RxJavaErrorHandler impl = p.getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
}

@Test
public void testErrorHandlerViaProperty() {
try {
RxJavaPlugins p = new RxJavaPlugins();
String fullClass = getFullClassNameForTestClass(RxJavaErrorHandlerTestImpl.class);
System.setProperty("rxjava.plugin.RxJavaErrorHandler.implementation", fullClass);
RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler();
RxJavaErrorHandler impl = p.getErrorHandler();
assertTrue(impl instanceof RxJavaErrorHandlerTestImpl);
} finally {
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");
Expand All @@ -193,23 +187,26 @@ public static class RxJavaErrorHandlerTestImpl extends RxJavaErrorHandler {

@Test
public void testObservableExecutionHookDefaultImpl() {
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaPlugins p = new RxJavaPlugins();
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookDefault);
}

@Test
public void testObservableExecutionHookViaRegisterMethod() {
RxJavaPlugins.getInstance().registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaPlugins p = new RxJavaPlugins();
p.registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl());
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
}

@Test
public void testObservableExecutionHookViaProperty() {
try {
RxJavaPlugins p = new RxJavaPlugins();
String fullClass = getFullClassNameForTestClass(RxJavaObservableExecutionHookTestImpl.class);
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", fullClass);
RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaObservableExecutionHook impl = p.getObservableExecutionHook();
assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl);
} finally {
System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation");
Expand Down
10 changes: 3 additions & 7 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,7 @@ public Subscription call(Scheduler scheduler, BooleanSubscription cancel) {
observer.onNext(42);
latch.countDown();

try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

// this will recursively schedule this task for execution again
scheduler.schedule(cancel, this);

return cancel;
Expand Down Expand Up @@ -353,7 +348,8 @@ public void onNext(Integer args) {
fail("Timed out waiting on completion latch");
}

assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic)
// the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count
assertTrue(count.get() >= 10);
assertTrue(completed.get());
}

Expand Down