* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
- * that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
+ * that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
* request amounts via {@link Subscription#request(long)}.
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
* All RxJava operators are implemented with these relaxed rules in mind.
@@ -112,7 +112,7 @@
*
* // could be some blocking operation
* Thread.sleep(1000);
- *
+ *
* // the consumer might have cancelled the flow
* if (emitter.isCancelled() {
* return;
@@ -138,7 +138,7 @@
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
* where operators run is orthogonal to when the operators can work with data. This means that asynchrony and parallelism
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
- * operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
+ * operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
*
* For more information see the ReactiveX
* documentation.
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java
index af6613b224..d09af33ee0 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java
@@ -62,7 +62,7 @@ static final class BlockingFlowableIterator
long produced;
volatile boolean done;
- Throwable error;
+ volatile Throwable error;
BlockingFlowableIterator(int batchSize) {
this.queue = new SpscArrayQueue(batchSize);
@@ -75,6 +75,13 @@ static final class BlockingFlowableIterator
@Override
public boolean hasNext() {
for (;;) {
+ if (isDisposed()) {
+ Throwable e = error;
+ if (e != null) {
+ throw ExceptionHelper.wrapOrThrow(e);
+ }
+ return false;
+ }
boolean d = done;
boolean empty = queue.isEmpty();
if (d) {
@@ -90,7 +97,7 @@ public boolean hasNext() {
BlockingHelper.verifyNonBlocking();
lock.lock();
try {
- while (!done && queue.isEmpty()) {
+ while (!done && queue.isEmpty() && !isDisposed()) {
condition.await();
}
} catch (InterruptedException ex) {
@@ -175,6 +182,7 @@ public void remove() {
@Override
public void dispose() {
SubscriptionHelper.cancel(this);
+ signalConsumer();
}
@Override
diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java
index d3832d4edc..4d0bc47158 100644
--- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java
+++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java
@@ -419,7 +419,7 @@ public void onComplete() {
public void cancel() {
SubscriptionHelper.cancel(this);
}
-
+
public void request(long n) {
if (fusionMode != QueueSubscription.SYNC) {
get().request(n);
diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java
index 3fdd26f9b6..24a7cb7701 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableIterable.java
@@ -53,7 +53,7 @@ static final class BlockingObservableIterator
final Condition condition;
volatile boolean done;
- Throwable error;
+ volatile Throwable error;
BlockingObservableIterator(int batchSize) {
this.queue = new SpscLinkedArrayQueue(batchSize);
@@ -64,6 +64,13 @@ static final class BlockingObservableIterator
@Override
public boolean hasNext() {
for (;;) {
+ if (isDisposed()) {
+ Throwable e = error;
+ if (e != null) {
+ throw ExceptionHelper.wrapOrThrow(e);
+ }
+ return false;
+ }
boolean d = done;
boolean empty = queue.isEmpty();
if (d) {
@@ -80,7 +87,7 @@ public boolean hasNext() {
BlockingHelper.verifyNonBlocking();
lock.lock();
try {
- while (!done && queue.isEmpty()) {
+ while (!done && queue.isEmpty() && !isDisposed()) {
condition.await();
}
} finally {
@@ -146,6 +153,7 @@ public void remove() {
@Override
public void dispose() {
DisposableHelper.dispose(this);
+ signalConsumer();
}
@Override
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java
index df3fe6d62c..68490f9257 100644
--- a/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java
+++ b/src/test/java/io/reactivex/internal/operators/flowable/BlockingFlowableToIteratorTest.java
@@ -16,14 +16,18 @@
import static org.junit.Assert.*;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import org.junit.*;
import org.reactivestreams.*;
import io.reactivex.Flowable;
+import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator;
import io.reactivex.internal.subscriptions.BooleanSubscription;
+import io.reactivex.processors.PublishProcessor;
+import io.reactivex.schedulers.Schedulers;
public class BlockingFlowableToIteratorTest {
@@ -185,4 +189,28 @@ protected void subscribeActual(Subscriber super Integer> s) {
it.next();
}
+
+ @Test(expected = NoSuchElementException.class)
+ public void disposedIteratorHasNextReturns() {
+ Iterator it = PublishProcessor.create()
+ .blockingIterable().iterator();
+ ((Disposable)it).dispose();
+ assertFalse(it.hasNext());
+ it.next();
+ }
+
+ @Test
+ public void asyncDisposeUnblocks() {
+ final Iterator it = PublishProcessor.create()
+ .blockingIterable().iterator();
+
+ Schedulers.single().scheduleDirect(new Runnable() {
+ @Override
+ public void run() {
+ ((Disposable)it).dispose();
+ }
+ }, 1, TimeUnit.SECONDS);
+
+ assertFalse(it.hasNext());
+ }
}
diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java
index 8edd8e7cd2..d0023d3e38 100644
--- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java
+++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableBufferTest.java
@@ -2770,6 +2770,7 @@ public void timedSizeBufferAlreadyCleared() {
}
@Test
+ @SuppressWarnings("unchecked")
public void bufferExactFailingSupplier() {
Flowable.empty()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable>() {
diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java
index 947a2a027d..b854c317b2 100644
--- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java
+++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableNextTest.java
@@ -28,7 +28,6 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.internal.operators.observable.BlockingObservableNext.NextObserver;
import io.reactivex.plugins.RxJavaPlugins;
-import io.reactivex.processors.BehaviorProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.*;
@@ -332,9 +331,9 @@ public void testSingleSourceManyIterators() throws InterruptedException {
@Test
public void testSynchronousNext() {
- assertEquals(1, BehaviorProcessor.createDefault(1).take(1).blockingSingle().intValue());
- assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue());
- assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue());
+ assertEquals(1, BehaviorSubject.createDefault(1).take(1).blockingSingle().intValue());
+ assertEquals(2, BehaviorSubject.createDefault(2).blockingIterable().iterator().next().intValue());
+ assertEquals(3, BehaviorSubject.createDefault(3).blockingNext().iterator().next().intValue());
}
@Test
diff --git a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java
index f79b2a294d..324b7c2172 100644
--- a/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java
+++ b/src/test/java/io/reactivex/internal/operators/observable/BlockingObservableToIteratorTest.java
@@ -16,15 +16,18 @@
import static org.junit.Assert.*;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import org.junit.*;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
-import io.reactivex.disposables.Disposables;
+import io.reactivex.disposables.*;
import io.reactivex.exceptions.TestException;
import io.reactivex.internal.operators.observable.BlockingObservableIterable.BlockingObservableIterator;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
public class BlockingObservableToIteratorTest {
@@ -119,4 +122,28 @@ public void remove() {
BlockingObservableIterator it = new BlockingObservableIterator(128);
it.remove();
}
+
+ @Test(expected = NoSuchElementException.class)
+ public void disposedIteratorHasNextReturns() {
+ Iterator it = PublishSubject.create()
+ .blockingIterable().iterator();
+ ((Disposable)it).dispose();
+ assertFalse(it.hasNext());
+ it.next();
+ }
+
+ @Test
+ public void asyncDisposeUnblocks() {
+ final Iterator it = PublishSubject.create()
+ .blockingIterable().iterator();
+
+ Schedulers.single().scheduleDirect(new Runnable() {
+ @Override
+ public void run() {
+ ((Disposable)it).dispose();
+ }
+ }, 1, TimeUnit.SECONDS);
+
+ assertFalse(it.hasNext());
+ }
}
diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java
index 92299a5754..16ba2fab6d 100644
--- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java
+++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBufferTest.java
@@ -2137,6 +2137,7 @@ public ObservableSource> apply(Observable