Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observeOn: allow configurable buffer size #3777

Merged
merged 1 commit into from
Mar 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 68 additions & 6 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6291,7 +6291,8 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer.
* asynchronously with a bounded buffer of {@link RxRingBuffer.SIZE} slots.
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
Expand All @@ -6308,13 +6309,41 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, false));
return observeOn(scheduler, RxRingBuffer.SIZE);
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
* default.
*
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler the {@link Scheduler} to notify {@link Observer}s on
* @param bufferSize the size of the buffer.
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, boolean)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add observeOn(Scheduler, boolean delayError, int bufferSize) overload as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still miss the overload observeOn(Scheduler, boolean delayError, int bufferSize)

return observeOn(scheduler, false, bufferSize);
}

/**
Expand All @@ -6339,12 +6368,45 @@ public final Observable<T> observeOn(Scheduler scheduler) {
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean, int)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
* default, and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @param bufferSize the size of the buffer.
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, int)
* @see #observeOn(Scheduler, boolean)
*/
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError));
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

/**
Expand Down
25 changes: 19 additions & 6 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,25 @@ public final class OperatorObserveOn<T> implements Operator<T, T> {

private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;

/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}

/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}

@Override
Expand All @@ -59,7 +70,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
Expand All @@ -72,6 +83,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
final int bufferSize;

// the status of the current stream
volatile boolean finished;
Expand All @@ -88,15 +100,16 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
queue = new SpscArrayQueue<Object>(this.bufferSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
}
}

Expand All @@ -123,7 +136,7 @@ public void request(long n) {
@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(RxRingBuffer.SIZE);
request(this.bufferSize);
}

@Override
Expand Down Expand Up @@ -180,7 +193,7 @@ public void call() {

// requested and counter are not included to avoid JIT issues with register spilling
// and their access is is amortized because they are part of the outer loop which runs
// less frequently (usually after each RxRingBuffer.SIZE elements)
// less frequently (usually after each bufferSize elements)

for (;;) {
long requestAmount = requested.get();
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,69 @@ public void onNext(Integer t) {
}
}

@Test
public void testQueueFullEmitsErrorWithVaryingBufferSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: test prefix is not needed

final CountDownLatch latch = new CountDownLatch(1);
// randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
// which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
for (int i = 1; i <= 1024; i = i * 2) {
final int capacity = i;
Observable<Integer> observable = Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> o) {
for (int i = 0; i < capacity + 10; i++) {
o.onNext(i);
}
latch.countDown();
o.onCompleted();
}

});

TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>(new Observer<Integer>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer t) {
try {
// force it to be slow wait until we have queued everything
latch.await(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

});
System.out.println("Using capacity " + capacity); // for post-failure debugging
observable.observeOn(Schedulers.newThread(), capacity).subscribe(testSubscriber);

testSubscriber.awaitTerminalEvent();
List<Throwable> errors = testSubscriber.getOnErrorEvents();
assertEquals(1, errors.size());
System.out.println("Errors: " + errors);
Throwable t = errors.get(0);
if (t instanceof MissingBackpressureException) {
// success, we expect this
} else {
if (t.getCause() instanceof MissingBackpressureException) {
// this is also okay
} else {
fail("Expecting MissingBackpressureException");
}
}
}
}

@Test
public void testAsyncChild() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Expand Down