From 8deba2892faf4893236c8d140f8e90fd70d37668 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sat, 30 Nov 2013 12:39:39 +0100 Subject: [PATCH 1/2] Fixed Zip issue with infinite streams. --- rxjava-core/src/main/java/rx/operators/OperationZip.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 4c7c70851f0..6ecbb87e497 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -445,8 +445,9 @@ public void onNext(T value) { if (io.done) { observer.onCompleted(); cancel.unsubscribe(); + return; } - return; + continue; } Object v = io.queue.peek(); if (v == NULL_SENTINEL) { @@ -459,6 +460,8 @@ public void onNext(T value) { io.queue.poll(); } observer.onNext(values); + } else { + break; } } } finally { From e5912edc150860825f04b128f56e9b0368427dff Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 1 Dec 2013 10:17:12 +0100 Subject: [PATCH 2/2] Tests to verify fix and error behavior --- .../java/rx/operators/OperationZipTest.java | 121 +++++++++++++++++- 1 file changed, 117 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java index f3549a6fbd4..3f4ac10a556 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationZipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationZipTest.java @@ -15,13 +15,19 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; -import static rx.operators.OperationZip.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.operators.OperationZip.zip; import java.util.Arrays; import java.util.Collection; +import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -38,7 +44,32 @@ import rx.util.functions.Functions; public class OperationZipTest { - + Func2 concat2Strings; + PublishSubject s1; + PublishSubject s2; + Observable zipped; + + Observer observer; + InOrder inOrder; + @Before + @SuppressWarnings("unchecked") + public void setUp() { + concat2Strings = new Func2() { + @Override + public String call(String t1, String t2) { + return t1 + "-" + t2; + } + }; + + s1 = PublishSubject.create(); + s2 = PublishSubject.create(); + zipped = Observable.zip(s1, s2, concat2Strings); + + observer = mock(Observer.class); + inOrder = inOrder(observer); + + zipped.subscribe(observer); + } @SuppressWarnings("unchecked") @Test public void testCollectionSizeDifferentThanFunction() { @@ -702,4 +733,86 @@ public Subscription onSubscribe(Observer Observer) { } } + + @Test + public void testFirstCompletesThenSecondInfinite() { + s1.onNext("a"); + s1.onNext("b"); + s1.onCompleted(); + s2.onNext("1"); + inOrder.verify(observer, times(1)).onNext("a-1"); + s2.onNext("2"); + inOrder.verify(observer, times(1)).onNext("b-2"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSecondInfiniteThenFirstCompletes() { + s2.onNext("1"); + s2.onNext("2"); + s1.onNext("a"); + inOrder.verify(observer, times(1)).onNext("a-1"); + s1.onNext("b"); + inOrder.verify(observer, times(1)).onNext("b-2"); + s1.onCompleted(); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSecondCompletesThenFirstInfinite() { + s2.onNext("1"); + s2.onNext("2"); + s2.onCompleted(); + s1.onNext("a"); + inOrder.verify(observer, times(1)).onNext("a-1"); + s1.onNext("b"); + inOrder.verify(observer, times(1)).onNext("b-2"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstInfiniteThenSecondCompletes() { + s1.onNext("a"); + s1.onNext("b"); + s2.onNext("1"); + inOrder.verify(observer, times(1)).onNext("a-1"); + s2.onNext("2"); + inOrder.verify(observer, times(1)).onNext("b-2"); + s2.onCompleted(); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + @Test + public void testFirstFails() { + s2.onNext("a"); + s1.onError(new RuntimeException("Forced failure")); + + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + + s2.onNext("b"); + s1.onNext("1"); + s1.onNext("2"); + + inOrder.verify(observer, never()).onCompleted(); + inOrder.verify(observer, never()).onNext(any(String.class)); + inOrder.verifyNoMoreInteractions(); + } + @Test + public void testSecondFails() { + s1.onNext("a"); + s1.onNext("b"); + s2.onError(new RuntimeException("Forced failure")); + + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + + s2.onNext("1"); + s2.onNext("2"); + + inOrder.verify(observer, never()).onCompleted(); + inOrder.verify(observer, never()).onNext(any(String.class)); + inOrder.verifyNoMoreInteractions(); + } }