From 5c57ae53049d9c6bd208aeee0b3cfba2658e1ac0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 18 Sep 2017 18:36:39 +0200 Subject: [PATCH] 2.x: Fix Observable.concatMapEager queueing of source items --- .../observable/ObservableConcatMapEager.java | 3 +- .../flowable/FlowableConcatMapEagerTest.java | 35 +++++++++++++++++++ .../ObservableConcatMapEagerTest.java | 35 +++++++++++++++++++ 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java index 0fc83214e1..b19ca9bbdf 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java @@ -24,6 +24,7 @@ import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.observers.*; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; @@ -129,7 +130,7 @@ public void onSubscribe(Disposable d) { } } - queue = QueueDrainHelper.createQueue(prefetch); + queue = new SpscLinkedArrayQueue(prefetch); actual.onSubscribe(this); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index d4e0937350..ee4e982bf7 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -1193,4 +1193,39 @@ public Flowable apply(Integer i) throws Exception { .assertResult(1, 2, 3, 4, 5) ; } + + @Test + @SuppressWarnings("unchecked") + public void maxConcurrencyOf2() { + List[] list = new ArrayList[100]; + for (int i = 0; i < 100; i++) { + List lst = new ArrayList(); + list[i] = lst; + for (int k = 1; k <= 10; k++) { + lst.add((i) * 10 + k); + } + } + + Flowable.range(1, 1000) + .buffer(10) + .concatMapEager(new Function, Flowable>>() { + @Override + public Flowable> apply(List v) + throws Exception { + return Flowable.just(v) + .subscribeOn(Schedulers.io()) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) + throws Exception { + Thread.sleep(new Random().nextInt(20)); + } + }); + } + } + , 2, 3) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(list); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java index 1e1b814865..9823436941 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java @@ -1001,4 +1001,39 @@ public ObservableSource apply(Integer i) throws Exception { .assertResult(1, 2, 3, 4, 5) ; } + + @Test + @SuppressWarnings("unchecked") + public void maxConcurrencyOf2() { + List[] list = new ArrayList[100]; + for (int i = 0; i < 100; i++) { + List lst = new ArrayList(); + list[i] = lst; + for (int k = 1; k <= 10; k++) { + lst.add((i) * 10 + k); + } + } + + Observable.range(1, 1000) + .buffer(10) + .concatMapEager(new Function, ObservableSource>>() { + @Override + public ObservableSource> apply(List v) + throws Exception { + return Observable.just(v) + .subscribeOn(Schedulers.io()) + .doOnNext(new Consumer>() { + @Override + public void accept(List v) + throws Exception { + Thread.sleep(new Random().nextInt(20)); + } + }); + } + } + , 2, 3) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(list); + } }