From a008e03484177f48ec7fef4c311705bb43fd8ec9 Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 1 Aug 2018 22:20:36 +0200 Subject: [PATCH] 2.x: Improve Completable.onErrorResumeNext internals (#6123) * 2.x: Improve Completable.onErrorResumeNext internals * Use ObjectHelper --- .../completable/CompletableResumeNext.java | 76 +++++++++---------- .../completable/CompletableTest.java | 7 +- .../CompletableResumeNextTest.java | 61 +++++++++++++++ 3 files changed, 104 insertions(+), 40 deletions(-) create mode 100644 src/test/java/io/reactivex/internal/operators/completable/CompletableResumeNextTest.java diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java index 16c46aeb24..940942c943 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableResumeNext.java @@ -13,11 +13,14 @@ package io.reactivex.internal.operators.completable; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; -import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; public final class CompletableResumeNext extends Completable { @@ -35,20 +38,32 @@ public CompletableResumeNext(CompletableSource source, @Override protected void subscribeActual(final CompletableObserver observer) { - - final SequentialDisposable sd = new SequentialDisposable(); - observer.onSubscribe(sd); - source.subscribe(new ResumeNext(observer, sd)); + ResumeNextObserver parent = new ResumeNextObserver(observer, errorMapper); + observer.onSubscribe(parent); + source.subscribe(parent); } - final class ResumeNext implements CompletableObserver { + static final class ResumeNextObserver + extends AtomicReference + implements CompletableObserver, Disposable { + + private static final long serialVersionUID = 5018523762564524046L; final CompletableObserver downstream; - final SequentialDisposable sd; - ResumeNext(CompletableObserver observer, SequentialDisposable sd) { + final Function errorMapper; + + boolean once; + + ResumeNextObserver(CompletableObserver observer, Function errorMapper) { this.downstream = observer; - this.sd = sd; + this.errorMapper = errorMapper; + } + + + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.replace(this, d); } @Override @@ -58,48 +73,33 @@ public void onComplete() { @Override public void onError(Throwable e) { + if (once) { + downstream.onError(e); + return; + } + once = true; + CompletableSource c; try { - c = errorMapper.apply(e); + c = ObjectHelper.requireNonNull(errorMapper.apply(e), "The errorMapper returned a null CompletableSource"); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - downstream.onError(new CompositeException(ex, e)); + downstream.onError(new CompositeException(e, ex)); return; } - if (c == null) { - NullPointerException npe = new NullPointerException("The CompletableConsumable returned is null"); - npe.initCause(e); - downstream.onError(npe); - return; - } - - c.subscribe(new OnErrorObserver()); + c.subscribe(this); } @Override - public void onSubscribe(Disposable d) { - sd.update(d); + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); } - final class OnErrorObserver implements CompletableObserver { - - @Override - public void onComplete() { - downstream.onComplete(); - } - - @Override - public void onError(Throwable e) { - downstream.onError(e); - } - - @Override - public void onSubscribe(Disposable d) { - sd.update(d); - } - + @Override + public void dispose() { + DisposableHelper.dispose(this); } } } diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index 5dcfa1b531..dd4bdfc9e3 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2170,8 +2170,11 @@ public Completable apply(Throwable e) { try { c.blockingAwait(); Assert.fail("Did not throw an exception"); - } catch (NullPointerException ex) { - Assert.assertTrue(ex.getCause() instanceof TestException); + } catch (CompositeException ex) { + List errors = ex.getExceptions(); + TestHelper.assertError(errors, 0, TestException.class); + TestHelper.assertError(errors, 1, NullPointerException.class); + assertEquals(2, errors.size()); } } diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableResumeNextTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableResumeNextTest.java new file mode 100644 index 0000000000..c2a3af2769 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableResumeNextTest.java @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; + +public class CompletableResumeNextTest { + + @Test + public void resumeWithError() { + Completable.error(new TestException()) + .onErrorResumeNext(Functions.justFunction(Completable.error(new TestException("second")))) + .test() + .assertFailureAndMessage(TestException.class, "second"); + } + + @Test + public void disposeInMain() { + TestHelper.checkDisposedCompletable(new Function() { + @Override + public CompletableSource apply(Completable c) throws Exception { + return c.onErrorResumeNext(Functions.justFunction(Completable.complete())); + } + }); + } + + + @Test + public void disposeInResume() { + TestHelper.checkDisposedCompletable(new Function() { + @Override + public CompletableSource apply(Completable c) throws Exception { + return Completable.error(new TestException()).onErrorResumeNext(Functions.justFunction(c)); + } + }); + } + + @Test + public void disposed() { + TestHelper.checkDisposed( + Completable.error(new TestException()) + .onErrorResumeNext(Functions.justFunction(Completable.never())) + ); + } +}