diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java index d2a4ed0f9..fa4333f62 100644 --- a/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDispose.java @@ -17,6 +17,7 @@ package com.uber.autodispose; import io.reactivex.Completable; +import io.reactivex.CompletableObserver; import io.reactivex.Flowable; import io.reactivex.Maybe; import io.reactivex.MaybeSource; @@ -339,8 +340,44 @@ public static AutoDisposeConverter autoDisposable(final Maybe scope) { }; } - @Override public CompletableSubscribeProxy apply(Completable upstream) { - return upstream.to(new CompletableScoper(scope)); + @Override public CompletableSubscribeProxy apply(final Completable upstream) { + return new CompletableSubscribeProxy() { + @Override public Disposable subscribe() { + return new AutoDisposeCompletable(upstream, scope).subscribe(); + } + + @Override public Disposable subscribe(Action action) { + return new AutoDisposeCompletable(upstream, scope).subscribe(action); + } + + @Override + public Disposable subscribe(Action action, Consumer onError) { + return new AutoDisposeCompletable(upstream, scope).subscribe(action, onError); + } + + @Override public void subscribe(CompletableObserver observer) { + new AutoDisposeCompletable(upstream, scope).subscribe(observer); + } + + @Override public E subscribeWith(E observer) { + return new AutoDisposeCompletable(upstream, scope).subscribeWith(observer); + } + + @Override public TestObserver test() { + TestObserver observer = new TestObserver<>(); + subscribe(observer); + return observer; + } + + @Override public TestObserver test(boolean cancel) { + TestObserver observer = new TestObserver<>(); + if (cancel) { + observer.cancel(); + } + subscribe(observer); + return observer; + } + }; } @Override public FlowableSubscribeProxy apply(Flowable upstream) { diff --git a/autodispose/src/main/java/com/uber/autodispose/AutoDisposeCompletable.java b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeCompletable.java new file mode 100644 index 000000000..9b27228c4 --- /dev/null +++ b/autodispose/src/main/java/com/uber/autodispose/AutoDisposeCompletable.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2018. Uber Technologies + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.autodispose; + +import io.reactivex.Completable; +import io.reactivex.CompletableObserver; +import io.reactivex.Maybe; + +final class AutoDisposeCompletable extends Completable { + + private final Completable source; + private final Maybe scope; + + AutoDisposeCompletable(Completable source, Maybe scope) { + this.source = source; + this.scope = scope; + } + + @Override + protected void subscribeActual(CompletableObserver completableObserver) { + source.subscribe(new AutoDisposingCompletableObserverImpl(scope, completableObserver)); + } +} diff --git a/autodispose/src/main/java/com/uber/autodispose/CompletableScoper.java b/autodispose/src/main/java/com/uber/autodispose/CompletableScoper.java index 98a4b4214..9328a731e 100644 --- a/autodispose/src/main/java/com/uber/autodispose/CompletableScoper.java +++ b/autodispose/src/main/java/com/uber/autodispose/CompletableScoper.java @@ -17,14 +17,8 @@ package com.uber.autodispose; import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.CompletableSource; import io.reactivex.Maybe; -import io.reactivex.disposables.Disposable; -import io.reactivex.functions.Action; -import io.reactivex.functions.Consumer; import io.reactivex.functions.Function; -import io.reactivex.observers.TestObserver; /** * Entry point for auto-disposing {@link Completable}s. @@ -45,8 +39,8 @@ * @deprecated Use the static factories in {@link AutoDispose}. This will be removed in 1.0. */ @Deprecated -public class CompletableScoper extends BaseAutoDisposeConverter - implements Function { +public class CompletableScoper extends BaseAutoDisposeConverter implements + Function { public CompletableScoper(ScopeProvider provider) { super(provider); @@ -60,57 +54,8 @@ public CompletableScoper(Maybe lifecycle) { super(lifecycle); } - @Override public CompletableSubscribeProxy apply(final Completable maybeSource) throws Exception { - return new CompletableSubscribeProxy() { - @Override public Disposable subscribe() { - return new AutoDisposeCompletable(maybeSource, scope()).subscribe(); - } - - @Override public Disposable subscribe(Action action) { - return new AutoDisposeCompletable(maybeSource, scope()).subscribe(action); - } - - @Override public Disposable subscribe(Action action, Consumer onError) { - return new AutoDisposeCompletable(maybeSource, scope()).subscribe(action, onError); - } - - @Override public void subscribe(CompletableObserver observer) { - new AutoDisposeCompletable(maybeSource, scope()).subscribe(observer); - } - - @Override public E subscribeWith(E observer) { - return new AutoDisposeCompletable(maybeSource, scope()).subscribeWith(observer); - } - - @Override public TestObserver test() { - TestObserver observer = new TestObserver<>(); - subscribe(observer); - return observer; - } - - @Override public TestObserver test(boolean cancel) { - TestObserver observer = new TestObserver<>(); - if (cancel) { - observer.cancel(); - } - subscribe(observer); - return observer; - } - }; - } - - static final class AutoDisposeCompletable extends Completable { - private final CompletableSource source; - private final Maybe scope; - - AutoDisposeCompletable(CompletableSource source, Maybe scope) { - this.source = source; - this.scope = scope; - } - - @Override protected void subscribeActual(CompletableObserver observer) { - source.subscribe(new AutoDisposingCompletableObserverImpl(scope, observer)); - } + @Override + public CompletableSubscribeProxy apply(final Completable maybeSource) { + return maybeSource.as(AutoDispose.autoDisposable(scope())); } } -