diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 4a673b6fee..77e04324b0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -548,8 +548,8 @@ public void onNext(String outputMessage) { // sentEvents will go until 'eventCounter' hits 20 and then unsubscribes // which means it will also send (but ignore) the 19/20 events for the other group // It will not however send all 100 events. - assertEquals(39, sentEventCounter.get(), 2); - // gave it a delta of 2 so the threading/unsubscription race has wiggle + assertEquals(39, sentEventCounter.get(), 10); + // gave it a delta of 10 to account for the threading/unsubscription race condition which can vary depending on a machines performance, thread-scheduler, etc } private static class Event { diff --git a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java index 1e22154e14..279bff006a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMaterialize.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Vector; +import java.util.concurrent.ExecutionException; import org.junit.Test; @@ -139,25 +140,13 @@ public void testMaterialize2() { } @Test - public void testMultipleSubscribes() { - final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three"); - - Observable> m = Observable.create(materialize(o1)); - - TestObserver Observer1 = new TestObserver(); - m.subscribe(Observer1); - - TestObserver Observer2 = new TestObserver(); - m.subscribe(Observer2); + public void testMultipleSubscribes() throws InterruptedException, ExecutionException { + final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three"); - try { - o1.t.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + Observable> m = Observable.create(materialize(o)); - assertEquals(3, Observer1.notifications.size()); - assertEquals(3, Observer2.notifications.size()); + assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); + assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size()); } } @@ -193,7 +182,7 @@ private static class TestAsyncErrorObservable extends Observable { valuesToReturn = values; } - Thread t; + volatile Thread t; @Override public Subscription subscribe(final Observer observer) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 587141b1eb..5c0a8a7553 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -313,6 +314,8 @@ private static class TestException extends RuntimeException { @Test public void testNoBufferingOrBlockingOfSequence() throws Throwable { final CountDownLatch finished = new CountDownLatch(1); + final int COUNT = 30; + final CountDownLatch timeHasPassed = new CountDownLatch(COUNT); final AtomicBoolean running = new AtomicBoolean(true); final AtomicInteger count = new AtomicInteger(0); final Observable obs = Observable.create(new Func1, Subscription>() { @@ -326,7 +329,7 @@ public void run() { try { while (running.get()) { o.onNext(count.incrementAndGet()); - Thread.sleep(0, 100); + timeHasPassed.countDown(); } o.onCompleted(); } catch (Throwable e) { @@ -350,19 +353,14 @@ public void run() { // we should have a different value assertTrue("a and b should be different", a != b); - // wait for some time - Thread.sleep(100); - // make sure the counter in the observable has increased beyond b - while (count.get() <= (b + 10)) { - Thread.sleep(100); - } + // wait for some time (if times out we are blocked somewhere so fail ... set very high for very slow, constrained machines) + timeHasPassed.await(8000, TimeUnit.MILLISECONDS); assertTrue(it.hasNext()); - int expectedHigherThan = count.get(); int c = it.next(); assertTrue("c should not just be the next in sequence", c != (b + 1)); - assertTrue("expected that c [" + c + "] is higher than " + expectedHigherThan, c > expectedHigherThan); + assertTrue("expected that c [" + c + "] is higher than or equal to " + COUNT, c >= COUNT); assertTrue(it.hasNext()); diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java index 8d2d74d449..cb7af6b6f0 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaPlugins.java @@ -153,33 +153,27 @@ private static Object getPluginImplementationViaProperty(Class pluginClass) { public static class UnitTest { - @After - @Before - public void reset() { - // use private access to reset so we can test different initializations via the public static flow - RxJavaPlugins.getInstance().errorHandler.set(null); - RxJavaPlugins.getInstance().observableExecutionHook.set(null); - } - @Test public void testErrorHandlerDefaultImpl() { - RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler(); + RxJavaErrorHandler impl = new RxJavaPlugins().getErrorHandler(); assertTrue(impl instanceof RxJavaErrorHandlerDefault); } @Test public void testErrorHandlerViaRegisterMethod() { - RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandlerTestImpl()); - RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler(); + RxJavaPlugins p = new RxJavaPlugins(); + p.registerErrorHandler(new RxJavaErrorHandlerTestImpl()); + RxJavaErrorHandler impl = p.getErrorHandler(); assertTrue(impl instanceof RxJavaErrorHandlerTestImpl); } @Test public void testErrorHandlerViaProperty() { try { + RxJavaPlugins p = new RxJavaPlugins(); String fullClass = getFullClassNameForTestClass(RxJavaErrorHandlerTestImpl.class); System.setProperty("rxjava.plugin.RxJavaErrorHandler.implementation", fullClass); - RxJavaErrorHandler impl = RxJavaPlugins.getInstance().getErrorHandler(); + RxJavaErrorHandler impl = p.getErrorHandler(); assertTrue(impl instanceof RxJavaErrorHandlerTestImpl); } finally { System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation"); @@ -193,23 +187,26 @@ public static class RxJavaErrorHandlerTestImpl extends RxJavaErrorHandler { @Test public void testObservableExecutionHookDefaultImpl() { - RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook(); + RxJavaPlugins p = new RxJavaPlugins(); + RxJavaObservableExecutionHook impl = p.getObservableExecutionHook(); assertTrue(impl instanceof RxJavaObservableExecutionHookDefault); } @Test public void testObservableExecutionHookViaRegisterMethod() { - RxJavaPlugins.getInstance().registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl()); - RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook(); + RxJavaPlugins p = new RxJavaPlugins(); + p.registerObservableExecutionHook(new RxJavaObservableExecutionHookTestImpl()); + RxJavaObservableExecutionHook impl = p.getObservableExecutionHook(); assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl); } @Test public void testObservableExecutionHookViaProperty() { try { + RxJavaPlugins p = new RxJavaPlugins(); String fullClass = getFullClassNameForTestClass(RxJavaObservableExecutionHookTestImpl.class); System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", fullClass); - RxJavaObservableExecutionHook impl = RxJavaPlugins.getInstance().getObservableExecutionHook(); + RxJavaObservableExecutionHook impl = p.getObservableExecutionHook(); assertTrue(impl instanceof RxJavaObservableExecutionHookTestImpl); } finally { System.clearProperty("rxjava.plugin.RxJavaErrorHandler.implementation"); diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index 71a9678c67..4bc8fe993b 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -306,12 +306,7 @@ public Subscription call(Scheduler scheduler, BooleanSubscription cancel) { observer.onNext(42); latch.countDown(); - try { - Thread.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } - + // this will recursively schedule this task for execution again scheduler.schedule(cancel, this); return cancel; @@ -353,7 +348,8 @@ public void onNext(Integer args) { fail("Timed out waiting on completion latch"); } - assertEquals(10, count.get()); // wondering if this could be 11 in a race condition (which would be okay due to how unsubscribe works ... just it would make this test non-deterministic) + // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count + assertTrue(count.get() >= 10); assertTrue(completed.get()); }