Skip to content

Commit

Permalink
Merge pull request ReactiveX#602 from benjchristensen/observeOn
Browse files Browse the repository at this point in the history
ObserveOn Fixes
  • Loading branch information
benjchristensen committed Dec 13, 2013
2 parents b4b0621 + 7054bc8 commit 5412e32
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 66 deletions.
78 changes: 41 additions & 37 deletions rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import rx.schedulers.CurrentThreadScheduler;
import rx.schedulers.ImmediateScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func2;
Expand Down Expand Up @@ -64,62 +64,66 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
return new Observation(observer).init();
}
}

/** Observe through individual queue per observer. */
private class Observation implements Action1<Notification<? extends T>> {
private class Observation {
final Observer<? super T> observer;
final CompositeSubscription s;
final ConcurrentLinkedQueue<Notification<? extends T>> queue;
final AtomicInteger counter;
final CompositeSubscription compositeSubscription = new CompositeSubscription();
final MultipleAssignmentSubscription recursiveSubscription = new MultipleAssignmentSubscription();
final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
final AtomicInteger counter = new AtomicInteger(0);
private volatile Scheduler recursiveScheduler;

public Observation(Observer<? super T> observer) {
this.observer = observer;
this.queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
this.counter = new AtomicInteger(0);
this.s = new CompositeSubscription();
}

public Subscription init() {
s.add(source.materialize().subscribe(this));
return s;
compositeSubscription.add(source.materialize().subscribe(new SourceObserver()));
return compositeSubscription;
}

@Override
public void call(Notification<? extends T> e) {
queue.offer(e);
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
private class SourceObserver implements Action1<Notification<? extends T>> {

@Override
public void call(Notification<? extends T> e) {
queue.offer(e);
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
// compositeSubscription for the outer scheduler, recursive for inner
compositeSubscription.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler innerScheduler, T state) {
// record innerScheduler so 'processQueue' can use it for all subsequent executions
recursiveScheduler = innerScheduler;

// once we have the innerScheduler we can start doing real work
processQueue();

return Subscriptions.empty();
return recursiveSubscription;
}
}));
} else {
processQueue();
} else {
processQueue();
}
}
}
}
void processQueue() {
s.add(recursiveScheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
Notification<? extends T> not = queue.poll();
if (not != null) {
not.accept(observer);
}

// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
self.call();
}
void processQueue() {
recursiveSubscription.setSubscription(recursiveScheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
Notification<? extends T> not = queue.poll();
if (not != null) {
not.accept(observer);
}

}
}));
// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
self.call();
}
}
}));
}
}
}
}
Expand Down
98 changes: 98 additions & 0 deletions rxjava-core/src/test/java/rx/ErrorHandlingTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import static org.junit.Assert.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.schedulers.Schedulers;

public class ErrorHandlingTests {

/**
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
*/
@Test
public void testOnNextError() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> caughtError = new AtomicReference<Throwable>();
Observable<Long> o = Observable.interval(50, TimeUnit.MILLISECONDS);
Observer<Long> observer = new Observer<Long>() {

@Override
public void onCompleted() {
System.out.println("completed");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}

@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.subscribe(observer);

latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}

/**
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
* even when done across thread boundaries with observeOn
*/
@Test
public void testOnNextErrorAcrossThread() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> caughtError = new AtomicReference<Throwable>();
Observable<Long> o = Observable.interval(50, TimeUnit.MILLISECONDS);
Observer<Long> observer = new Observer<Long>() {

@Override
public void onCompleted() {
System.out.println("completed");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}

@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread()).subscribe(observer);

latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
}
64 changes: 35 additions & 29 deletions rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import rx.Observer;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

public class OperationObserveOnTest {
Expand Down Expand Up @@ -88,71 +89,75 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
@Test
@SuppressWarnings("unchecked")
public void testThreadName() throws InterruptedException {
System.out.println("Main Thread: " + Thread.currentThread().getName());
Observable<String> obs = Observable.from("one", null, "two", "three", "four");

Observer<String> observer = mock(Observer.class);

InOrder inOrder = inOrder(observer);
final String parentThreadName = Thread.currentThread().getName();

final CountDownLatch completedLatch = new CountDownLatch(1);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
completedLatch.countDown();

return null;
}
}).when(observer).onCompleted();

doAnswer(new Answer<Void>() {
// assert subscribe is on main thread
obs = obs.doOnEach(new Action1<String>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
completedLatch.countDown();

return null;
public void call(String s) {
String threadName = Thread.currentThread().getName();
System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName);
assertEquals(parentThreadName, threadName);
}
}).when(observer).onError(any(Exception.class));

});

// assert observe is on new thread
obs.observeOn(Schedulers.newThread()).doOnEach(new Action1<String>() {

@Override
public void call(String t1) {
String threadName = Thread.currentThread().getName();
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
System.out.println("ThreadName: " + threadName + " Correct => " + correctThreadName);
System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName);
assertTrue(correctThreadName);
}

}).finallyDo(new Action0() {

@Override
public void call() {
completedLatch.countDown();

}
}).subscribe(observer);

if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("timed out waiting");
}

inOrder.verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(5)).onNext(any(String.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void observeOnTheSameSchedulerTwice() {
TestScheduler scheduler = new TestScheduler();

Observable<Integer> o = Observable.from(1, 2, 3);
Observable<Integer> o2 = o.observeOn(scheduler);

@SuppressWarnings("unchecked")
Observer<Object> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observer2 = mock(Observer.class);

InOrder inOrder1 = inOrder(observer1);
InOrder inOrder2 = inOrder(observer2);

o2.subscribe(observer1);
o2.subscribe(observer2);

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

inOrder1.verify(observer1, times(1)).onNext(1);
inOrder1.verify(observer1, times(1)).onNext(2);
inOrder1.verify(observer1, times(1)).onNext(3);
Expand All @@ -168,11 +173,12 @@ public void observeOnTheSameSchedulerTwice() {
inOrder2.verifyNoMoreInteractions();

}

@Test
public void observeSameOnMultipleSchedulers() {
TestScheduler scheduler1 = new TestScheduler();
TestScheduler scheduler2 = new TestScheduler();

Observable<Integer> o = Observable.from(1, 2, 3);
Observable<Integer> o1 = o.observeOn(scheduler1);
Observable<Integer> o2 = o.observeOn(scheduler2);
Expand All @@ -181,16 +187,16 @@ public void observeSameOnMultipleSchedulers() {
Observer<Object> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observer2 = mock(Observer.class);

InOrder inOrder1 = inOrder(observer1);
InOrder inOrder2 = inOrder(observer2);

o1.subscribe(observer1);
o2.subscribe(observer2);

scheduler1.advanceTimeBy(1, TimeUnit.SECONDS);
scheduler2.advanceTimeBy(1, TimeUnit.SECONDS);

inOrder1.verify(observer1, times(1)).onNext(1);
inOrder1.verify(observer1, times(1)).onNext(2);
inOrder1.verify(observer1, times(1)).onNext(3);
Expand Down

0 comments on commit 5412e32

Please sign in to comment.