Skip to content

Commit

Permalink
observeOn: allow configurable buffer size
Browse files Browse the repository at this point in the history
The observeOn operator is backed by a small queue of 128 slots that may
overflow quickly on slow producers.  This could only be avoided by
adding a backpressure operator before the observeOn (not only
inconvenient, but also taking a perf. hit as it forces hops between two
queues).

This patch allows modifying the default queue size on the observeOn
operator.

Fixes: ReactiveX#3751
Signed-off-by: Galo Navarro <[email protected]>
  • Loading branch information
srvaroa authored and sebaslogen committed Mar 28, 2016
1 parent 97331fb commit d38a3c7
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 12 deletions.
74 changes: 68 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6291,7 +6291,8 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer.
* asynchronously with a bounded buffer of {@link RxRingBuffer.SIZE} slots.
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
Expand All @@ -6308,13 +6309,41 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, false));
return observeOn(scheduler, RxRingBuffer.SIZE);
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
* default.
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize the size of the buffer.
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, boolean)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}

/**
Expand All @@ -6339,12 +6368,45 @@ public final Observable<T> observeOn(Scheduler scheduler) {
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
* default, and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @param bufferSize the size of the buffer.
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError));
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

/**
Expand Down
25 changes: 19 additions & 6 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,25 @@ public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;

/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}

/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

@Override
Expand All @@ -59,7 +70,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
Expand All @@ -72,6 +83,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
final int bufferSize;

// the status of the current stream
volatile boolean finished;
Expand All @@ -88,15 +100,16 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
queue = new SpscArrayQueue<Object>(this.bufferSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
}
}

Expand All @@ -123,7 +136,7 @@ public void request(long n) {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
request(this.bufferSize);
}

@Override
Expand Down Expand Up @@ -180,7 +193,7 @@ public void call() {

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each RxRingBuffer.SIZE elements)
// less frequently (usually after each bufferSize elements)

for (;;) {
long requestAmount = requested.get();
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,69 @@ public void onNext(Integer t) {
}
}

@Test
public void testQueueFullEmitsErrorWithVaryingBufferSize() {
final CountDownLatch latch = new CountDownLatch(1);
// randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
// which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
for (int i = 1; i <= 1024; i = i * 2) {
final int capacity = i;
Observable<Integer> observable = Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> o) {
for (int i = 0; i < capacity + 10; i++) {
o.onNext(i);
}
latch.countDown();
o.onCompleted();
}

});

TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
try {
// force it to be slow wait until we have queued everything
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});
System.out.println("Using capacity " + capacity); // for post-failure debugging
observable.observeOn(Schedulers.newThread(), capacity).subscribe(testSubscriber);

testSubscriber.awaitTerminalEvent();
List<Throwable> errors = testSubscriber.getOnErrorEvents();
assertEquals(1, errors.size());
System.out.println("Errors: " + errors);
Throwable t = errors.get(0);
if (t instanceof MissingBackpressureException) {
// success, we expect this
} else {
if (t.getCause() instanceof MissingBackpressureException) {
// this is also okay
} else {
fail("Expecting MissingBackpressureException");
}
}
}
}

@Test
public void testAsyncChild() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Expand Down

0 comments on commit d38a3c7

Please sign in to comment.