diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java index 2a8a9f66fd..460279af7f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java @@ -411,7 +411,10 @@ public void clear() { @Override public boolean isEmpty() { Iterator it = current; - return (it != null && !it.hasNext()) || queue.isEmpty(); + if (it == null) { + return queue.isEmpty(); + } + return !it.hasNext(); } @Nullable diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java index 51de692dff..a865475acf 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java @@ -863,4 +863,55 @@ public void remove() { ts.assertResult(1); } + + @Test + public void doubleShare() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + Flowable.just(it, it) + .flatMapIterable(Functions.>identity()) + .share() + .share() + .count() + .test() + .assertResult(600L); + } + + @Test + public void multiShare() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + for (int i = 0; i < 5; i++) { + Flowable f = Flowable.just(it, it) + .flatMapIterable(Functions.>identity()); + + for (int j = 0; j < i; j++) { + f = f.share(); + } + + f + .count() + .test() + .withTag("Share: " + i) + .assertResult(600L); + } + } + + @Test + public void multiShareHidden() { + Iterable it = Flowable.range(1, 300).blockingIterable(); + for (int i = 0; i < 5; i++) { + Flowable f = Flowable.just(it, it) + .flatMapIterable(Functions.>identity()) + .hide(); + + for (int j = 0; j < i; j++) { + f = f.share(); + } + + f + .count() + .test() + .withTag("Share: " + i) + .assertResult(600L); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java index 8c97ae5475..2bb37c2f18 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java @@ -395,7 +395,7 @@ public void onNextCancelRace() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestObserver> ts = pp.toList().test(); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -408,10 +408,9 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); } - } @Test @@ -419,7 +418,7 @@ public void onNextCancelRaceFlowable() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestSubscriber> ts = pp.toList().toFlowable().test(); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -432,10 +431,10 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); } - + } @Test @@ -443,9 +442,9 @@ public void onCompleteCancelRaceFlowable() { for (int i = 0; i < 1000; i++) { final PublishProcessor pp = PublishProcessor.create(); final TestSubscriber> ts = pp.toList().toFlowable().test(); - + pp.onNext(1); - + Runnable r1 = new Runnable() { @Override public void run() { @@ -458,14 +457,13 @@ public void run() { ts.cancel(); } }; - + TestHelper.race(r1, r2); - + if (ts.valueCount() != 0) { ts.assertValue(Arrays.asList(1)) .assertNoErrors(); } } - } }