diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7468fccb47..7d8bd10f5c 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -6291,7 +6291,8 @@ public final Observable mergeWith(Observable t1) { /** * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, - * asynchronously with a bounded buffer. + * asynchronously with a bounded buffer of {@link RxRingBuffer.SIZE} slots. + * *

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload. *

@@ -6308,13 +6309,41 @@ public final Observable mergeWith(Observable t1) { * @see ReactiveX operators documentation: ObserveOn * @see RxJava Threading Examples * @see #subscribeOn + * @see #observeOn(Scheduler, int) * @see #observeOn(Scheduler, boolean) + * @see #observeOn(Scheduler, boolean, int) */ public final Observable observeOn(Scheduler scheduler) { - if (this instanceof ScalarSynchronousObservable) { - return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); - } - return lift(new OperatorObserveOn(scheduler, false)); + return observeOn(scheduler, RxRingBuffer.SIZE); + } + + /** + * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE} + * default. + * + *

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly + * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler the {@link Scheduler} to notify {@link Observer}s on + * @param bufferSize the size of the buffer. + * @return the source Observable modified so that its {@link Observer}s are notified on the specified + * {@link Scheduler} + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + * @see #subscribeOn + * @see #observeOn(Scheduler) + * @see #observeOn(Scheduler, boolean) + * @see #observeOn(Scheduler, boolean, int) + */ + public final Observable observeOn(Scheduler scheduler, int bufferSize) { + return observeOn(scheduler, false, bufferSize); } /** @@ -6339,12 +6368,45 @@ public final Observable observeOn(Scheduler scheduler) { * @see RxJava Threading Examples * @see #subscribeOn * @see #observeOn(Scheduler) + * @see #observeOn(Scheduler, int) + * @see #observeOn(Scheduler, boolean, int) */ public final Observable observeOn(Scheduler scheduler, boolean delayError) { + return observeOn(scheduler, delayError, RxRingBuffer.SIZE); + } + + /** + * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE} + * default, and optionally delays onError notifications. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @param delayError + * indicates if the onError notification may not cut ahead of onNext notification on the other side of the + * scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received + * from upstream + * @param bufferSize the size of the buffer. + * @return the source Observable modified so that its {@link Observer}s are notified on the specified + * {@link Scheduler} + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + * @see #subscribeOn + * @see #observeOn(Scheduler) + * @see #observeOn(Scheduler, int) + * @see #observeOn(Scheduler, boolean) + */ + public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); } - return lift(new OperatorObserveOn(scheduler, delayError)); + return lift(new OperatorObserveOn(scheduler, delayError, bufferSize)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 51d6fc7a23..2a7c7684dd 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -40,14 +40,25 @@ public final class OperatorObserveOn implements Operator { private final Scheduler scheduler; private final boolean delayError; + private final int bufferSize; /** * @param scheduler the scheduler to use * @param delayError delay errors until all normal events are emitted in the other thread? */ public OperatorObserveOn(Scheduler scheduler, boolean delayError) { + this(scheduler, delayError, RxRingBuffer.SIZE); + } + + /** + * @param scheduler the scheduler to use + * @param delayError delay errors until all normal events are emitted in the other thread? + * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0 + */ + public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { this.scheduler = scheduler; this.delayError = delayError; + this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; } @Override @@ -59,7 +70,7 @@ public Subscriber call(Subscriber child) { // avoid overhead, execute directly return child; } else { - ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child, delayError); + ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child, delayError, bufferSize); parent.init(); return parent; } @@ -72,6 +83,7 @@ private static final class ObserveOnSubscriber extends Subscriber implemen final NotificationLite on; final boolean delayError; final Queue queue; + final int bufferSize; // the status of the current stream volatile boolean finished; @@ -88,15 +100,16 @@ private static final class ObserveOnSubscriber extends Subscriber implemen // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained - public ObserveOnSubscriber(Scheduler scheduler, Subscriber child, boolean delayError) { + public ObserveOnSubscriber(Scheduler scheduler, Subscriber child, boolean delayError, int bufferSize) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); this.delayError = delayError; this.on = NotificationLite.instance(); + this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; if (UnsafeAccess.isUnsafeAvailable()) { - queue = new SpscArrayQueue(RxRingBuffer.SIZE); + queue = new SpscArrayQueue(this.bufferSize); } else { - queue = new SpscAtomicArrayQueue(RxRingBuffer.SIZE); + queue = new SpscAtomicArrayQueue(this.bufferSize); } } @@ -123,7 +136,7 @@ public void request(long n) { @Override public void onStart() { // signal that this is an async operator capable of receiving this many - request(RxRingBuffer.SIZE); + request(this.bufferSize); } @Override @@ -180,7 +193,7 @@ public void call() { // requested and counter are not included to avoid JIT issues with register spilling // and their access is is amortized because they are part of the outer loop which runs - // less frequently (usually after each RxRingBuffer.SIZE elements) + // less frequently (usually after each bufferSize elements) for (;;) { long requestAmount = requested.get(); diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index d0ba44be23..8ebc69eed7 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -580,6 +580,69 @@ public void onNext(Integer t) { } } + @Test + public void testQueueFullEmitsErrorWithVaryingBufferSize() { + final CountDownLatch latch = new CountDownLatch(1); + // randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2 + // which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue) + for (int i = 1; i <= 1024; i = i * 2) { + final int capacity = i; + Observable observable = Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber o) { + for (int i = 0; i < capacity + 10; i++) { + o.onNext(i); + } + latch.countDown(); + o.onCompleted(); + } + + }); + + TestSubscriber testSubscriber = new TestSubscriber(new Observer() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + try { + // force it to be slow wait until we have queued everything + latch.await(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }); + System.out.println("Using capacity " + capacity); // for post-failure debugging + observable.observeOn(Schedulers.newThread(), capacity).subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + List errors = testSubscriber.getOnErrorEvents(); + assertEquals(1, errors.size()); + System.out.println("Errors: " + errors); + Throwable t = errors.get(0); + if (t instanceof MissingBackpressureException) { + // success, we expect this + } else { + if (t.getCause() instanceof MissingBackpressureException) { + // this is also okay + } else { + fail("Expecting MissingBackpressureException"); + } + } + } + } + @Test public void testAsyncChild() { TestSubscriber ts = new TestSubscriber();