Skip to content

Commit

Permalink
Lint suppressions and use custom observers in lifecycle subscriptions (
Browse files Browse the repository at this point in the history
…#108)

* Suppress warnings in weaker access calls

* Create full disposable observers with documented noop and less alloc

This changes the lifecycle subscriptions to use full observers to save some allocations, and also documents the noop on completions

* Add unbound() factory

* Inline subscribe check calls to observer

Saves some more allocs
  • Loading branch information
ZacSweers authored Oct 16, 2017
1 parent 9b26a0c commit 5e00b57
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingCompletableObserverImpl implements AutoDisposingCompletableObserver {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e1) throws Exception {
onError(e1);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e1) throws Exception {
onError(e1);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingObserverImpl<T> implements AutoDisposingObserver<T> {

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingObserverImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import io.reactivex.SingleObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObserver<T> {
Expand All @@ -43,20 +42,23 @@ final class AutoDisposingSingleObserverImpl<T> implements AutoDisposingSingleObs

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingSingleObserverImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingSingleObserverImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingSingleObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
}
Expand All @@ -81,7 +83,7 @@ private void lazyDispose() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
import com.uber.autodispose.observers.AutoDisposingSubscriber;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -44,20 +43,23 @@ final class AutoDisposingSubscriberImpl<T> implements AutoDisposingSubscriber<T>

@Override public void onSubscribe(final Subscription s) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.doOnEvent(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object o, Throwable throwable) throws Exception {
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(s);
AutoDisposingSubscriberImpl.this.dispose();
}
})
.subscribe(new Consumer<Object>() {
@Override public void accept(Object o) throws Exception {
dispose();
}
}, new Consumer<Throwable>() {
@Override public void accept(Throwable e) throws Exception {
AutoDisposingSubscriberImpl.this.onError(e);
}
}), getClass())) {

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(s);
AutoDisposingSubscriberImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(s);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainSubscription, s, getClass())) {
delegate.onSubscribe(this);
}
Expand Down Expand Up @@ -99,7 +101,7 @@ private void lazyCancel() {
}
}

/* private */
@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Subscription s) {
// If we've never actually started the upstream subscription (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty subscription instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public static TestScopeProvider create(Maybe<?> delegate) {
return new TestScopeProvider(delegate);
}

/**
* Creates a new provider that is "unbound", e.g. will emit a completion event to signal that the
* scope is unbound.
*
* @return the created TestScopeProvider
*/
public static TestScopeProvider unbound() {
return create(Maybe.empty());
}

private final MaybeSubject<Object> innerMaybe = MaybeSubject.create();

private TestScopeProvider(Maybe<?> delegate) {
Expand Down

0 comments on commit 5e00b57

Please sign in to comment.