diff --git a/src/main/java/rx/internal/operators/OperatorScan.java b/src/main/java/rx/internal/operators/OperatorScan.java index 5b132fd767..f91d9b28f2 100644 --- a/src/main/java/rx/internal/operators/OperatorScan.java +++ b/src/main/java/rx/internal/operators/OperatorScan.java @@ -16,6 +16,7 @@ package rx.internal.operators; import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; import rx.*; import rx.Observable.Operator; @@ -175,12 +176,10 @@ static final class InitialProducer implements Producer, Observer { boolean missed; /** Missed a request. */ long missedRequested; - /** Missed a producer. */ - Producer missedProducer; /** The current requested amount. */ - long requested; + final AtomicLong requested; /** The current producer. */ - Producer producer; + volatile Producer producer; volatile boolean done; Throwable error; @@ -196,41 +195,7 @@ public InitialProducer(R initialValue, Subscriber child) { } this.queue = q; q.offer(NotificationLite.instance().next(initialValue)); - } - - @Override - public void request(long n) { - if (n < 0L) { - throw new IllegalArgumentException("n >= required but it was " + n); - } else - if (n != 0L) { - synchronized (this) { - if (emitting) { - long mr = missedRequested; - long mu = mr + n; - if (mu < 0L) { - mu = Long.MAX_VALUE; - } - missedRequested = mu; - return; - } - emitting = true; - } - - long r = requested; - long u = r + n; - if (u < 0L) { - u = Long.MAX_VALUE; - } - requested = u; - - Producer p = producer; - if (p != null) { - p.request(n); - } - - emitLoop(); - } + this.requested = new AtomicLong(); } @Override @@ -270,23 +235,51 @@ public void onCompleted() { emit(); } + @Override + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= required but it was " + n); + } else + if (n != 0L) { + BackpressureUtils.getAndAddRequest(requested, n); + Producer p = producer; + if (p == null) { + // not synchronizing on this to avoid clash with emit() + synchronized (requested) { + p = producer; + if (p == null) { + long mr = missedRequested; + missedRequested = BackpressureUtils.addCap(mr, n); + } + } + } + if (p != null) { + p.request(n); + } + emit(); + } + } + public void setProducer(Producer p) { if (p == null) { throw new NullPointerException(); } - synchronized (this) { - if (emitting) { - missedProducer = p; - return; + long mr; + // not synchronizing on this to avoid clash with emit() + synchronized (requested) { + if (producer != null) { + throw new IllegalStateException("Can't set more than one Producer!"); } - emitting = true; + // request one less because of the initial value, this happens once + mr = missedRequested - 1; + missedRequested = 0L; + producer = p; } - producer = p; - long r = requested; - if (r != 0L) { - p.request(r); + + if (mr > 0L) { + p.request(mr); } - emitLoop(); + emit(); } void emit() { @@ -304,7 +297,9 @@ void emitLoop() { final Subscriber child = this.child; final Queue queue = this.queue; final NotificationLite nl = NotificationLite.instance(); - long r = requested; + AtomicLong requested = this.requested; + + long r = requested.get(); for (;;) { boolean max = r == Long.MAX_VALUE; boolean d = done; @@ -312,6 +307,7 @@ void emitLoop() { if (checkTerminated(d, empty, child)) { return; } + long e = 0L; while (r != 0L) { d = done; Object o = queue.poll(); @@ -325,52 +321,25 @@ void emitLoop() { R v = nl.getValue(o); try { child.onNext(v); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - child.onError(OnErrorThrowable.addValueAsLastCause(e, v)); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + child.onError(OnErrorThrowable.addValueAsLastCause(ex, v)); return; } - if (!max) { - r--; - } + r--; + e--; } - if (!max) { - requested = r; + + if (e != 0 && !max) { + r = requested.addAndGet(e); } - Producer p; - long mr; synchronized (this) { - p = missedProducer; - mr = missedRequested; - if (!missed && p == null && mr == 0L) { + if (!missed) { emitting = false; return; } missed = false; - missedProducer = null; - missedRequested = 0L; - } - - if (mr != 0L && !max) { - long u = r + mr; - if (u < 0L) { - u = Long.MAX_VALUE; - } - requested = u; - r = u; - } - - if (p != null) { - producer = p; - if (r != 0L) { - p.request(r); - } - } else { - p = producer; - if (p != null && mr != 0L) { - p.request(mr); - } } } } diff --git a/src/main/java/rx/internal/producers/ProducerObserverArbiter.java b/src/main/java/rx/internal/producers/ProducerObserverArbiter.java index 7600815094..985352a3f4 100644 --- a/src/main/java/rx/internal/producers/ProducerObserverArbiter.java +++ b/src/main/java/rx/internal/producers/ProducerObserverArbiter.java @@ -20,6 +20,7 @@ import rx.*; import rx.Observer; import rx.exceptions.*; +import rx.internal.operators.BackpressureUtils; /** * Producer that serializes any event emission with requesting and producer changes. @@ -135,6 +136,7 @@ public void request(long n) { } emitting = true; } + Producer p = currentProducer; boolean skipFinal = false; try { long r = requested; @@ -143,12 +145,7 @@ public void request(long n) { u = Long.MAX_VALUE; } requested = u; - - Producer p = currentProducer; - if (p != null) { - p.request(n); - } - + emitLoop(); skipFinal = true; } finally { @@ -158,6 +155,9 @@ public void request(long n) { } } } + if (p != null) { + p.request(n); + } } public void setProducer(Producer p) { @@ -169,12 +169,9 @@ public void setProducer(Producer p) { emitting = true; } boolean skipFinal = false; + currentProducer = p; + long r = requested; try { - currentProducer = p; - long r = requested; - if (p != null && r != 0) { - p.request(r); - } emitLoop(); skipFinal = true; } finally { @@ -184,17 +181,24 @@ public void setProducer(Producer p) { } } } + if (p != null && r != 0) { + p.request(r); + } } void emitLoop() { final Subscriber c = child; + long toRequest = 0L; + Producer requestFrom = null; + outer: for (;;) { long localRequested; Producer localProducer; Object localTerminal; List q; + boolean quit = false; synchronized (this) { localRequested = missedRequested; localProducer = missedProducer; @@ -203,13 +207,21 @@ void emitLoop() { if (localRequested == 0L && localProducer == null && q == null && localTerminal == null) { emitting = false; - return; + quit = true; + } else { + missedRequested = 0L; + missedProducer = null; + queue = null; + missedTerminal = null; } - missedRequested = 0L; - missedProducer = null; - queue = null; - missedTerminal = null; } + if (quit) { + if (toRequest != 0L && requestFrom != null) { + requestFrom.request(toRequest); + } + return; + } + boolean empty = q == null || q.isEmpty(); if (localTerminal != null) { if (localTerminal != Boolean.TRUE) { @@ -266,13 +278,15 @@ void emitLoop() { } else { currentProducer = localProducer; if (r != 0L) { - localProducer.request(r); + toRequest = BackpressureUtils.addCap(toRequest, r); + requestFrom = localProducer; } } } else { Producer p = currentProducer; if (p != null && localRequested != 0L) { - p.request(localRequested); + toRequest = BackpressureUtils.addCap(toRequest, localRequested); + requestFrom = p; } } } diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index 96c1b1dbe1..d053694dd9 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -426,4 +426,24 @@ public Integer call(Integer t1, Integer t2) { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test(timeout = 1000) + public void testUnboundedSource() { + Observable.range(0, Integer.MAX_VALUE) + .scan(0, new Func2() { + @Override + public Integer call(Integer a, Integer b) { + return 0; + } + }) + .subscribe(new TestSubscriber() { + int count; + @Override + public void onNext(Integer t) { + if (++count == 2) { + unsubscribe(); + } + } + }); + } } diff --git a/src/test/java/rx/internal/producers/ProducersTest.java b/src/test/java/rx/internal/producers/ProducersTest.java index 0e5beacdfa..81377f29a3 100644 --- a/src/test/java/rx/internal/producers/ProducersTest.java +++ b/src/test/java/rx/internal/producers/ProducersTest.java @@ -23,8 +23,8 @@ import org.junit.*; import rx.*; -import rx.Observable.OnSubscribe; import rx.Observable; +import rx.Observable.*; import rx.Observer; import rx.functions.*; import rx.observers.TestSubscriber; @@ -378,4 +378,52 @@ public void testObserverArbiterAsync() { 20L, 21L, 22L, 23L, 24L, 40L, 41L, 42L, 43L, 44L)); } + + @Test(timeout = 1000) + public void testProducerObserverArbiterUnbounded() { + Observable.range(0, Integer.MAX_VALUE) + .lift(new Operator() { + @Override + public Subscriber call(Subscriber t) { + final ProducerObserverArbiter poa = new ProducerObserverArbiter(t); + + Subscriber parent = new Subscriber() { + + @Override + public void onCompleted() { + poa.onCompleted(); + } + + @Override + public void onError(Throwable e) { + poa.onError(e); + } + + @Override + public void onNext(Integer t) { + poa.onNext(t); + } + + + @Override + public void setProducer(Producer p) { + poa.setProducer(p); + } + }; + + t.add(parent); + t.setProducer(poa); + + return parent; + } + }).subscribe(new TestSubscriber() { + int count; + @Override + public void onNext(Integer t) { + if (++count == 2) { + unsubscribe(); + } + } + }); + } }