Skip to content

Commit

Permalink
Drop volatile in favor of failing fast if not subscribed from UI thread
Browse files Browse the repository at this point in the history
  • Loading branch information
mttkay committed Oct 14, 2013
1 parent 715dcec commit 2998ac3
Showing 1 changed file with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@

import android.app.Activity;
import android.app.Fragment;
import android.os.Looper;
import android.util.Log;

import java.lang.reflect.Field;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OperationObserveFromAndroidComponent {

Expand All @@ -47,8 +52,8 @@ private static abstract class OnSubscribeBase<T, AndroidComponent> implements Ob
private static final String LOG_TAG = OperationObserveFromAndroidComponent.class.getSimpleName();

private final Observable<T> source;
private volatile AndroidComponent componentRef;
private volatile Observer<? super T> observerRef;
private AndroidComponent componentRef;
private Observer<? super T> observerRef;

private OnSubscribeBase(Observable<T> source, AndroidComponent component) {
this.source = source;
Expand All @@ -65,6 +70,7 @@ private void log(String message) {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
assertUiThread();
observerRef = observer;
final Subscription sourceSub = source.observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<T>() {
@Override
Expand Down Expand Up @@ -111,6 +117,12 @@ private void releaseReferences() {
observerRef = null;
componentRef = null;
}

private void assertUiThread() {
if (Looper.getMainLooper() != Looper.myLooper()) {
throw new IllegalStateException("Observers must subscribe from the main UI thread, but was " + Thread.currentThread());
}
}
}

private static final class OnSubscribeFragment<T> extends OnSubscribeBase<T, android.app.Fragment> {
Expand Down Expand Up @@ -171,6 +183,21 @@ public void setupMocks() {
when(mockFragment.isAdded()).thenReturn(true);
}

@Test
public void itThrowsIfObserverSubscribesFromBackgroundThread() throws Exception {
final Future<Object> future = Executors.newSingleThreadExecutor().submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
OperationObserveFromAndroidComponent.observeFromAndroidComponent(
mockObservable, mockFragment).subscribe(mockObserver);
return null;
}
});
future.get(1, TimeUnit.SECONDS);
verify(mockObserver).onError(any(IllegalStateException.class));
verifyNoMoreInteractions(mockObserver);
}

@Test
public void itObservesTheSourceSequenceOnTheMainUIThread() {
OperationObserveFromAndroidComponent.observeFromAndroidComponent(mockObservable, mockFragment).subscribe(mockObserver);
Expand Down

0 comments on commit 2998ac3

Please sign in to comment.