From 2367f90bb0f3bce5493ac5e014b599133c4410a7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 9 Feb 2016 12:52:30 +0100 Subject: [PATCH] 1.x: fix observeOn resource handling, add delayError capability --- src/main/java/rx/Observable.java | 37 ++- src/main/java/rx/Single.java | 4 +- .../internal/operators/OperatorObserveOn.java | 237 ++++++++++-------- .../util/atomic/SpscAtomicArrayQueue.java | 5 + .../internal/util/unsafe/SpscArrayQueue.java | 5 + .../operators/OperatorObserveOnTest.java | 93 ++++--- 6 files changed, 240 insertions(+), 141 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7a2d91a2af..b0c5e3b935 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5999,7 +5999,9 @@ public final Observable mergeWith(Observable t1) { /** * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, - * asynchronously with an unbounded buffer. + * asynchronously with a bounded buffer. + *

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly + * asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload. *

* *

@@ -6014,12 +6016,43 @@ public final Observable mergeWith(Observable t1) { * @see ReactiveX operators documentation: ObserveOn * @see RxJava Threading Examples * @see #subscribeOn + * @see #observeOn(Scheduler, boolean) */ public final Observable observeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); } - return lift(new OperatorObserveOn(scheduler)); + return lift(new OperatorObserveOn(scheduler, false)); + } + + /** + * Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler}, + * asynchronously with a bounded buffer and optionally delays onError notifications. + *

+ * + *

+ *
Scheduler:
+ *
you specify which {@link Scheduler} this operator will use
+ *
+ * + * @param scheduler + * the {@link Scheduler} to notify {@link Observer}s on + * @param delayError + * indicates if the onError notification may not cut ahead of onNext notification on the other side of the + * scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received + * from upstream + * @return the source Observable modified so that its {@link Observer}s are notified on the specified + * {@link Scheduler} + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + * @see #subscribeOn + * @see #observeOn(Scheduler) + */ + public final Observable observeOn(Scheduler scheduler, boolean delayError) { + if (this instanceof ScalarSynchronousObservable) { + return ((ScalarSynchronousObservable)this).scalarScheduleOn(scheduler); + } + return lift(new OperatorObserveOn(scheduler, delayError)); } /** diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index a768779a4d..5ad3d92f73 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -1381,7 +1381,9 @@ public final Single observeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousSingle) { return ((ScalarSynchronousSingle)this).scalarScheduleOn(scheduler); } - return lift(new OperatorObserveOn(scheduler)); + // Note that since Single emits onSuccess xor onError, + // there is no cut-ahead possible like with regular Observable sequences. + return lift(new OperatorObserveOn(scheduler, false)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 8aff74e67f..98464efb89 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -16,22 +16,17 @@ package rx.internal.operators; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import rx.*; import rx.Observable.Operator; -import rx.Producer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; -import rx.internal.util.RxRingBuffer; -import rx.internal.util.SynchronizedQueue; -import rx.internal.util.unsafe.SpscArrayQueue; -import rx.internal.util.unsafe.UnsafeAccess; -import rx.schedulers.ImmediateScheduler; -import rx.schedulers.TrampolineScheduler; +import rx.internal.util.*; +import rx.internal.util.atomic.SpscAtomicArrayQueue; +import rx.internal.util.unsafe.*; +import rx.plugins.RxJavaPlugins; +import rx.schedulers.*; /** * Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer. @@ -44,12 +39,15 @@ public final class OperatorObserveOn implements Operator { private final Scheduler scheduler; + private final boolean delayError; /** - * @param scheduler + * @param scheduler the scheduler to use + * @param delayError delay errors until all normal events are emitted in the other thread? */ - public OperatorObserveOn(Scheduler scheduler) { + public OperatorObserveOn(Scheduler scheduler, boolean delayError) { this.scheduler = scheduler; + this.delayError = delayError; } @Override @@ -61,58 +59,65 @@ public Subscriber call(Subscriber child) { // avoid overhead, execute directly return child; } else { - ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child); + ObserveOnSubscriber parent = new ObserveOnSubscriber(scheduler, child, delayError); parent.init(); return parent; } } /** Observe through individual queue per observer. */ - private static final class ObserveOnSubscriber extends Subscriber { + private static final class ObserveOnSubscriber extends Subscriber implements Action0 { final Subscriber child; final Scheduler.Worker recursiveScheduler; - final ScheduledUnsubscribe scheduledUnsubscribe; - final NotificationLite on = NotificationLite.instance(); - + final NotificationLite on; + final boolean delayError; final Queue queue; // the status of the current stream - volatile boolean finished = false; + volatile boolean finished; final AtomicLong requested = new AtomicLong(); final AtomicLong counter = new AtomicLong(); - volatile Throwable error; + /** + * The single exception if not null, should be written before setting finished (release) and read after + * reading finished (acquire). + */ + Throwable error; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should // not prevent anything downstream from consuming, which will happen if the Subscription is chained - public ObserveOnSubscriber(Scheduler scheduler, Subscriber child) { + public ObserveOnSubscriber(Scheduler scheduler, Subscriber child, boolean delayError) { this.child = child; this.recursiveScheduler = scheduler.createWorker(); + this.delayError = delayError; + this.on = NotificationLite.instance(); if (UnsafeAccess.isUnsafeAvailable()) { queue = new SpscArrayQueue(RxRingBuffer.SIZE); } else { - queue = new SynchronizedQueue(RxRingBuffer.SIZE); + queue = new SpscAtomicArrayQueue(RxRingBuffer.SIZE); } - this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler); } void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call - child.add(scheduledUnsubscribe); - child.setProducer(new Producer() { + Subscriber localChild = child; + + localChild.setProducer(new Producer() { @Override public void request(long n) { - BackpressureUtils.getAndAddRequest(requested, n); - schedule(); + if (n > 0L) { + BackpressureUtils.getAndAddRequest(requested, n); + schedule(); + } } }); - child.add(recursiveScheduler); - child.add(this); + localChild.add(recursiveScheduler); + localChild.add(this); } @Override @@ -123,7 +128,7 @@ public void onStart() { @Override public void onNext(final T t) { - if (isUnsubscribed()) { + if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { @@ -145,106 +150,126 @@ public void onCompleted() { @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); return; } error = e; - // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event - unsubscribe(); finished = true; - // polling thread should skip any onNext still in the queue schedule(); } - final Action0 action = new Action0() { - - @Override - public void call() { - pollQueue(); - } - - }; - protected void schedule() { if (counter.getAndIncrement() == 0) { - recursiveScheduler.schedule(action); + recursiveScheduler.schedule(this); } } // only execute this from schedule() - void pollQueue() { - int emitted = 0; - final AtomicLong localRequested = this.requested; - final AtomicLong localCounter = this.counter; - do { - localCounter.set(1); - long produced = 0; - long r = localRequested.get(); - for (;;) { - if (child.isUnsubscribed()) + @Override + public void call() { + long emitted = 0L; + + long missed = 1L; + + // these are accessed in a tight loop around atomics so + // loading them into local variables avoids the mandatory re-reading + // of the constant fields + final Queue q = this.queue; + final Subscriber localChild = this.child; + final NotificationLite localOn = this.on; + + // requested and counter are not included to avoid JIT issues with register spilling + // and their access is is amortized because they are part of the outer loop which runs + // less frequently (usually after each RxRingBuffer.SIZE elements) + + for (;;) { + if (checkTerminated(finished, q.isEmpty(), localChild, q)) { + return; + } + + long requestAmount = requested.get(); + boolean unbounded = requestAmount == Long.MAX_VALUE; + long currentEmission = 0L; + + while (requestAmount != 0L) { + boolean done = finished; + Object v = q.poll(); + boolean empty = v == null; + + if (checkTerminated(done, empty, localChild, q)) { return; - Throwable error; - if (finished) { - if ((error = this.error) != null) { - // errors shortcut the queue so - // release the elements in the queue for gc - queue.clear(); - child.onError(error); - return; - } else - if (queue.isEmpty()) { - child.onCompleted(); - return; - } } - if (r > 0) { - Object o = queue.poll(); - if (o != null) { - child.onNext(on.getValue(o)); - r--; - emitted++; - produced++; - } else { - break; - } - } else { + + if (empty) { break; } + + localChild.onNext(localOn.getValue(v)); + + requestAmount--; + currentEmission--; + emitted++; + } + + if (currentEmission != 0L && !unbounded) { + requested.addAndGet(currentEmission); } - if (produced > 0 && localRequested.get() != Long.MAX_VALUE) { - localRequested.addAndGet(-produced); + + missed = counter.addAndGet(-missed); + if (missed == 0L) { + break; } - } while (localCounter.decrementAndGet() > 0); - if (emitted > 0) { + } + + if (emitted != 0L) { request(emitted); } } - } - - static final class ScheduledUnsubscribe extends AtomicInteger implements Subscription { - final Scheduler.Worker worker; - volatile boolean unsubscribed = false; - - public ScheduledUnsubscribe(Scheduler.Worker worker) { - this.worker = worker; - } - - @Override - public boolean isUnsubscribed() { - return unsubscribed; - } - - @Override - public void unsubscribe() { - if (getAndSet(1) == 0) { - worker.schedule(new Action0() { - @Override - public void call() { - worker.unsubscribe(); - unsubscribed = true; + + boolean checkTerminated(boolean done, boolean isEmpty, Subscriber a, Queue q) { + if (a.isUnsubscribed()) { + q.clear(); + return true; + } + + if (done) { + if (delayError) { + if (isEmpty) { + Throwable e = error; + try { + if (e != null) { + a.onError(e); + } else { + a.onCompleted(); + } + } finally { + recursiveScheduler.unsubscribe(); + } + } + } else { + Throwable e = error; + if (e != null) { + q.clear(); + try { + a.onError(e); + } finally { + recursiveScheduler.unsubscribe(); + } + return true; + } else + if (isEmpty) { + try { + a.onCompleted(); + } finally { + recursiveScheduler.unsubscribe(); + } + return true; } - }); + } + } + + return false; } - } } diff --git a/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java index 65c29e3ce8..cadf772d49 100644 --- a/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java +++ b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java @@ -107,6 +107,11 @@ public int size() { } } + @Override + public boolean isEmpty() { + return lvProducerIndex() == lvConsumerIndex(); + } + private void soProducerIndex(long newIndex) { producerIndex.lazySet(newIndex); } diff --git a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java index 88c6d491c6..17fee1c804 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java @@ -162,6 +162,11 @@ public int size() { } } } + + @Override + public boolean isEmpty() { + return lvProducerIndex() == lvConsumerIndex(); + } private void soProducerIndex(long v) { UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, v); diff --git a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java index 65a4085384..0b4b98bc8e 100644 --- a/src/test/java/rx/internal/operators/OperatorObserveOnTest.java +++ b/src/test/java/rx/internal/operators/OperatorObserveOnTest.java @@ -15,47 +15,26 @@ */ package rx.internal.operators; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.Test; import org.mockito.InOrder; -import rx.Notification; +import rx.*; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; -import rx.Scheduler; -import rx.Subscriber; -import rx.Subscription; -import rx.exceptions.MissingBackpressureException; -import rx.exceptions.TestException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; +import rx.exceptions.*; +import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; -import rx.schedulers.TestScheduler; +import rx.schedulers.*; import rx.subjects.PublishSubject; public class OperatorObserveOnTest { @@ -804,5 +783,55 @@ public void onNext(Integer t) { assertTrue(latch.await(10, TimeUnit.SECONDS)); assertEquals(1, requests.size()); } + + @Test + public void testErrorDelayed() { + TestScheduler s = Schedulers.test(); + + Observable source = Observable.just(1, 2 ,3) + .concatWith(Observable.error(new TestException())); + + TestSubscriber ts = TestSubscriber.create(0); + + source.observeOn(s, true).subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + s.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + s.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertValues(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(3); // requesting 2 doesn't switch to the error() source for some reason in concat. + s.advanceTimeBy(1, TimeUnit.SECONDS); + + ts.assertValues(1, 2, 3); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } + @Test + public void testErrorDelayedAsync() { + Observable source = Observable.just(1, 2 ,3) + .concatWith(Observable.error(new TestException())); + + TestSubscriber ts = TestSubscriber.create(); + + source.observeOn(Schedulers.computation(), true).subscribe(ts); + + ts.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts.assertValues(1, 2, 3); + ts.assertError(TestException.class); + ts.assertNotCompleted(); + } }