diff --git a/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java b/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java index 6293974272..c2432d7787 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java @@ -16,6 +16,7 @@ package rx.operators; +import java.util.Random; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -239,12 +240,16 @@ public void testConcurrencyAndSerialization() throws InterruptedException { @Override public Observable call(final GroupedObservable> outerGroup) { return outerGroup.flatMap(new Func1, Observable>() { - @Override public Observable call(final GroupedObservable innerGroup) { final AtomicInteger threadsPerGroup = new AtomicInteger(); return innerGroup.take(100).map(new Func1() { - + final ThreadLocal tlr = new ThreadLocal() { + @Override + protected Random initialValue() { + return new Random(); + } + }; @Override public String call(Integer i) { int outerThreadCount = outerThreads.incrementAndGet(); @@ -256,7 +261,11 @@ public String call(Integer i) { throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)"); } try { + // give the other threads a shot. + Thread.sleep(tlr.get().nextInt(10) + 1); return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i; + } catch (InterruptedException ex) { + throw new RuntimeException("Interrupted [" + innerGroup.getKey() + "]: " + i); } finally { int outerThreadCountAfter = outerThreads.decrementAndGet(); setMaxConcurrency(maxOuterConcurrency, outerThreadCountAfter);