Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert Bounded ObserveOn #888

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperatorObserveOnBounded;
import rx.operators.OperatorScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
Expand Down Expand Up @@ -5148,7 +5149,7 @@ public final <R> ConnectableObservable<R> multicast(Subject<? super T, ? extends
}

/**
* Move notifications to the specified {@link Scheduler} one {@code onNext} at a time.
* Move notifications to the specified {@link Scheduler} asynchronously with an unbounded buffer.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
*
Expand All @@ -5162,25 +5163,6 @@ public final Observable<T> observeOn(Scheduler scheduler) {
return lift(new OperatorObserveOn<T>(scheduler));
}

/**
* Move notifications to the specified {@link Scheduler} asynchronously with a buffer of a specified size.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/observeOn.png">
* <p>
* If the buffer fills to its maximum size...
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize
* that will be rounded up to the next power of 2
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-observeon">RxJava Wiki: observeOn()</a>
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return lift(new OperatorObserveOn<T>(scheduler, bufferSize));
}

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* <p>
Expand Down
231 changes: 28 additions & 203 deletions rxjava-core/src/main/java/rx/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,31 @@
*/
package rx.operators;

import java.util.concurrent.Semaphore;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.schedulers.ImmediateScheduler;
import rx.schedulers.TestScheduler;
import rx.schedulers.TrampolineScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Delivers events on the specified Scheduler.
* <p>
* This provides backpressure by blocking the incoming onNext when there is already one in the queue.
* <p>
* This means that at any given time the max number of "onNext" in flight is 3:
* -> 1 being delivered on the Scheduler
* -> 1 in the queue waiting for the Scheduler
* -> 1 blocking on the queue waiting to deliver it
*
* I have chosen to allow 1 in the queue rather than using an Exchanger style process so that the Scheduler
* can loop and have something to do each time around to optimize for avoiding rescheduling when it
* can instead just loop. I'm avoiding having the Scheduler thread ever block as it could be an event-loop
* thus if the queue is empty it exits and next time something is added it will reschedule.
* Delivers events on the specified Scheduler asynchronously via an unbounded buffer.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/observeOn.png">
*/
public class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final int bufferSize;

/**
*
* @param scheduler
* @param bufferSize
* that will be rounded up to the next power of 2
*/
public OperatorObserveOn(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.bufferSize = roundToNextPowerOfTwoIfNecessary(bufferSize);
}

public OperatorObserveOn(Scheduler scheduler) {
this(scheduler, 1);
}

private static int roundToNextPowerOfTwoIfNecessary(int num) {
if ((num & -num) == num) {
return num;
} else {
int result = 1;
while (num != 0)
{
num >>= 1;
result <<= 1;
}
return result;
}
this.scheduler = scheduler;
}

@Override
Expand All @@ -88,19 +50,19 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TestScheduler) {
// this one will deadlock as it is single-threaded and won't run the scheduled
// work until it manually advances, which it won't be able to do as it will block
return child;
} else {
return new ObserveOnSubscriber(child);
}
}

private static Object NULL_SENTINEL = new Object();
private static Object COMPLETE_SENTINEL = new Object();
private static class Sentinel {

private static class ErrorSentinel {
}

private static Sentinel NULL_SENTINEL = new Sentinel();
private static Sentinel COMPLETE_SENTINEL = new Sentinel();

private static class ErrorSentinel extends Sentinel {
final Throwable e;

ErrorSentinel(Throwable e) {
Expand All @@ -113,7 +75,7 @@ private class ObserveOnSubscriber extends Subscriber<T> {
final Subscriber<? super T> observer;
private volatile Scheduler.Inner recursiveScheduler;

private final InterruptibleBlockingQueue<Object> queue = new InterruptibleBlockingQueue<Object>(bufferSize);
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
final AtomicLong counter = new AtomicLong(0);

public ObserveOnSubscriber(Subscriber<? super T> observer) {
Expand All @@ -123,62 +85,29 @@ public ObserveOnSubscriber(Subscriber<? super T> observer) {

@Override
public void onNext(final T t) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
if (t == null) {
queue.addBlocking(NULL_SENTINEL);
} else {
queue.addBlocking(t);
}
schedule();
} catch (InterruptedException e) {
if (!isUnsubscribed()) {
onError(e);
}
if (t == null) {
queue.offer(NULL_SENTINEL);
} else {
queue.offer(t);
}
schedule();
}

@Override
public void onCompleted() {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.addBlocking(COMPLETE_SENTINEL);
schedule();
} catch (InterruptedException e) {
onError(e);
}
queue.offer(COMPLETE_SENTINEL);
schedule();
}

@Override
public void onError(final Throwable e) {
try {
// we want to block for natural back-pressure
// so that the producer waits for each value to be consumed
queue.addBlocking(new ErrorSentinel(e));
schedule();
} catch (InterruptedException e2) {
// call directly if we can't schedule
observer.onError(e2);
}
queue.offer(new ErrorSentinel(e));
schedule();
}

protected void schedule() {
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
// first time through, register a Subscription
// that can interrupt this thread
add(Subscriptions.create(new Action0() {

@Override
public void call() {
// we have to interrupt the parent thread because
// it can be blocked on queue.put
queue.interrupt();
}

}));
add(scheduler.schedule(new Action1<Inner>() {

@Override
Expand Down Expand Up @@ -206,12 +135,14 @@ private void pollQueue() {
do {
Object v = queue.poll();
if (v != null) {
if (v == NULL_SENTINEL) {
observer.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
observer.onCompleted();
} else if (v instanceof ErrorSentinel) {
observer.onError(((ErrorSentinel) v).e);
if (v instanceof Sentinel) {
if (v == NULL_SENTINEL) {
observer.onNext(null);
} else if (v == COMPLETE_SENTINEL) {
observer.onCompleted();
} else if (v instanceof ErrorSentinel) {
observer.onError(((ErrorSentinel) v).e);
}
} else {
observer.onNext((T) v);
}
Expand All @@ -221,110 +152,4 @@ private void pollQueue() {

}

/**
* Single-producer-single-consumer queue (only thread-safe for 1 producer thread with 1 consumer thread).
*
* This supports an interrupt() being called externally rather than needing to interrupt the thread. This allows
* unsubscribe behavior when this queue is being used.
*
* @param <E>
*/
private static class InterruptibleBlockingQueue<E> {

private final Semaphore semaphore;
private volatile boolean interrupted = false;

private final E[] buffer;

private AtomicLong tail = new AtomicLong();
private AtomicLong head = new AtomicLong();
private final int capacity;
private final int mask;

@SuppressWarnings("unchecked")
public InterruptibleBlockingQueue(final int size) {
this.semaphore = new Semaphore(size);
this.capacity = size;
this.mask = size - 1;
buffer = (E[]) new Object[size];
}

/**
* Used to unsubscribe and interrupt the producer if blocked in put()
*/
public void interrupt() {
interrupted = true;
semaphore.release();
}

public void addBlocking(final E e) throws InterruptedException {
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
semaphore.acquire();
if (interrupted) {
throw new InterruptedException("Interrupted by Unsubscribe");
}
if (e == null) {
throw new IllegalArgumentException("Can not put null");
}

if (offer(e)) {
return;
} else {
throw new IllegalStateException("Queue is full");
}
}

private boolean offer(final E e) {
final long _t = tail.get();
if (_t - head.get() == capacity) {
// queue is full
return false;
}
int index = (int) (_t & mask);
buffer[index] = e;
// move the tail forward
tail.lazySet(_t + 1);

return true;
}

public E poll() {
if (interrupted) {
return null;
}
final long _h = head.get();
if (tail.get() == _h) {
// nothing available
return null;
}
int index = (int) (_h & mask);

// fetch the item
E v = buffer[index];
// allow GC to happen
buffer[index] = null;
// increment and signal we're done
head.lazySet(_h + 1);
if (v != null) {
semaphore.release();
}
return v;
}

public int size()
{
int size;
do
{
final long currentHead = head.get();
final long currentTail = tail.get();
size = (int) (currentTail - currentHead);
} while (size > buffer.length);

return size;
}

}
}
Loading