diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
index 93c14b2acf..de65b89f02 100644
--- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java
+++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java
@@ -21,10 +21,14 @@
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.junit.Ignore;
import org.junit.Test;
import org.mockito.InOrder;
@@ -41,23 +45,10 @@ public final class OperationConcat {
/**
* Combine the observable sequences from the list of Observables into one
- * observable sequence without any transformation. If either the outer
+ * observable sequence without any transformation. If either the outer
* observable or an inner observable calls onError, we will call onError.
- *
- *
- *
- * The outer observable might run on a separate thread from (one of) the
- * inner observables; in this case care must be taken to avoid a deadlock.
- * The Concat operation may block the outer thread while servicing an inner
- * thread in order to ensure a well-defined ordering of elements; therefore
- * none of the inner threads must be implemented in a way that might wait on
- * the outer thread.
- *
*
*
- * Beware that concat(o1,o2).subscribe() is a blocking call from
- * which it is impossible to unsubscribe if observables are running on same thread.
- *
* @param sequences An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
*/
@@ -70,73 +61,101 @@ public static Func1, Subscription> concat(final List Func1, Subscription> concat(final Observable> sequences) {
- return new Func1, Subscription>() {
-
- @Override
- public Subscription call(Observer observer) {
- return new ConcatSubscription(sequences, observer);
- }
- };
+ return new Concat(sequences);
}
- private static class ConcatSubscription extends BooleanSubscription {
- // Might be updated by an inner thread's onError during the outer
- // thread's onNext, then read in the outer thread's onComplete.
- final AtomicBoolean innerError = new AtomicBoolean(false);
+ private static class Concat implements Func1, Subscription> {
+ private Observable> sequences;
+ private AtomicObservableSubscription innerSubscription = null;
- public ConcatSubscription(Observable> sequences, final Observer observer) {
+ public Concat(Observable> sequences) {
+ this.sequences = sequences;
+ }
+
+ public Subscription call(final Observer observer) {
+ final AtomicBoolean completedOrErred = new AtomicBoolean(false);
+ final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
+ final Queue> nextSequences = new ConcurrentLinkedQueue>();
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
- outerSubscription.wrap(sequences.subscribe(new Observer>() {
+
+ final Observer reusableObserver = new Observer() {
@Override
- public void onNext(Observable nextSequence) {
- // We will not return from onNext until the inner observer completes.
- // NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
- innerSubscription.wrap(nextSequence.subscribe(new Observer() {
- @Override
- public void onNext(T item) {
- // Make our best-effort to release resources in the face of unsubscribe.
- if (isUnsubscribed()) {
- innerSubscription.unsubscribe();
- outerSubscription.unsubscribe();
- } else {
- observer.onNext(item);
+ public void onNext(T item) {
+ observer.onNext(item);
+ }
+ @Override
+ public void onError(Exception e) {
+ if (completedOrErred.compareAndSet(false, true)) {
+ outerSubscription.unsubscribe();
+ observer.onError(e);
+ }
+ }
+ @Override
+ public void onCompleted() {
+ synchronized (nextSequences) {
+ if (nextSequences.isEmpty()) {
+ // No new sequences available at the moment
+ innerSubscription = null;
+ if (allSequencesReceived.get()) {
+ // No new sequences are coming, we are finished
+ if (completedOrErred.compareAndSet(false, true)) {
+ observer.onCompleted();
+ }
}
- }
- @Override
- public void onError(Exception e) {
- outerSubscription.unsubscribe();
- innerError.set(true);
- observer.onError(e);
- latch.countDown();
- }
- @Override
- public void onCompleted() {
+ } else {
// Continue on to the next sequence
- latch.countDown();
+ innerSubscription = new AtomicObservableSubscription();
+ innerSubscription.wrap(nextSequences.poll().subscribe(this));
+ }
+ }
+ }
+ };
+
+ outerSubscription.wrap(sequences.subscribe(new Observer>() {
+ @Override
+ public void onNext(Observable nextSequence) {
+ synchronized (nextSequences) {
+ if (innerSubscription == null) {
+ // We are currently not subscribed to any sequence
+ innerSubscription = new AtomicObservableSubscription();
+ innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
+ } else {
+ // Put this sequence at the end of the queue
+ nextSequences.add(nextSequence);
}
- }));
- try {
- latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw Exceptions.propagate(e);
}
}
@Override
public void onError(Exception e) {
- // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
- observer.onError(e);
+ if (completedOrErred.compareAndSet(false, true)) {
+ if (innerSubscription != null) {
+ innerSubscription.unsubscribe();
+ }
+ observer.onError(e);
+ }
}
@Override
public void onCompleted() {
- // NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
- if (!innerError.get()) {
- observer.onCompleted();
+ allSequencesReceived.set(true);
+ if (innerSubscription == null) {
+ // We are not subscribed to any sequence, and none are coming anymore
+ if (completedOrErred.compareAndSet(false, true)) {
+ observer.onCompleted();
+ }
}
}
}));
+
+ return new Subscription() {
+ @Override
+ public void unsubscribe() {
+ synchronized (nextSequences) {
+ if (innerSubscription != null)
+ innerSubscription.unsubscribe();
+ outerSubscription.unsubscribe();
+ }
+ }
+ };
}
}
@@ -443,9 +462,72 @@ public void testConcatConcurrentWithInfinity() {
}
+
+
+ @Test
+ public void testConcatUnSubscribeNotBlockingObservables() {
+
+ final CountDownLatch okToContinueW1 = new CountDownLatch(1);
+ final CountDownLatch okToContinueW2 = new CountDownLatch(1);
+
+ final TestObservable w1 = new TestObservable(null, okToContinueW1, "one", "two", "three");
+ final TestObservable w2 = new TestObservable(null, okToContinueW2, "four", "five", "six");
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() {
+
+ @Override
+ public Subscription call(Observer> observer) {
+ // simulate what would happen in an observable
+ observer.onNext(w1);
+ observer.onNext(w2);
+ observer.onCompleted();
+
+ return new Subscription() {
+
+ @Override
+ public void unsubscribe() {
+ }
+
+ };
+ }
+
+ });
+ Observable concat = Observable.create(concat(observableOfObservables));
+
+ concat.subscribe(aObserver);
+
+ verify(aObserver, times(0)).onCompleted();
+
+
+ //Wait for the thread to start up.
+ try {
+ Thread.sleep(25);
+ w1.t.join();
+ w2.t.join();
+ okToContinueW1.countDown();
+ okToContinueW2.countDown();
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ InOrder inOrder = inOrder(aObserver);
+ inOrder.verify(aObserver, times(1)).onNext("one");
+ inOrder.verify(aObserver, times(1)).onNext("two");
+ inOrder.verify(aObserver, times(1)).onNext("three");
+ inOrder.verify(aObserver, times(1)).onNext("four");
+ inOrder.verify(aObserver, times(1)).onNext("five");
+ inOrder.verify(aObserver, times(1)).onNext("six");
+ verify(aObserver, times(1)).onCompleted();
+
+
+ }
+
/**
- * The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
+ * Test unsubscribing the concatenated Observable in a single thread.
*/
@Test
public void testConcatUnsubscribe() {
@@ -459,20 +541,13 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observable concat = Observable.create(concat(w1, w2));
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
- Thread t = new Thread() {
- @Override
- public void run() {
- // NB: this statement does not complete until after "six" has been delivered.
- s1.wrap(concat.subscribe(aObserver));
- }
- };
- t.start();
+
try {
+ // Subscribe
+ s1.wrap(concat.subscribe(aObserver));
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
callOnce.await();
- // NB: This statement has no effect, since s1 cannot possibly
- // wrap anything until "six" has been delivered, which cannot
- // happen until we okToContinue.countDown()
+ // Unsubcribe
s1.unsubscribe();
//Unblock the observable to continue.
okToContinue.countDown();
@@ -488,10 +563,9 @@ public void run() {
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
- // NB: you might hope that five and six are not delivered, but see above.
- inOrder.verify(aObserver, times(1)).onNext("five");
- inOrder.verify(aObserver, times(1)).onNext("six");
- inOrder.verify(aObserver, times(1)).onCompleted();
+ inOrder.verify(aObserver, never()).onNext("five");
+ inOrder.verify(aObserver, never()).onNext("six");
+ inOrder.verify(aObserver, never()).onCompleted();
}
@@ -599,7 +673,7 @@ public void run() {
once.countDown();
//Block until the main thread has called unsubscribe.
if (null != okToContinue)
- okToContinue.await();
+ okToContinue.await(1, TimeUnit.SECONDS);
}
if (subscribed)
observer.onCompleted();