Skip to content

Commit

Permalink
Merge pull request ReactiveX#545 from akarnokd/ZipInfiniteFix
Browse files Browse the repository at this point in the history
Fixed Zip issue with infinite streams.
  • Loading branch information
benjchristensen committed Dec 3, 2013
2 parents 881db5c + e5912ed commit f28bc5b
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 5 deletions.
5 changes: 4 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,9 @@ public void onNext(T value) {
if (io.done) {
observer.onCompleted();
cancel.unsubscribe();
return;
}
return;
continue;
}
Object v = io.queue.peek();
if (v == NULL_SENTINEL) {
Expand All @@ -459,6 +460,8 @@ public void onNext(T value) {
io.queue.poll();
}
observer.onNext(values);
} else {
break;
}
}
} finally {
Expand Down
121 changes: 117 additions & 4 deletions rxjava-core/src/test/java/rx/operators/OperationZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationZip.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.operators.OperationZip.zip;

import java.util.Arrays;
import java.util.Collection;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

Expand All @@ -38,7 +44,32 @@
import rx.util.functions.Functions;

public class OperationZipTest {

Func2<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;

Observer<String> observer;
InOrder inOrder;
@Before
@SuppressWarnings("unchecked")
public void setUp() {
concat2Strings = new Func2<String, String, String>() {
@Override
public String call(String t1, String t2) {
return t1 + "-" + t2;
}
};

s1 = PublishSubject.create();
s2 = PublishSubject.create();
zipped = Observable.zip(s1, s2, concat2Strings);

observer = mock(Observer.class);
inOrder = inOrder(observer);

zipped.subscribe(observer);
}
@SuppressWarnings("unchecked")
@Test
public void testCollectionSizeDifferentThanFunction() {
Expand Down Expand Up @@ -702,4 +733,86 @@ public Subscription onSubscribe(Observer<? super String> Observer) {
}

}

@Test
public void testFirstCompletesThenSecondInfinite() {
s1.onNext("a");
s1.onNext("b");
s1.onCompleted();
s2.onNext("1");
inOrder.verify(observer, times(1)).onNext("a-1");
s2.onNext("2");
inOrder.verify(observer, times(1)).onNext("b-2");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testSecondInfiniteThenFirstCompletes() {
s2.onNext("1");
s2.onNext("2");
s1.onNext("a");
inOrder.verify(observer, times(1)).onNext("a-1");
s1.onNext("b");
inOrder.verify(observer, times(1)).onNext("b-2");
s1.onCompleted();
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testSecondCompletesThenFirstInfinite() {
s2.onNext("1");
s2.onNext("2");
s2.onCompleted();
s1.onNext("a");
inOrder.verify(observer, times(1)).onNext("a-1");
s1.onNext("b");
inOrder.verify(observer, times(1)).onNext("b-2");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testFirstInfiniteThenSecondCompletes() {
s1.onNext("a");
s1.onNext("b");
s2.onNext("1");
inOrder.verify(observer, times(1)).onNext("a-1");
s2.onNext("2");
inOrder.verify(observer, times(1)).onNext("b-2");
s2.onCompleted();
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
@Test
public void testFirstFails() {
s2.onNext("a");
s1.onError(new RuntimeException("Forced failure"));

inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));

s2.onNext("b");
s1.onNext("1");
s1.onNext("2");

inOrder.verify(observer, never()).onCompleted();
inOrder.verify(observer, never()).onNext(any(String.class));
inOrder.verifyNoMoreInteractions();
}
@Test
public void testSecondFails() {
s1.onNext("a");
s1.onNext("b");
s2.onError(new RuntimeException("Forced failure"));

inOrder.verify(observer, times(1)).onError(any(RuntimeException.class));

s2.onNext("1");
s2.onNext("2");

inOrder.verify(observer, never()).onCompleted();
inOrder.verify(observer, never()).onNext(any(String.class));
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit f28bc5b

Please sign in to comment.