diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9048a888c2..172e3962bd 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2489,7 +2489,25 @@ public final static Observable> parallelMerge(Observable + * + * Observable>>: + * + * o1.odd: 1, 3, 5, 7, 9 on Thread 1 + * o1.even: 2, 4, 6, 8, 10 on Thread 1 + * o2.odd: 11, 13, 15, 17, 19 on Thread 2 + * o2.even: 12, 14, 16, 18, 20 on Thread 2 + * + * is pivoted to become this => + * + * Observable>>: + * + * odd.o1: 1, 3, 5, 7, 9 on Thread 1 + * odd.o2: 11, 13, 15, 17, 19 on Thread 2 + * even.o1: 2, 4, 6, 8, 10 on Thread 1 + * even.o2: 12, 14, 16, 18, 20 on Thread 2 * * @param groups * @return diff --git a/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java b/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java index 306252db3f..85ff2a995c 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java @@ -60,7 +60,7 @@ public Observable call(final GroupedObservable innerGro @Override public String call(Integer i) { - return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i; + return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + outerGroup.getKey() + "." + innerGroup.getKey() + " Value: " + i + " Thread: " + Thread.currentThread(); } }); @@ -166,6 +166,140 @@ public String call(Integer i) { assertTrue(counter2.get() < 50000); // should be much smaller (< 1000) but this will be non-deterministic } + /** + * The pivot operator does not need to add any serialization but this is confirming the expected behavior. + * + * It does not need serializing as it never merges groups, it just re-arranges them. + * + * For example, a simple 2-stream case with odd/even: + * + * Observable> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 1 + * Observable> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 2 + * Observable>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2)); + * Observable>> pivoted = Observable.pivot(groups); + * + * ============> OnNext: Odd => from source: false.o1 Value: 1 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o2 Value: 12 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Even => from source: true.o1 Value: 2 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Odd => from source: false.o2 Value: 11 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Odd => from source: false.o2 Value: 13 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Odd => from source: false.o2 Value: 15 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Odd => from source: false.o2 Value: 17 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Even => from source: true.o2 Value: 14 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Odd => from source: false.o1 Value: 3 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Odd => from source: false.o1 Value: 5 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Odd => from source: false.o1 Value: 7 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Odd => from source: false.o1 Value: 9 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o2 Value: 16 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Even => from source: true.o2 Value: 18 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Odd => from source: false.o2 Value: 19 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnNext: Even => from source: true.o1 Value: 4 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o1 Value: 6 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o1 Value: 8 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o1 Value: 10 Thread: Thread[RxNewThreadScheduler-1,5,main] + * ============> OnNext: Even => from source: true.o2 Value: 20 Thread: Thread[RxNewThreadScheduler-2,5,main] + * ============> OnCompleted + * + * This starts as: + * + * => Observable>>: + * + * o1.odd: 1, 3, 5, 7, 9 on Thread 1 + * o1.even: 2, 4, 6, 8, 10 on Thread 1 + * o2.odd: 11, 13, 15, 17, 19 on Thread 2 + * o2.even: 12, 14, 16, 18, 20 on Thread 2 + * + * It pivots to become: + * + * => Observable>>: + * + * odd.o1: 1, 3, 5, 7, 9 on Thread 1 + * odd.o2: 11, 13, 15, 17, 19 on Thread 2 + * even.o1: 2, 4, 6, 8, 10 on Thread 1 + * even.o2: 12, 14, 16, 18, 20 on Thread 2 + * + * Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even" + */ + @Test + public void testConcurrencyAndSerialization() throws InterruptedException { + final AtomicInteger maxOuterConcurrency = new AtomicInteger(); + final AtomicInteger maxGroupConcurrency = new AtomicInteger(); + Observable> o1 = getSource(2000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector); + Observable> o2 = getSource(4000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector); + Observable> o3 = getSource(6000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector); + Observable> o4 = getSource(8000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector); + Observable>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2), + GroupedObservable.from("o3", o3), GroupedObservable.from("o4", o4)); + Observable>> pivoted = Observable.pivot(groups); + TestSubscriber ts = new TestSubscriber(); + pivoted.take(2).flatMap(new Func1>, Observable>() { + + final AtomicInteger outerThreads = new AtomicInteger(); + + @Override + public Observable call(final GroupedObservable> outerGroup) { + return outerGroup.flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable innerGroup) { + final AtomicInteger threadsPerGroup = new AtomicInteger(); + return innerGroup.take(100).map(new Func1() { + + @Override + public String call(Integer i) { + int outerThreadCount = outerThreads.incrementAndGet(); + setMaxConcurrency(maxOuterConcurrency, outerThreadCount); + int innerThreadCount = threadsPerGroup.incrementAndGet(); + setMaxConcurrency(maxGroupConcurrency, innerThreadCount); + if (innerThreadCount > 1) { + System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)"); + throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)"); + } + try { + return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i; + } finally { + int outerThreadCountAfter = outerThreads.decrementAndGet(); + setMaxConcurrency(maxOuterConcurrency, outerThreadCountAfter); + int innerThreadCountAfter = threadsPerGroup.decrementAndGet(); + setMaxConcurrency(maxGroupConcurrency, innerThreadCountAfter); + if (innerThreadCountAfter > 0) { + System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (after)"); + throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCountAfter + " (after)"); + } + } + } + + private void setMaxConcurrency(final AtomicInteger maxOuterConcurrency, int outerThreadCount) { + int max = maxOuterConcurrency.get(); + if (outerThreadCount > max) { + maxOuterConcurrency.compareAndSet(max, outerThreadCount); + } + } + + }); + } + + }); + } + + }).subscribe(ts); + + ts.awaitTerminalEvent(); + + System.out.println("onNext [" + ts.getOnNextEvents().size() + "]: " + ts.getOnNextEvents()); + System.out.println("max outer concurrency: " + maxOuterConcurrency.get()); + assertTrue(maxOuterConcurrency.get() > 2); // should be 4 since we have 4 threads running but setting at 3 as this is somewhat non-deterministic + System.out.println("max group concurrency: " + maxGroupConcurrency.get()); + assertTrue(maxGroupConcurrency.get() == 1); // should always be 1 + + assertEquals(800, ts.getOnNextEvents().size()); + + } + + private static Observable getSource(final int start) { + return getSource(start, new AtomicInteger()); + } + private static Observable getSource(final int start, final AtomicInteger counter) { return Observable.create(new OnSubscribe() {