Skip to content

Commit

Permalink
Merge pull request #1191 from akarnokd/OperatorPivotTestFix
Browse files Browse the repository at this point in the history
Fix attempt for OperatorPivotTest
  • Loading branch information
benjchristensen committed May 16, 2014
2 parents 9b205ee + d06e420 commit e5b75eb
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions rxjava-core/src/test/java/rx/operators/OperatorPivotTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package rx.operators;

import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -239,12 +240,16 @@ public void testConcurrencyAndSerialization() throws InterruptedException {
@Override
public Observable<String> call(final GroupedObservable<Boolean, GroupedObservable<String, Integer>> outerGroup) {
return outerGroup.flatMap(new Func1<GroupedObservable<String, Integer>, Observable<String>>() {

@Override
public Observable<String> call(final GroupedObservable<String, Integer> innerGroup) {
final AtomicInteger threadsPerGroup = new AtomicInteger();
return innerGroup.take(100).map(new Func1<Integer, String>() {

final ThreadLocal<Random> tlr = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random();
}
};
@Override
public String call(Integer i) {
int outerThreadCount = outerThreads.incrementAndGet();
Expand All @@ -256,7 +261,11 @@ public String call(Integer i) {
throw new RuntimeException("more than 1 thread for this group [" + innerGroup.getKey() + "]: " + innerThreadCount + " (before)");
}
try {
// give the other threads a shot.
Thread.sleep(tlr.get().nextInt(10) + 1);
return (outerGroup.getKey() ? "Even" : "Odd ") + " => from source: " + innerGroup.getKey() + " Value: " + i;
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted [" + innerGroup.getKey() + "]: " + i);
} finally {
int outerThreadCountAfter = outerThreads.decrementAndGet();
setMaxConcurrency(maxOuterConcurrency, outerThreadCountAfter);
Expand Down

0 comments on commit e5b75eb

Please sign in to comment.