Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests and Javadoc for Pivot #984

Merged
merged 2 commits into from
Mar 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2489,7 +2489,25 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
}

/**
* Pivot GroupedObservable streams without serializing/synchronizing to a single stream first.
* Pivot GroupedObservable streams without serializing/synchronizing to a single stream first.
*
* For example an Observable such as this =>
*
* Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>:
*
* o1.odd: 1, 3, 5, 7, 9 on Thread 1
* o1.even: 2, 4, 6, 8, 10 on Thread 1
* o2.odd: 11, 13, 15, 17, 19 on Thread 2
* o2.even: 12, 14, 16, 18, 20 on Thread 2
*
* is pivoted to become this =>
*
* Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>:
*
* odd.o1: 1, 3, 5, 7, 9 on Thread 1
* odd.o2: 11, 13, 15, 17, 19 on Thread 2
* even.o1: 2, 4, 6, 8, 10 on Thread 1
* even.o2: 12, 14, 16, 18, 20 on Thread 2
*
* @param groups
* @return
Expand Down
136 changes: 135 additions & 1 deletion rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Observable<String> call(final GroupedObservable<String, Integer> innerGro

@Override
public String call(Integer i) {
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i;
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + outerGroup.getKey() + "." + innerGroup.getKey() + " Value: " + i + " Thread: " + Thread.currentThread();
}

});
Expand Down Expand Up @@ -166,6 +166,140 @@ public String call(Integer i) {
assertTrue(counter2.get() < 50000); // should be much smaller (< 1000) but this will be non-deterministic
}

/**
* The pivot operator does not need to add any serialization but this is confirming the expected behavior.
*
* It does not need serializing as it never merges groups, it just re-arranges them.
*
* For example, a simple 2-stream case with odd/even:
*
* Observable<GroupedObservable<Boolean, Integer>> o1 = Observable.range(1, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 1
* Observable<GroupedObservable<Boolean, Integer>> o2 = Observable.range(11, 10).groupBy(modKeySelector).subscribeOn(Schedulers.newThread()); // thread 2
* Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2));
* Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
*
* ============> OnNext: Odd => from source: false.o1 Value: 1 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o2 Value: 12 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Even => from source: true.o1 Value: 2 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Odd => from source: false.o2 Value: 11 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Odd => from source: false.o2 Value: 13 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Odd => from source: false.o2 Value: 15 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Odd => from source: false.o2 Value: 17 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Even => from source: true.o2 Value: 14 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Odd => from source: false.o1 Value: 3 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Odd => from source: false.o1 Value: 5 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Odd => from source: false.o1 Value: 7 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Odd => from source: false.o1 Value: 9 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o2 Value: 16 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Even => from source: true.o2 Value: 18 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Odd => from source: false.o2 Value: 19 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnNext: Even => from source: true.o1 Value: 4 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o1 Value: 6 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o1 Value: 8 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o1 Value: 10 Thread: Thread[RxNewThreadScheduler-1,5,main]
* ============> OnNext: Even => from source: true.o2 Value: 20 Thread: Thread[RxNewThreadScheduler-2,5,main]
* ============> OnCompleted
*
* This starts as:
*
* => Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>:
*
* o1.odd: 1, 3, 5, 7, 9 on Thread 1
* o1.even: 2, 4, 6, 8, 10 on Thread 1
* o2.odd: 11, 13, 15, 17, 19 on Thread 2
* o2.even: 12, 14, 16, 18, 20 on Thread 2
*
* It pivots to become:
*
* => Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>:
*
* odd.o1: 1, 3, 5, 7, 9 on Thread 1
* odd.o2: 11, 13, 15, 17, 19 on Thread 2
* even.o1: 2, 4, 6, 8, 10 on Thread 1
* even.o2: 12, 14, 16, 18, 20 on Thread 2
*
* Then a subsequent step can merge them if desired and add serialization, such as merge(even.o1, even.o2) to become a serialized "even"
*/
@Test
public void testConcurrencyAndSerialization() throws InterruptedException {
final AtomicInteger maxOuterConcurrency = new AtomicInteger();
final AtomicInteger maxGroupConcurrency = new AtomicInteger();
Observable<GroupedObservable<Boolean, Integer>> o1 = getSource(2000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
Observable<GroupedObservable<Boolean, Integer>> o2 = getSource(4000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
Observable<GroupedObservable<Boolean, Integer>> o3 = getSource(6000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
Observable<GroupedObservable<Boolean, Integer>> o4 = getSource(8000).subscribeOn(Schedulers.newThread()).groupBy(modKeySelector);
Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>> groups = Observable.from(GroupedObservable.from("o1", o1), GroupedObservable.from("o2", o2),
GroupedObservable.from("o3", o3), GroupedObservable.from("o4", o4));
Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>> pivoted = Observable.pivot(groups);
TestSubscriber<String> ts = new TestSubscriber<String>();
pivoted.take(2).flatMap(new Func1<GroupedObservable<Boolean, GroupedObservable<String, Integer>>, Observable<String>>() {

final AtomicInteger outerThreads = new AtomicInteger();

@Override
public Observable<String> call(final GroupedObservable<Boolean, GroupedObservable<String, Integer>> outerGroup) {
return outerGroup.flatMap(new Func1<GroupedObservable<String, Integer>, Observable<String>>() {

@Override
public Observable<String> call(final GroupedObservable<String, Integer> innerGroup) {
final AtomicInteger threadsPerGroup = new AtomicInteger();
return innerGroup.take(100).map(new Func1<Integer, String>() {

@Override
public String call(Integer i) {
int outerThreadCount = outerThreads.incrementAndGet();
setMaxConcurrency(maxOuterConcurrency, outerThreadCount);
int innerThreadCount = threadsPerGroup.incrementAndGet();
setMaxConcurrency(maxGroupConcurrency, innerThreadCount);
if (innerThreadCount > 1) {
System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)");
throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)");
}
try {
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i;
} finally {
int outerThreadCountAfter = outerThreads.decrementAndGet();
setMaxConcurrency(maxOuterConcurrency, outerThreadCountAfter);
int innerThreadCountAfter = threadsPerGroup.decrementAndGet();
setMaxConcurrency(maxGroupConcurrency, innerThreadCountAfter);
if (innerThreadCountAfter > 0) {
System.err.println("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (after)");
throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCountAfter + " (after)");
}
}
}

private void setMaxConcurrency(final AtomicInteger maxOuterConcurrency, int outerThreadCount) {
int max = maxOuterConcurrency.get();
if (outerThreadCount > max) {
maxOuterConcurrency.compareAndSet(max, outerThreadCount);
}
}

});
}

});
}

}).subscribe(ts);

ts.awaitTerminalEvent();

System.out.println("onNext [" + ts.getOnNextEvents().size() + "]: " + ts.getOnNextEvents());
System.out.println("max outer concurrency: " + maxOuterConcurrency.get());
assertTrue(maxOuterConcurrency.get() > 2); // should be 4 since we have 4 threads running but setting at 3 as this is somewhat non-deterministic
System.out.println("max group concurrency: " + maxGroupConcurrency.get());
assertTrue(maxGroupConcurrency.get() == 1); // should always be 1

assertEquals(800, ts.getOnNextEvents().size());

}

private static Observable<Integer> getSource(final int start) {
return getSource(start, new AtomicInteger());
}

private static Observable<Integer> getSource(final int start, final AtomicInteger counter) {
return Observable.create(new OnSubscribe<Integer>() {

Expand Down