From 7526eb73c8374bb98413d3c1d4db9e45391583a8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 4 Jan 2014 00:32:06 +0800 Subject: [PATCH] Added unit tests --- .../java/rx/operators/OperationMergeTest.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/rxjava-core/src/test/java/rx/operators/OperationMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperationMergeTest.java index 4778d8038a..becf17d803 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMergeTest.java @@ -21,6 +21,8 @@ import static rx.operators.OperationMerge.*; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -35,6 +37,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -465,4 +468,85 @@ public void unsubscribe() { }; } } + + @Test + public void testWhenMaxConcurrentIsOne() { + for (int i = 0; i < 100; i++) { + List> os = new ArrayList>(); + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); + os.add(Observable.from("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread())); + + List expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five"); + Iterator iter = Observable.merge(os, 1).toBlockingObservable().toIterable().iterator(); + List actual = new ArrayList(); + while(iter.hasNext()) { + actual.add(iter.next()); + } + assertEquals(expected, actual); + } + } + + @Test + public void testMaxConcurrent() { + for (int times = 0; times < 100; times++) { + int observableCount = 100; + // Test maxConcurrent from 2 to 12 + int maxConcurrent = 2 + (times % 10); + AtomicInteger subscriptionCount = new AtomicInteger(0); + + List> os = new ArrayList>(); + List scos = new ArrayList(); + for (int i = 0; i < observableCount; i++) { + SubscriptionCheckObservable sco = new SubscriptionCheckObservable( + subscriptionCount, maxConcurrent); + scos.add(sco); + os.add(Observable.create(sco).subscribeOn( + Schedulers.threadPoolForComputation())); + } + + Iterator iter = Observable.merge(os, maxConcurrent) + .toBlockingObservable().toIterable().iterator(); + List actual = new ArrayList(); + while (iter.hasNext()) { + actual.add(iter.next()); + } + assertEquals(5 * observableCount, actual.size()); + for (SubscriptionCheckObservable sco : scos) { + assertFalse(sco.failed); + } + } + } + + private static class SubscriptionCheckObservable implements + Observable.OnSubscribeFunc { + + private final AtomicInteger subscriptionCount; + private final int maxConcurrent; + volatile boolean failed = false; + + SubscriptionCheckObservable(AtomicInteger subscriptionCount, + int maxConcurrent) { + this.subscriptionCount = subscriptionCount; + this.maxConcurrent = maxConcurrent; + } + + @Override + public Subscription onSubscribe(Observer t1) { + if (subscriptionCount.incrementAndGet() > maxConcurrent) { + failed = true; + } + t1.onNext("one"); + t1.onNext("two"); + t1.onNext("three"); + t1.onNext("four"); + t1.onNext("five"); + // We could not decrement subscriptionCount in the unsubscribe method + // as "unsubscribe" is not guaranteed to be called before the next "subscribe". + subscriptionCount.decrementAndGet(); + t1.onCompleted(); + return Subscriptions.empty(); + } + + } }