diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 6d5404ed03..658f941430 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -183,7 +183,10 @@ public void onCompleted(int index, boolean hadValue) { } } - public void onNext(int index, T t) { + /** + * @return boolean true if propagated value + */ + public boolean onNext(int index, T t) { synchronized (this) { if (!haveValues.get(index)) { haveValues.set(index); @@ -192,17 +195,19 @@ public void onNext(int index, T t) { collectedValues[index] = t; if (haveValuesCount != collectedValues.length) { // haven't received value from each source yet so won't emit - return; - } - try { - buffer.onNext(combinator.call(collectedValues)); - } catch (MissingBackpressureException e) { - onError(e); - } catch (Throwable e) { - onError(e); + return false; + } else { + try { + buffer.onNext(combinator.call(collectedValues)); + } catch (MissingBackpressureException e) { + onError(e); + } catch (Throwable e) { + onError(e); + } } } tick(); + return true; } public void onError(Throwable e) { @@ -244,7 +249,10 @@ public void onError(Throwable e) { public void onNext(T t) { hasValue = true; emitted.incrementAndGet(); - producer.onNext(index, t); + boolean emitted = producer.onNext(index, t); + if (!emitted) { + request(1); + } } } diff --git a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java index 31f772e272..65510c5e29 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java @@ -29,14 +29,18 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Matchers; +import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscriber; +import rx.functions.Action1; import rx.functions.Func2; import rx.functions.Func3; import rx.functions.Func4; @@ -803,4 +807,42 @@ public void testBackpressure() { assertEquals("two4", events.get(2)); assertEquals(NUM, events.size()); } + + @Test + public void testWithCombineLatestIssue1717() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final int SIZE = 2000; + Observable timer = Observable.timer(0, 1, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.newThread()) + .doOnEach(new Action1>() { + + @Override + public void call(Notification n) { + // System.out.println(n); + if (count.incrementAndGet() >= SIZE) { + latch.countDown(); + } + } + + }).take(SIZE); + + TestSubscriber ts = new TestSubscriber(); + + Observable.combineLatest(timer, Observable. never(), new Func2() { + + @Override + public Long call(Long t1, Integer t2) { + return t1; + } + + }).subscribe(ts); + + if (!latch.await(SIZE + 1000, TimeUnit.MILLISECONDS)) { + fail("timed out"); + } + + assertEquals(SIZE, count.get()); + } + }