From a1bdaf6ac3186d4ec559d141068310108983b3b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Sun, 26 May 2019 20:22:19 +0200 Subject: [PATCH] 2.x: Fix zip not stopping the subscription upon eager error --- .../operators/observable/ObservableZip.java | 4 +++ .../operators/flowable/FlowableZipTest.java | 30 +++++++++++++++++++ .../observable/ObservableZipTest.java | 30 +++++++++++++++++++ 3 files changed, 64 insertions(+) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index a259cd6d56..af465c43c6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -179,6 +179,7 @@ public void drain() { if (z.done && !delayError) { Throwable ex = z.error; if (ex != null) { + cancelled = true; cancel(); a.onError(ex); return; @@ -224,6 +225,7 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean if (delayError) { if (empty) { Throwable e = source.error; + cancelled = true; cancel(); if (e != null) { a.onError(e); @@ -235,11 +237,13 @@ boolean checkTerminated(boolean d, boolean empty, Observer a, boolean } else { Throwable e = source.error; if (e != null) { + cancelled = true; cancel(); a.onError(e); return true; } else if (empty) { + cancelled = true; cancel(); a.onComplete(); return true; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java index ef1223d66a..12f81c33e0 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableZipTest.java @@ -1895,4 +1895,34 @@ public Integer apply(Integer a, Integer b) throws Exception { ts.assertResult(4); } + + @Test + public void firstErrorPreventsSecondSubscription() { + final AtomicInteger counter = new AtomicInteger(); + + List> flowableList = new ArrayList>(); + flowableList.add(Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter e) + throws Exception { throw new TestException(); } + }, BackpressureStrategy.MISSING)); + flowableList.add(Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter e) + throws Exception { counter.getAndIncrement(); } + }, BackpressureStrategy.MISSING)); + + Flowable.zip(flowableList, + new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return a; + } + }) + .test() + .assertFailure(TestException.class) + ; + + assertEquals(0, counter.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java index 2fc7d7cb52..ba86f16175 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java @@ -1428,4 +1428,34 @@ public Integer apply(Integer t1, Integer t2) throws Exception { ps2.onNext(2); to.assertResult(3); } + + @Test + public void firstErrorPreventsSecondSubscription() { + final AtomicInteger counter = new AtomicInteger(); + + List> observableList = new ArrayList>(); + observableList.add(Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) + throws Exception { throw new TestException(); } + })); + observableList.add(Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter e) + throws Exception { counter.getAndIncrement(); } + })); + + Observable.zip(observableList, + new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return a; + } + }) + .test() + .assertFailure(TestException.class) + ; + + assertEquals(0, counter.get()); + } }