Skip to content

Commit

Permalink
Merge pull request ReactiveX#572 from akarnokd/ObserveOn3
Browse files Browse the repository at this point in the history
ObserveOn fix for observing the same source on the same scheduler by two...
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents 3eea8ad + dd5544b commit caf6a82
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 52 deletions.
97 changes: 45 additions & 52 deletions rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import rx.concurrency.CurrentThreadScheduler;
import rx.concurrency.ImmediateScheduler;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand All @@ -46,10 +47,6 @@ public static <T> OnSubscribeFunc<T> observeOn(Observable<? extends T> source, S
private static class ObserveOn<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final Scheduler scheduler;
private volatile Scheduler recursiveScheduler;

final ConcurrentLinkedQueue<Notification<? extends T>> queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
final AtomicInteger counter = new AtomicInteger(0);

public ObserveOn(Observable<? extends T> source, Scheduler scheduler) {
this.source = source;
Expand All @@ -65,71 +62,67 @@ public Subscription onSubscribe(final Observer<? super T> observer) {
// do nothing if we request CurrentThreadScheduler so we don't invoke overhead
return source.subscribe(observer);
} else {
return observeOn(observer, scheduler);
return new Observation(observer).init();
}
}
/** Observe through individual queue per observer. */
private class Observation implements Action1<Notification<? extends T>> {
final Observer<? super T> observer;
final CompositeSubscription s;
final ConcurrentLinkedQueue<Notification<? extends T>> queue;
final AtomicInteger counter;
private volatile Scheduler recursiveScheduler;
public Observation(Observer<? super T> observer) {
this.observer = observer;
this.queue = new ConcurrentLinkedQueue<Notification<? extends T>>();
this.counter = new AtomicInteger(0);
this.s = new CompositeSubscription();
}
public Subscription init() {
s.add(source.materialize().subscribe(this));
return s;
}

public Subscription observeOn(final Observer<? super T> observer, final Scheduler scheduler) {
final CompositeSubscription s = new CompositeSubscription();

s.add(source.materialize().subscribe(new Action1<Notification<? extends T>>() {

@Override
public void call(Notification<? extends T> e) {
// this must happen before 'counter' is used to provide synchronization between threads
queue.offer(e);

// we now use counter to atomically determine if we need to start processing or not
// it will be 0 if it's the first notification or the scheduler has finished processing work
// and we need to start doing it again
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {

@Override
public void call(Notification<? extends T> e) {
queue.offer(e);
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
s.add(scheduler.schedule(null, new Func2<Scheduler, T, Subscription>() {
@Override
public Subscription call(Scheduler innerScheduler, T state) {
// record innerScheduler so 'processQueue' can use it for all subsequent executions
recursiveScheduler = innerScheduler;

processQueue(s, observer);
processQueue();

return Subscriptions.empty();
}
}));
} else {
processQueue(s, observer);
}
} else {
processQueue();
}

}
}));

return s;
}

/**
* This uses 'recursiveScheduler' NOT 'scheduler' as it should reuse the same scheduler each time it processes.
* This means it must first get the recursiveScheduler when it first executes.
*/
private void processQueue(final CompositeSubscription s, final Observer<? super T> observer) {
}
void processQueue() {
s.add(recursiveScheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
Notification<? extends T> not = queue.poll();
if (not != null) {
not.accept(observer);
}

s.add(recursiveScheduler.schedule(new Action1<Action0>() {
@Override
public void call(Action0 self) {
Notification<? extends T> not = queue.poll();
if (not != null) {
not.accept(observer);
}
// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
self.call();
}

// decrement count and if we still have work to do
// recursively schedule ourselves to process again
if (counter.decrementAndGet() > 0) {
self.call();
}

}
}));
}));
}
}
}

}
}
74 changes: 74 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import rx.Observable;
import rx.Observer;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.util.functions.Action1;

public class OperationObserveOnTest {
Expand Down Expand Up @@ -132,4 +133,77 @@ public void call(String t1) {

inOrder.verify(observer, times(1)).onCompleted();
}
@Test
public void observeOnTheSameSchedulerTwice() {
TestScheduler scheduler = new TestScheduler();

Observable<Integer> o = Observable.from(1, 2, 3);
Observable<Integer> o2 = o.observeOn(scheduler);

@SuppressWarnings("unchecked")
Observer<Object> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observer2 = mock(Observer.class);

InOrder inOrder1 = inOrder(observer1);
InOrder inOrder2 = inOrder(observer2);

o2.subscribe(observer1);
o2.subscribe(observer2);

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

inOrder1.verify(observer1, times(1)).onNext(1);
inOrder1.verify(observer1, times(1)).onNext(2);
inOrder1.verify(observer1, times(1)).onNext(3);
inOrder1.verify(observer1, times(1)).onCompleted();
verify(observer1, never()).onError(any(Throwable.class));
inOrder1.verifyNoMoreInteractions();

inOrder2.verify(observer2, times(1)).onNext(1);
inOrder2.verify(observer2, times(1)).onNext(2);
inOrder2.verify(observer2, times(1)).onNext(3);
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
inOrder2.verifyNoMoreInteractions();

}
@Test
public void observeSameOnMultipleSchedulers() {
TestScheduler scheduler1 = new TestScheduler();
TestScheduler scheduler2 = new TestScheduler();

Observable<Integer> o = Observable.from(1, 2, 3);
Observable<Integer> o1 = o.observeOn(scheduler1);
Observable<Integer> o2 = o.observeOn(scheduler2);

@SuppressWarnings("unchecked")
Observer<Object> observer1 = mock(Observer.class);
@SuppressWarnings("unchecked")
Observer<Object> observer2 = mock(Observer.class);

InOrder inOrder1 = inOrder(observer1);
InOrder inOrder2 = inOrder(observer2);

o1.subscribe(observer1);
o2.subscribe(observer2);

scheduler1.advanceTimeBy(1, TimeUnit.SECONDS);
scheduler2.advanceTimeBy(1, TimeUnit.SECONDS);

inOrder1.verify(observer1, times(1)).onNext(1);
inOrder1.verify(observer1, times(1)).onNext(2);
inOrder1.verify(observer1, times(1)).onNext(3);
inOrder1.verify(observer1, times(1)).onCompleted();
verify(observer1, never()).onError(any(Throwable.class));
inOrder1.verifyNoMoreInteractions();

inOrder2.verify(observer2, times(1)).onNext(1);
inOrder2.verify(observer2, times(1)).onNext(2);
inOrder2.verify(observer2, times(1)).onNext(3);
inOrder2.verify(observer2, times(1)).onCompleted();
verify(observer2, never()).onError(any(Throwable.class));
inOrder2.verifyNoMoreInteractions();

}
}

0 comments on commit caf6a82

Please sign in to comment.