diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainObserver.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainObserver.java new file mode 100644 index 0000000000..f4e95e8a9d --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainObserver.java @@ -0,0 +1,150 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.DisposableHelper; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.rxjava3.internal.util.*; + +/** + * Base class for implementing concatMapX main observers. + * + * @param the upstream value type + * @since 3.0.10 + */ +public abstract class ConcatMapXMainObserver extends AtomicInteger +implements Observer, Disposable { + + private static final long serialVersionUID = -3214213361171757852L; + + final AtomicThrowable errors; + + final int prefetch; + + final ErrorMode errorMode; + + SimpleQueue queue; + + Disposable upstream; + + volatile boolean done; + + volatile boolean disposed; + + public ConcatMapXMainObserver(int prefetch, ErrorMode errorMode) { + this.errorMode = errorMode; + this.errors = new AtomicThrowable(); + this.prefetch = prefetch; + } + + @Override + public final void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + upstream = d; + if (d instanceof QueueDisposable) { + @SuppressWarnings("unchecked") + QueueDisposable qd = (QueueDisposable)d; + int mode = qd.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY); + if (mode == QueueFuseable.SYNC) { + queue = qd; + done = true; + + onSubscribeDownstream(); + + drain(); + return; + } + else if (mode == QueueFuseable.ASYNC) { + queue = qd; + + onSubscribeDownstream(); + + return; + } + } + + queue = new SpscLinkedArrayQueue<>(prefetch); + onSubscribeDownstream(); + } + } + + @Override + public final void onNext(T t) { + // In async fusion mode, t is a drain indicator + if (t != null) { + queue.offer(t); + } + drain(); + } + + @Override + public final void onError(Throwable t) { + if (errors.tryAddThrowableOrReport(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + disposeInner(); + } + done = true; + drain(); + } + } + + @Override + public final void onComplete() { + done = true; + drain(); + } + + @Override + public final void dispose() { + disposed = true; + upstream.dispose(); + disposeInner(); + errors.tryTerminateAndReport(); + if (getAndIncrement() == 0) { + queue.clear(); + clearValue(); + } + } + + @Override + public final boolean isDisposed() { + return disposed; + } + + /** + * Override this to clear values when the downstream disposes. + */ + void clearValue() { + } + + /** + * Typically, this should be {@code downstream.onSubscribe(this)}. + */ + abstract void onSubscribeDownstream(); + + /** + * Typically, this should be {@code inner.dispose()}. + */ + abstract void disposeInner(); + + /** + * Implement the serialized inner subscribing and value emission here. + */ + abstract void drain(); +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java new file mode 100644 index 0000000000..398e4fb015 --- /dev/null +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java @@ -0,0 +1,155 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.mixed; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Subscription; + +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.internal.fuseable.*; +import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.util.*; + +/** + * Base class for implementing concatMapX main subscribers. + * + * @param the upstream value type + * @since 3.0.10 + */ +public abstract class ConcatMapXMainSubscriber extends AtomicInteger +implements FlowableSubscriber { + + private static final long serialVersionUID = -3214213361171757852L; + + final AtomicThrowable errors; + + final int prefetch; + + final ErrorMode errorMode; + + SimpleQueue queue; + + Subscription upstream; + + volatile boolean done; + + volatile boolean cancelled; + + boolean syncFused; + + public ConcatMapXMainSubscriber(int prefetch, ErrorMode errorMode) { + this.errorMode = errorMode; + this.errors = new AtomicThrowable(); + this.prefetch = prefetch; + } + + @Override + public final void onSubscribe(Subscription s) { + if (SubscriptionHelper.validate(upstream, s)) { + upstream = s; + if (s instanceof QueueSubscription) { + @SuppressWarnings("unchecked") + QueueSubscription qs = (QueueSubscription)s; + int mode = qs.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY); + if (mode == QueueFuseable.SYNC) { + queue = qs; + syncFused = true; + done = true; + + onSubscribeDownstream(); + + drain(); + return; + } + else if (mode == QueueFuseable.ASYNC) { + queue = qs; + + onSubscribeDownstream(); + + upstream.request(prefetch); + return; + } + } + + queue = new SpscArrayQueue<>(prefetch); + onSubscribeDownstream(); + upstream.request(prefetch); + } + } + + @Override + public final void onNext(T t) { + // In async fusion mode, t is a drain indicator + if (t != null) { + if (!queue.offer(t)) { + upstream.cancel(); + onError(new MissingBackpressureException("queue full?!")); + return; + } + } + drain(); + } + + @Override + public final void onError(Throwable t) { + if (errors.tryAddThrowableOrReport(t)) { + if (errorMode == ErrorMode.IMMEDIATE) { + disposeInner(); + } + done = true; + drain(); + } + } + + @Override + public final void onComplete() { + done = true; + drain(); + } + + final void stop() { + cancelled = true; + upstream.cancel(); + disposeInner(); + errors.tryTerminateAndReport(); + if (getAndIncrement() == 0) { + queue.clear(); + clearValue(); + } + } + + /** + * Override this to clear values when the downstream disposes. + */ + void clearValue() { + } + + /** + * Typically, this should be {@code downstream.onSubscribe(this);}. + */ + abstract void onSubscribeDownstream(); + + /** + * Typically, this should be {@code inner.dispose()}. + */ + abstract void disposeInner(); + + /** + * Implement the serialized inner subscribing and value emission here. + */ + abstract void drain(); +} diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletable.java index e4a3f36c05..2838138a09 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletable.java @@ -14,18 +14,14 @@ package io.reactivex.rxjava3.internal.operators.mixed; import java.util.Objects; -import java.util.concurrent.atomic.*; - -import org.reactivestreams.Subscription; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; -import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; -import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.fuseable.SimpleQueue; import io.reactivex.rxjava3.internal.util.*; /** @@ -61,8 +57,8 @@ protected void subscribeActual(CompletableObserver observer) { } static final class ConcatMapCompletableObserver - extends AtomicInteger - implements FlowableSubscriber, Disposable { + extends ConcatMapXMainSubscriber + implements Disposable { private static final long serialVersionUID = 3610901111000061034L; @@ -70,93 +66,39 @@ static final class ConcatMapCompletableObserver final Function mapper; - final ErrorMode errorMode; - - final AtomicThrowable errors; - final ConcatMapInnerObserver inner; - final int prefetch; - - final SimplePlainQueue queue; - - Subscription upstream; - volatile boolean active; - volatile boolean done; - - volatile boolean disposed; - int consumed; ConcatMapCompletableObserver(CompletableObserver downstream, Function mapper, ErrorMode errorMode, int prefetch) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.errorMode = errorMode; - this.prefetch = prefetch; - this.errors = new AtomicThrowable(); this.inner = new ConcatMapInnerObserver(this); - this.queue = new SpscArrayQueue<>(prefetch); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(upstream, s)) { - this.upstream = s; - downstream.onSubscribe(this); - s.request(prefetch); - } } @Override - public void onNext(T t) { - if (queue.offer(t)) { - drain(); - } else { - upstream.cancel(); - onError(new MissingBackpressureException("Queue full?!")); - } + void onSubscribeDownstream() { + downstream.onSubscribe(this); } @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - inner.dispose(); - errors.tryTerminateConsumer(downstream); - if (getAndIncrement() == 0) { - queue.clear(); - } - } else { - done = true; - drain(); - } - } - } - - @Override - public void onComplete() { - done = true; - drain(); + void disposeInner() { + inner.dispose(); } @Override public void dispose() { - disposed = true; - upstream.cancel(); - inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - } + stop(); } @Override public boolean isDisposed() { - return disposed; + return cancelled; } void innerError(Throwable ex) { @@ -179,29 +121,45 @@ void innerComplete() { drain(); } + @Override void drain() { if (getAndIncrement() != 0) { return; } + ErrorMode errorMode = this.errorMode; + SimpleQueue queue = this.queue; + AtomicThrowable errors = this.errors; + boolean syncFused = this.syncFused; + do { - if (disposed) { + if (cancelled) { queue.clear(); return; } - if (!active) { - - if (errorMode == ErrorMode.BOUNDARY) { - if (errors.get() != null) { - queue.clear(); - errors.tryTerminateConsumer(downstream); - return; - } + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && !active)) { + queue.clear(); + errors.tryTerminateConsumer(downstream); + return; } + } + + if (!active) { boolean d = done; - T v = queue.poll(); + T v; + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + errors.tryAddThrowableOrReport(ex); + errors.tryTerminateConsumer(downstream); + return; + } boolean empty = v == null; if (d && empty) { @@ -212,12 +170,15 @@ void drain() { if (!empty) { int limit = prefetch - (prefetch >> 1); - int c = consumed + 1; - if (c == limit) { - consumed = 0; - upstream.request(limit); - } else { - consumed = c; + + if (!syncFused) { + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } } CompletableSource cs; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybe.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybe.java index 5f71148e0d..c0d151e6fd 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybe.java @@ -20,12 +20,10 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; -import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; -import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.fuseable.SimpleQueue; import io.reactivex.rxjava3.internal.util.*; /** @@ -62,8 +60,7 @@ protected void subscribeActual(Subscriber s) { } static final class ConcatMapMaybeSubscriber - extends AtomicInteger - implements FlowableSubscriber, Subscription { + extends ConcatMapXMainSubscriber implements Subscription { private static final long serialVersionUID = -9140123220065488293L; @@ -71,24 +68,10 @@ static final class ConcatMapMaybeSubscriber final Function> mapper; - final int prefetch; - final AtomicLong requested; - final AtomicThrowable errors; - final ConcatMapMaybeObserver inner; - final SimplePlainQueue queue; - - final ErrorMode errorMode; - - Subscription upstream; - - volatile boolean done; - - volatile boolean cancelled; - long emitted; int consumed; @@ -107,50 +90,16 @@ static final class ConcatMapMaybeSubscriber ConcatMapMaybeSubscriber(Subscriber downstream, Function> mapper, int prefetch, ErrorMode errorMode) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.prefetch = prefetch; - this.errorMode = errorMode; this.requested = new AtomicLong(); - this.errors = new AtomicThrowable(); this.inner = new ConcatMapMaybeObserver<>(this); - this.queue = new SpscArrayQueue<>(prefetch); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(upstream, s)) { - upstream = s; - downstream.onSubscribe(this); - s.request(prefetch); - } } @Override - public void onNext(T t) { - if (!queue.offer(t)) { - upstream.cancel(); - onError(new MissingBackpressureException("queue full?!")); - return; - } - drain(); - } - - @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - inner.dispose(); - } - done = true; - drain(); - } - } - - @Override - public void onComplete() { - done = true; - drain(); + void onSubscribeDownstream() { + downstream.onSubscribe(this); } @Override @@ -161,14 +110,7 @@ public void request(long n) { @Override public void cancel() { - cancelled = true; - upstream.cancel(); - inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - item = null; - } + stop(); } void innerSuccess(R item) { @@ -192,6 +134,17 @@ void innerError(Throwable ex) { } } + @Override + void clearValue() { + item = null; + } + + @Override + void disposeInner() { + inner.dispose(); + } + + @Override void drain() { if (getAndIncrement() != 0) { return; @@ -200,10 +153,11 @@ void drain() { int missed = 1; Subscriber downstream = this.downstream; ErrorMode errorMode = this.errorMode; - SimplePlainQueue queue = this.queue; + SimpleQueue queue = this.queue; AtomicThrowable errors = this.errors; AtomicLong requested = this.requested; int limit = prefetch - (prefetch >> 1); + boolean syncFused = this.syncFused; for (;;) { @@ -228,7 +182,16 @@ void drain() { if (s == STATE_INACTIVE) { boolean d = done; - T v = queue.poll(); + T v; + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + errors.tryAddThrowableOrReport(ex); + errors.tryTerminateConsumer(downstream); + return; + } boolean empty = v == null; if (d && empty) { @@ -240,12 +203,14 @@ void drain() { break; } - int c = consumed + 1; - if (c == limit) { - consumed = 0; - upstream.request(limit); - } else { - consumed = c; + if (!syncFused) { + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } } MaybeSource ms; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingle.java index f6cd939546..49009a722e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingle.java @@ -20,12 +20,10 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.*; +import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; -import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; -import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.fuseable.SimpleQueue; import io.reactivex.rxjava3.internal.util.*; /** @@ -62,8 +60,7 @@ protected void subscribeActual(Subscriber s) { } static final class ConcatMapSingleSubscriber - extends AtomicInteger - implements FlowableSubscriber, Subscription { + extends ConcatMapXMainSubscriber implements Subscription { private static final long serialVersionUID = -9140123220065488293L; @@ -71,24 +68,10 @@ static final class ConcatMapSingleSubscriber final Function> mapper; - final int prefetch; - final AtomicLong requested; - final AtomicThrowable errors; - final ConcatMapSingleObserver inner; - final SimplePlainQueue queue; - - final ErrorMode errorMode; - - Subscription upstream; - - volatile boolean done; - - volatile boolean cancelled; - long emitted; int consumed; @@ -107,68 +90,37 @@ static final class ConcatMapSingleSubscriber ConcatMapSingleSubscriber(Subscriber downstream, Function> mapper, int prefetch, ErrorMode errorMode) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.prefetch = prefetch; - this.errorMode = errorMode; this.requested = new AtomicLong(); - this.errors = new AtomicThrowable(); this.inner = new ConcatMapSingleObserver<>(this); - this.queue = new SpscArrayQueue<>(prefetch); } @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(upstream, s)) { - upstream = s; - downstream.onSubscribe(this); - s.request(prefetch); - } + void onSubscribeDownstream() { + downstream.onSubscribe(this); } @Override - public void onNext(T t) { - if (!queue.offer(t)) { - upstream.cancel(); - onError(new MissingBackpressureException("queue full?!")); - return; - } + public void request(long n) { + BackpressureHelper.add(requested, n); drain(); } @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - inner.dispose(); - } - done = true; - drain(); - } - } - - @Override - public void onComplete() { - done = true; - drain(); + public void cancel() { + stop(); } @Override - public void request(long n) { - BackpressureHelper.add(requested, n); - drain(); + void clearValue() { + item = null; } @Override - public void cancel() { - cancelled = true; - upstream.cancel(); + void disposeInner() { inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - item = null; - } } void innerSuccess(R item) { @@ -187,6 +139,7 @@ void innerError(Throwable ex) { } } + @Override void drain() { if (getAndIncrement() != 0) { return; @@ -195,10 +148,11 @@ void drain() { int missed = 1; Subscriber downstream = this.downstream; ErrorMode errorMode = this.errorMode; - SimplePlainQueue queue = this.queue; + SimpleQueue queue = this.queue; AtomicThrowable errors = this.errors; AtomicLong requested = this.requested; int limit = prefetch - (prefetch >> 1); + boolean syncFused = this.syncFused; for (;;) { @@ -223,7 +177,16 @@ void drain() { if (s == STATE_INACTIVE) { boolean d = done; - T v = queue.poll(); + T v; + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.cancel(); + errors.tryAddThrowableOrReport(ex); + errors.tryTerminateConsumer(downstream); + return; + } boolean empty = v == null; if (d && empty) { @@ -235,12 +198,14 @@ void drain() { break; } - int c = consumed + 1; - if (c == limit) { - consumed = 0; - upstream.request(limit); - } else { - consumed = c; + if (!syncFused) { + int c = consumed + 1; + if (c == limit) { + consumed = 0; + upstream.request(limit); + } else { + consumed = c; + } } SingleSource ss; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletable.java index 2295f4397e..49fcbcff83 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletable.java @@ -14,15 +14,14 @@ package io.reactivex.rxjava3.internal.operators.mixed; import java.util.Objects; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.*; -import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.rxjava3.internal.fuseable.SimpleQueue; import io.reactivex.rxjava3.internal.util.*; /** @@ -60,8 +59,7 @@ protected void subscribeActual(CompletableObserver observer) { } static final class ConcatMapCompletableObserver - extends AtomicInteger - implements Observer, Disposable { + extends ConcatMapXMainObserver { private static final long serialVersionUID = 3610901111000061034L; @@ -69,122 +67,36 @@ static final class ConcatMapCompletableObserver final Function mapper; - final ErrorMode errorMode; - - final AtomicThrowable errors; - final ConcatMapInnerObserver inner; - final int prefetch; - - SimpleQueue queue; - - Disposable upstream; - volatile boolean active; - volatile boolean done; - - volatile boolean disposed; - ConcatMapCompletableObserver(CompletableObserver downstream, Function mapper, ErrorMode errorMode, int prefetch) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.errorMode = errorMode; - this.prefetch = prefetch; - this.errors = new AtomicThrowable(); this.inner = new ConcatMapInnerObserver(this); } @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(upstream, d)) { - this.upstream = d; - if (d instanceof QueueDisposable) { - @SuppressWarnings("unchecked") - QueueDisposable qd = (QueueDisposable) d; - - int m = qd.requestFusion(QueueDisposable.ANY); - if (m == QueueDisposable.SYNC) { - queue = qd; - done = true; - downstream.onSubscribe(this); - drain(); - return; - } - if (m == QueueDisposable.ASYNC) { - queue = qd; - downstream.onSubscribe(this); - return; - } - } - queue = new SpscLinkedArrayQueue<>(prefetch); - downstream.onSubscribe(this); - } - } - - @Override - public void onNext(T t) { - if (t != null) { - queue.offer(t); - } - drain(); - } - - @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - disposed = true; - inner.dispose(); - errors.tryTerminateConsumer(downstream); - if (getAndIncrement() == 0) { - queue.clear(); - } - } else { - done = true; - drain(); - } - } - } - - @Override - public void onComplete() { - done = true; - drain(); + void onSubscribeDownstream() { + downstream.onSubscribe(this); } @Override - public void dispose() { - disposed = true; - upstream.dispose(); + void disposeInner() { inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - } - } - - @Override - public boolean isDisposed() { - return disposed; } void innerError(Throwable ex) { if (errors.tryAddThrowableOrReport(ex)) { - if (errorMode == ErrorMode.IMMEDIATE) { - disposed = true; + if (errorMode != ErrorMode.END) { upstream.dispose(); - errors.tryTerminateConsumer(downstream); - if (getAndIncrement() == 0) { - queue.clear(); - } - } else { - active = false; - drain(); } + active = false; + drain(); } } @@ -193,6 +105,7 @@ void innerComplete() { drain(); } + @Override void drain() { if (getAndIncrement() != 0) { return; @@ -200,6 +113,7 @@ void drain() { AtomicThrowable errors = this.errors; ErrorMode errorMode = this.errorMode; + SimpleQueue queue = this.queue; do { if (disposed) { @@ -207,16 +121,17 @@ void drain() { return; } - if (!active) { - - if (errorMode == ErrorMode.BOUNDARY) { - if (errors.get() != null) { - disposed = true; - queue.clear(); - errors.tryTerminateConsumer(downstream); - return; - } + if (errors.get() != null) { + if (errorMode == ErrorMode.IMMEDIATE + || (errorMode == ErrorMode.BOUNDARY && !active)) { + disposed = true; + queue.clear(); + errors.tryTerminateConsumer(downstream); + return; } + } + + if (!active) { boolean d = done; boolean empty = true; diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybe.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybe.java index 1177d5a06e..1e7090ee7d 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybe.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybe.java @@ -14,15 +14,14 @@ package io.reactivex.rxjava3.internal.operators.mixed; import java.util.Objects; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; -import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.rxjava3.internal.fuseable.*; import io.reactivex.rxjava3.internal.util.*; /** @@ -61,8 +60,7 @@ protected void subscribeActual(Observer observer) { } static final class ConcatMapMaybeMainObserver - extends AtomicInteger - implements Observer, Disposable { + extends ConcatMapXMainObserver { private static final long serialVersionUID = -9140123220065488293L; @@ -70,20 +68,8 @@ static final class ConcatMapMaybeMainObserver final Function> mapper; - final AtomicThrowable errors; - final ConcatMapMaybeObserver inner; - final SimplePlainQueue queue; - - final ErrorMode errorMode; - - Disposable upstream; - - volatile boolean done; - - volatile boolean cancelled; - R item; volatile int state; @@ -98,60 +84,20 @@ static final class ConcatMapMaybeMainObserver ConcatMapMaybeMainObserver(Observer downstream, Function> mapper, int prefetch, ErrorMode errorMode) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.errorMode = errorMode; - this.errors = new AtomicThrowable(); this.inner = new ConcatMapMaybeObserver<>(this); - this.queue = new SpscLinkedArrayQueue<>(prefetch); - } - - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(upstream, d)) { - upstream = d; - downstream.onSubscribe(this); - } - } - - @Override - public void onNext(T t) { - queue.offer(t); - drain(); - } - - @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - inner.dispose(); - } - done = true; - drain(); - } } @Override - public void onComplete() { - done = true; - drain(); - } - - @Override - public void dispose() { - cancelled = true; - upstream.dispose(); - inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - item = null; - } + void onSubscribeDownstream() { + downstream.onSubscribe(this); } @Override - public boolean isDisposed() { - return cancelled; + void clearValue() { + item = null; } void innerSuccess(R item) { @@ -175,6 +121,12 @@ void innerError(Throwable ex) { } } + @Override + void disposeInner() { + inner.dispose(); + } + + @Override void drain() { if (getAndIncrement() != 0) { return; @@ -183,13 +135,13 @@ void drain() { int missed = 1; Observer downstream = this.downstream; ErrorMode errorMode = this.errorMode; - SimplePlainQueue queue = this.queue; + SimpleQueue queue = this.queue; AtomicThrowable errors = this.errors; for (;;) { for (;;) { - if (cancelled) { + if (disposed) { queue.clear(); item = null; break; @@ -209,7 +161,18 @@ void drain() { if (s == STATE_INACTIVE) { boolean d = done; - T v = queue.poll(); + T v; + + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + disposed = true; + upstream.dispose(); + errors.tryAddThrowableOrReport(ex); + errors.tryTerminateConsumer(downstream); + return; + } boolean empty = v == null; if (d && empty) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java index 70d913cd42..3ad2b53cbc 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java @@ -14,15 +14,14 @@ package io.reactivex.rxjava3.internal.operators.mixed; import java.util.Objects; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicReference; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.Exceptions; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.disposables.DisposableHelper; -import io.reactivex.rxjava3.internal.fuseable.SimplePlainQueue; -import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.rxjava3.internal.fuseable.SimpleQueue; import io.reactivex.rxjava3.internal.util.*; /** @@ -61,8 +60,7 @@ protected void subscribeActual(Observer observer) { } static final class ConcatMapSingleMainObserver - extends AtomicInteger - implements Observer, Disposable { + extends ConcatMapXMainObserver { private static final long serialVersionUID = -9140123220065488293L; @@ -70,20 +68,8 @@ static final class ConcatMapSingleMainObserver final Function> mapper; - final AtomicThrowable errors; - final ConcatMapSingleObserver inner; - final SimplePlainQueue queue; - - final ErrorMode errorMode; - - Disposable upstream; - - volatile boolean done; - - volatile boolean cancelled; - R item; volatile int state; @@ -98,78 +84,44 @@ static final class ConcatMapSingleMainObserver ConcatMapSingleMainObserver(Observer downstream, Function> mapper, int prefetch, ErrorMode errorMode) { + super(prefetch, errorMode); this.downstream = downstream; this.mapper = mapper; - this.errorMode = errorMode; - this.errors = new AtomicThrowable(); this.inner = new ConcatMapSingleObserver<>(this); - this.queue = new SpscLinkedArrayQueue<>(prefetch); } - @Override - public void onSubscribe(Disposable d) { - if (DisposableHelper.validate(upstream, d)) { - upstream = d; - downstream.onSubscribe(this); - } - } - - @Override - public void onNext(T t) { - queue.offer(t); + void innerSuccess(R item) { + this.item = item; + this.state = STATE_RESULT_VALUE; drain(); } - @Override - public void onError(Throwable t) { - if (errors.tryAddThrowableOrReport(t)) { - if (errorMode == ErrorMode.IMMEDIATE) { - inner.dispose(); + void innerError(Throwable ex) { + if (errors.tryAddThrowableOrReport(ex)) { + if (errorMode != ErrorMode.END) { + upstream.dispose(); } - done = true; + this.state = STATE_INACTIVE; drain(); } } @Override - public void onComplete() { - done = true; - drain(); - } - - @Override - public void dispose() { - cancelled = true; - upstream.dispose(); + void disposeInner() { inner.dispose(); - errors.tryTerminateAndReport(); - if (getAndIncrement() == 0) { - queue.clear(); - item = null; - } } @Override - public boolean isDisposed() { - return cancelled; + void onSubscribeDownstream() { + downstream.onSubscribe(this); } - void innerSuccess(R item) { - this.item = item; - this.state = STATE_RESULT_VALUE; - drain(); - } - - void innerError(Throwable ex) { - if (errors.tryAddThrowableOrReport(ex)) { - if (errorMode != ErrorMode.END) { - upstream.dispose(); - } - this.state = STATE_INACTIVE; - drain(); - } + @Override + void clearValue() { + item = null; } + @Override void drain() { if (getAndIncrement() != 0) { return; @@ -178,13 +130,13 @@ void drain() { int missed = 1; Observer downstream = this.downstream; ErrorMode errorMode = this.errorMode; - SimplePlainQueue queue = this.queue; + SimpleQueue queue = this.queue; AtomicThrowable errors = this.errors; for (;;) { for (;;) { - if (cancelled) { + if (disposed) { queue.clear(); item = null; break; @@ -204,7 +156,18 @@ void drain() { if (s == STATE_INACTIVE) { boolean d = done; - T v = queue.poll(); + T v; + + try { + v = queue.poll(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + disposed = true; + upstream.dispose(); + errors.tryAddThrowableOrReport(ex); + errors.tryTerminateConsumer(downstream); + return; + } boolean empty = v == null; if (d && empty) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/subscriptions/DeferredScalarSubscription.java b/src/main/java/io/reactivex/rxjava3/internal/subscriptions/DeferredScalarSubscription.java index b50b4e13f9..155f9ebb48 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/subscriptions/DeferredScalarSubscription.java +++ b/src/main/java/io/reactivex/rxjava3/internal/subscriptions/DeferredScalarSubscription.java @@ -115,7 +115,7 @@ public final void complete(T v) { lazySet(FUSED_READY); Subscriber a = downstream; - a.onNext(v); + a.onNext(null); if (get() != CANCELLED) { a.onComplete(); } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletableTest.java index 28c90619bb..6a67cb9fee 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletableTest.java @@ -28,7 +28,7 @@ import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava3.observers.TestObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.processors.*; import io.reactivex.rxjava3.subjects.CompletableSubject; import io.reactivex.rxjava3.testsupport.*; @@ -66,6 +66,14 @@ public void simpleLongPrefetch() { .assertResult(); } + @Test + public void simpleLongPrefetchHidden() { + Flowable.range(1, 1024).hide() + .concatMapCompletable(Functions.justFunction(Completable.complete()), 32) + .test() + .assertResult(); + } + @Test public void mainError() { Flowable.error(new TestException()) @@ -431,4 +439,54 @@ public Completable apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Flowable.range(1, 5).hide() + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicSyncFused() { + Flowable.range(1, 5) + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicAsyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectFlowableFusion() + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Flowable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.flowableStripBoundary()) + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java index 8a60f6c367..595740a452 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapMaybeTest.java @@ -19,11 +19,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import io.reactivex.rxjava3.disposables.Disposable; import org.junit.Test; import org.reactivestreams.Subscriber; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.functions.Function; import io.reactivex.rxjava3.internal.functions.Functions; @@ -31,7 +31,7 @@ import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.processors.*; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.MaybeSubject; import io.reactivex.rxjava3.subscribers.TestSubscriber; @@ -54,15 +54,19 @@ public MaybeSource apply(Integer v) } @Test - public void simpleLong() { + public void simpleLongPrefetch() { Flowable.range(1, 1024) - .concatMapMaybe(new Function>() { - @Override - public MaybeSource apply(Integer v) - throws Exception { - return Maybe.just(v); - } - }, 32) + .concatMapMaybe(Maybe::just, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void simpleLongPrefetchHidden() { + Flowable.range(1, 1024).hide() + .concatMapMaybe(Maybe::just, 32) .test() .assertValueCount(1024) .assertNoErrors() @@ -464,4 +468,54 @@ public Maybe apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Flowable.range(1, 5).hide() + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicSyncFused() { + Flowable.range(1, 5) + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicAsyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectFlowableFusion() + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Flowable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.flowableStripBoundary()) + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java index a2e5d73c09..73426eda72 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java @@ -30,7 +30,7 @@ import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; import io.reactivex.rxjava3.internal.util.ErrorMode; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.processors.*; import io.reactivex.rxjava3.subjects.SingleSubject; import io.reactivex.rxjava3.subscribers.TestSubscriber; import io.reactivex.rxjava3.testsupport.*; @@ -52,15 +52,19 @@ public SingleSource apply(Integer v) } @Test - public void simpleLong() { + public void simpleLongPrefetch() { Flowable.range(1, 1024) - .concatMapSingle(new Function>() { - @Override - public SingleSource apply(Integer v) - throws Exception { - return Single.just(v); - } - }, 32) + .concatMapSingle(Single::just, 32) + .test() + .assertValueCount(1024) + .assertNoErrors() + .assertComplete(); + } + + @Test + public void simpleLongPrefetchHidden() { + Flowable.range(1, 1024).hide() + .concatMapSingle(Single::just, 32) .test() .assertValueCount(1024) .assertNoErrors() @@ -382,4 +386,54 @@ public Single apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Flowable.range(1, 5).hide() + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicSyncFused() { + Flowable.range(1, 5) + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicAsyncFused() { + UnicastProcessor up = UnicastProcessor.create(); + TestHelper.emit(up, 1, 2, 3, 4, 5); + + up + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectFlowableFusion() + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Flowable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.flowableStripBoundary()) + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletableTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletableTest.java index 540e0828af..ae0ce25d91 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapCompletableTest.java @@ -474,4 +474,54 @@ public Completable apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Observable.range(1, 5).hide() + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicSyncFused() { + Observable.range(1, 5) + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicAsyncFused() { + UnicastSubject us = UnicastSubject.create(); + TestHelper.emit(us, 1, 2, 3, 4, 5); + + us + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertResult(); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectObservableFusion() + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Observable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.observableStripBoundary()) + .concatMapCompletable(v -> Completable.complete().hide()) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybeTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybeTest.java index 323cbc3270..120e027538 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybeTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapMaybeTest.java @@ -485,4 +485,54 @@ public Maybe apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Observable.range(1, 5).hide() + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicSyncFused() { + Observable.range(1, 5) + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicAsyncFused() { + UnicastSubject us = UnicastSubject.create(); + TestHelper.emit(us, 1, 2, 3, 4, 5); + + us + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectObservableFusion() + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Observable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.observableStripBoundary()) + .concatMapMaybe(v -> Maybe.just(v).hide()) + .test() + .assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingleTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingleTest.java index 90f15ac250..76bf708e55 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingleTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingleTest.java @@ -425,4 +425,54 @@ public Single apply(Integer v) throws Throwable { } }); } + + @Test + public void basicNonFused() { + Observable.range(1, 5).hide() + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicSyncFused() { + Observable.range(1, 5) + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicAsyncFused() { + UnicastSubject us = UnicastSubject.create(); + TestHelper.emit(us, 1, 2, 3, 4, 5); + + us + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertResult(1, 2, 3, 4, 5); + } + + @Test + public void basicFusionRejected() { + TestHelper.rejectObservableFusion() + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertEmpty(); + } + + @Test + public void fusedPollCrash() { + Observable.range(1, 5) + .map(v -> { + if (v == 3) { + throw new TestException(); + } + return v; + }) + .compose(TestHelper.observableStripBoundary()) + .concatMapSingle(v -> Single.just(v).hide()) + .test() + .assertFailure(TestException.class, 1, 2); + } } diff --git a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java index 1963a38459..9a19f84a7a 100644 --- a/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java +++ b/src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java @@ -3018,6 +3018,12 @@ public static void checkInvalidParallelSubscribers(ParallelFlowable sourc } } + /** + * Creates a fuseable Observable that does not emit anything but rejects + * fusion requests. + * @param the element type + * @return the new Observable + */ public static Observable rejectObservableFusion() { return new Observable() { @Override @@ -3066,6 +3072,12 @@ public boolean isDisposed() { }; } + /** + * Creates a fuseable Flowable that does not emit anything but rejects + * fusion requests. + * @param the element type + * @return the new Observable + */ public static Flowable rejectFlowableFusion() { return new Flowable() { @Override