Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorporate feedback from #130 #138

Merged
merged 25 commits into from
Dec 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bd22bc2
Remove unnecessary synchronized blocks
ZacSweers Nov 11, 2017
caf904a
Fix introspection typo
ZacSweers Nov 11, 2017
79e6dbf
Fix introspection typo in observer
ZacSweers Nov 11, 2017
94212eb
Make AutoDispose ScopeHandlers final
ZacSweers Nov 11, 2017
9bf1993
Extract DisposableMaybeObserver and setOnce first
ZacSweers Nov 12, 2017
53eef69
Opportunistic tweak of RxErrorsRule
ZacSweers Nov 18, 2017
7ac1488
Lazyset disposables in scope callbacks to prevent double subscriptions
ZacSweers Nov 18, 2017
d564b40
Inline lazyDispose() methods
ZacSweers Nov 18, 2017
cb432a8
Wire up HalfSerializer to observer and subscribers
ZacSweers Nov 18, 2017
07f0f1e
Fix ViewAttachEventsObservable not observing observer contract
ZacSweers Nov 18, 2017
ec190c0
Fix LifecycleEventsObservable not observing observer contract
ZacSweers Nov 18, 2017
9606ae4
Inline lazyDispose() impl
ZacSweers Nov 18, 2017
b38d03f
Remove unnecessary nullcheck in request()
ZacSweers Nov 18, 2017
7705288
Suppress nullaway false positive
ZacSweers Nov 18, 2017
daad41b
Initialize atomic throwables inline
ZacSweers Nov 18, 2017
500f337
Use deferred requesting
ZacSweers Nov 18, 2017
e7e1b87
Call delegate.onSubscribe() first, remove callSubscribeIfNecessary
ZacSweers Nov 27, 2017
a0d8556
Check if observer is disposed in lifecycle events
ZacSweers Nov 27, 2017
22ba683
Check if listener is disposed in viewattacheventsobservable
ZacSweers Nov 27, 2017
959fe09
Remove private to avoid synthetic accessor
ZacSweers Nov 27, 2017
6a45aa4
Use non-rx-internal subscriptionhelper APIs
ZacSweers Nov 27, 2017
44b362f
Opportunistic fix missing synchronized in overridden method
ZacSweers Nov 27, 2017
ba8bd87
Remove callSubscribeIfNecessary, avoid duplicate subscribes
ZacSweers Nov 27, 2017
9c3edf4
Switch orders of lazySet + dispose to flush
ZacSweers Nov 27, 2017
3b87876
Optimize lifecycle error propagation a bit
ZacSweers Nov 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,18 @@ void backfillEvents() {
}

@Override protected void subscribeActual(Observer<? super Event> observer) {
ArchLifecycleObserver archObserver =
new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
observer.onSubscribe(archObserver);
if (!isMainThread()) {
observer.onError(
new IllegalStateException("Lifecycles can only be bound to on the main thread!"));
return;
}
ArchLifecycleObserver archObserver =
new ArchLifecycleObserver(lifecycle, observer, eventsObservable);
observer.onSubscribe(archObserver);
lifecycle.addObserver(archObserver);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the Observer disposes immediately, this will still add the listener and keep a reference to it. You should check for isDisposed after this add and call removeObserver:

lifecycle.addObserver(archObserver);
if (archObserver.isDisposed()) {
    lifecycle.removeObserver();
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (archObserver.isDisposed()) {
lifecycle.removeObserver(archObserver);
}
}

static final class ArchLifecycleObserver extends MainThreadDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ final class ViewAttachEventsObservable extends Observable<ViewLifecycleEvent> {
}

@Override protected void subscribeActual(Observer<? super ViewLifecycleEvent> observer) {
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
if (!isMainThread()) {
observer.onError(new IllegalStateException("Views can only be bound to on the main thread!"));
return;
Expand All @@ -46,9 +48,10 @@ final class ViewAttachEventsObservable extends Observable<ViewLifecycleEvent> {
// Emit the last event, like a behavior subject
observer.onNext(ViewLifecycleEvent.ATTACH);
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.addOnAttachStateChangeListener(listener);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here too, if disposed, the listener will remain added.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (listener.isDisposed()) {
view.removeOnAttachStateChangeListener(listener);
}
}

static final class Listener extends MainThreadDisposable
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.uber.autodispose;

import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
* Atomic container for Throwables including combining and having a
* terminal state via ExceptionHelper.
* <p>
* Watch out for the leaked AtomicReference methods!
*/
final class AtomicThrowable extends AtomicReference<Throwable> {

private static final long serialVersionUID = 3949248817947090603L;

/**
* Atomically adds a Throwable to this container (combining with a previous Throwable is
* necessary).
*
* @param t the throwable to add
* @return true if successful, false if the container has been terminated
*/
public boolean addThrowable(Throwable t) {
return ExceptionHelper.addThrowable(this, t);
}

/**
* Atomically terminate the container and return the contents of the last
* non-terminal Throwable of it.
*
* @return the last Throwable
*/
@Nullable
public Throwable terminate() {
return ExceptionHelper.terminate(this);
}

public boolean isTerminated() {
return get() == ExceptionHelper.TERMINATED;
}
}
12 changes: 6 additions & 6 deletions autodispose/src/main/java/com/uber/autodispose/AutoDispose.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ public interface ScopeHandler {
return new LifecycleScopeProviderHandlerImpl(scope);
}

private static class MaybeScopeHandlerImpl implements ScopeHandler {
private static final class MaybeScopeHandlerImpl implements ScopeHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd avoid private in case you don't want to get accessor methods on Android.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Fixed in 959fe09 but planning to remove this class in the near future anyway now that as() is in RxJava :)


private final Maybe<?> scope;
final Maybe<?> scope;

MaybeScopeHandlerImpl(Maybe<?> scope) {
this.scope = scope;
Expand Down Expand Up @@ -196,9 +196,9 @@ public <T> Function<Observable<? extends T>, ObservableSubscribeProxy<T>> forObs
}
}

private static class ScopeProviderHandlerImpl implements ScopeHandler {
private static final class ScopeProviderHandlerImpl implements ScopeHandler {

private final ScopeProvider scope;
final ScopeProvider scope;

ScopeProviderHandlerImpl(ScopeProvider scope) {
this.scope = scope;
Expand Down Expand Up @@ -226,9 +226,9 @@ public <T> Function<Observable<? extends T>, ObservableSubscribeProxy<T>> forObs
}
}

private static class LifecycleScopeProviderHandlerImpl implements ScopeHandler {
private static final class LifecycleScopeProviderHandlerImpl implements ScopeHandler {

private final LifecycleScopeProvider<?> scope;
final LifecycleScopeProvider<?> scope;

LifecycleScopeProviderHandlerImpl(LifecycleScopeProvider<?> scope) {
this.scope = scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.CompletableObserver;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -41,27 +40,27 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.dispose();
}
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(mainDisposable);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingCompletableObserverImpl.this.onError(e);
}
@Override public void onError(Throwable e) {
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposingCompletableObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
@Override public void onComplete() {
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -70,39 +69,22 @@ final class AutoDisposingCompletableObserverImpl implements AutoDisposingComplet
}

@Override public void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
AutoDisposableHelper.dispose(mainDisposable);
}
}

private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
AutoDisposableHelper.dispose(lifecycleDisposable);
AutoDisposableHelper.dispose(mainDisposable);
}

@Override public void onComplete() {
if (!isDisposed()) {
lazyDispose();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(lifecycleDisposable);
delegate.onComplete();
}
}

@Override public void onError(Throwable e) {
if (!isDisposed()) {
lazyDispose();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(lifecycleDisposable);
delegate.onError(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.reactivex.Maybe;
import io.reactivex.MaybeObserver;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.observers.DisposableMaybeObserver;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -41,27 +40,27 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
}

@Override public void onSubscribe(final Disposable d) {
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable,
lifecycle.subscribeWith(new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.dispose();
}
DisposableMaybeObserver<Object> o = new DisposableMaybeObserver<Object>() {
@Override public void onSuccess(Object o) {
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(mainDisposable);
}

@Override public void onError(Throwable e) {
callMainSubscribeIfNecessary(d);
AutoDisposingMaybeObserverImpl.this.onError(e);
}
@Override public void onError(Throwable e) {
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposingMaybeObserverImpl.this.onError(e);
}

@Override public void onComplete() {
callMainSubscribeIfNecessary(d);
// Noop - we're unbound now
}
}),
getClass())) {
if (AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass())) {
delegate.onSubscribe(this);
@Override public void onComplete() {
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
lifecycleDisposable.lazySet(AutoDisposableHelper.DISPOSED);
// Noop - we're unbound now
}
};
if (AutoDisposeEndConsumerHelper.setOnce(lifecycleDisposable, o, getClass())) {
delegate.onSubscribe(this);
lifecycle.subscribe(o);
AutoDisposeEndConsumerHelper.setOnce(mainDisposable, d, getClass());
}
}

Expand All @@ -70,46 +69,30 @@ final class AutoDisposingMaybeObserverImpl<T> implements AutoDisposingMaybeObser
}

@Override public void dispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
AutoDisposableHelper.dispose(mainDisposable);
}
}

private void lazyDispose() {
synchronized (this) {
AutoDisposableHelper.dispose(lifecycleDisposable);
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
}
}

@SuppressWarnings("WeakerAccess") // Avoiding synthetic accessors
void callMainSubscribeIfNecessary(Disposable d) {
// If we've never actually called the downstream onSubscribe (i.e. requested immediately in
// onSubscribe and had a terminal event), we need to still send an empty disposable instance
// to abide by the Observer contract.
if (AutoDisposableHelper.setIfNotSet(mainDisposable, d)) {
delegate.onSubscribe(Disposables.disposed());
}
AutoDisposableHelper.dispose(lifecycleDisposable);
AutoDisposableHelper.dispose(mainDisposable);
}

@Override public void onSuccess(T value) {
if (!isDisposed()) {
lazyDispose();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(lifecycleDisposable);
delegate.onSuccess(value);
}
}

@Override public void onError(Throwable e) {
if (!isDisposed()) {
lazyDispose();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(lifecycleDisposable);
delegate.onError(e);
}
}

@Override public void onComplete() {
if (!isDisposed()) {
lazyDispose();
mainDisposable.lazySet(AutoDisposableHelper.DISPOSED);
AutoDisposableHelper.dispose(lifecycleDisposable);
delegate.onComplete();
}
}
Expand Down
Loading