Skip to content

Commit

Permalink
Rewrite concat operation to not block on subscribe
Browse files Browse the repository at this point in the history
The concat operator previously blocked on calling subscribe until all the
sequences had finished. In quite some cases this results in unwanted (and
unexpected) behaviour, such as when prefixing an infinite Observable
with a fixed one, for example when using startWith (which calls concat):
someInputStream.startWith(123).subscribe(x -> print(x));
This statement will block indefinitely if the input stream is infinite. Also
on finite sequences it seems silly to have to wait for them to finish.

In this new approach the incoming observables are put into a queue, instead
of waiting for the whole sequence to finish. When the first observable
completes, the next one is taken from the queue and subscribed to, and so
on. The queue can be extended while processing the observables, and
onCompleted is only called when both the source of observables has completed
and all observables in the queue have been read.
  • Loading branch information
Treora committed May 14, 2013
1 parent 62ec36e commit cfa7155
Showing 1 changed file with 88 additions and 79 deletions.
167 changes: 88 additions & 79 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.junit.Test;

Expand All @@ -41,23 +43,10 @@ public final class OperationConcat {

/**
* Combine the observable sequences from the list of Observables into one
* observable sequence without any transformation. If either the outer
* observable sequence without any transformation. If either the outer
* observable or an inner observable calls onError, we will call onError.
*
* <p/>
*
* The outer observable might run on a separate thread from (one of) the
* inner observables; in this case care must be taken to avoid a deadlock.
* The Concat operation may block the outer thread while servicing an inner
* thread in order to ensure a well-defined ordering of elements; therefore
* none of the inner threads must be implemented in a way that might wait on
* the outer thread.
*
* <p/>
*
* Beware that concat(o1,o2).subscribe() is a blocking call from
* which it is impossible to unsubscribe if observables are running on same thread.
*
* @param sequences An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
*/
Expand All @@ -70,73 +59,101 @@ public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<
}

public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> observer) {
return new ConcatSubscription<T>(sequences, observer);
}
};
return new Concat<T>(sequences);
}

private static class ConcatSubscription<T> extends BooleanSubscription {
// Might be updated by an inner thread's onError during the outer
// thread's onNext, then read in the outer thread's onComplete.
final AtomicBoolean innerError = new AtomicBoolean(false);
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
private Observable<Observable<T>> sequences;
private AtomicObservableSubscription innerSubscription = null;

public Concat(Observable<Observable<T>> sequences) {
this.sequences = sequences;
}

public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
public Subscription call(final Observer<T> observer) {
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {

final Observer<T> reusableObserver = new Observer<T>() {
@Override
public void onNext(Observable<T> nextSequence) {
// We will not return from onNext until the inner observer completes.
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
final CountDownLatch latch = new CountDownLatch(1);
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
@Override
public void onNext(T item) {
// Make our best-effort to release resources in the face of unsubscribe.
if (isUnsubscribed()) {
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
} else {
observer.onNext(item);
public void onNext(T item) {
observer.onNext(item);
}
@Override
public void onError(Exception e) {
if (completedOrErred.compareAndSet(false, true)) {
outerSubscription.unsubscribe();
observer.onError(e);
}
}
@Override
public void onCompleted() {
synchronized (nextSequences) {
if (nextSequences.isEmpty()) {
// No new sequences available at the moment
innerSubscription = null;
if (allSequencesReceived.get()) {
// No new sequences are coming, we are finished
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
@Override
public void onError(Exception e) {
outerSubscription.unsubscribe();
innerError.set(true);
observer.onError(e);
latch.countDown();
}
@Override
public void onCompleted() {
} else {
// Continue on to the next sequence
latch.countDown();
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequences.poll().subscribe(this));
}
}
}
};

outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
@Override
public void onNext(Observable<T> nextSequence) {
synchronized (nextSequences) {
if (innerSubscription == null) {
// We are currently not subscribed to any sequence
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
} else {
// Put this sequence at the end of the queue
nextSequences.add(nextSequence);
}
}));
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Exceptions.propagate(e);
}
}
@Override
public void onError(Exception e) {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
observer.onError(e);
if (completedOrErred.compareAndSet(false, true)) {
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
observer.onError(e);
}
}
@Override
public void onCompleted() {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
if (!innerError.get()) {
observer.onCompleted();
allSequencesReceived.set(true);
if (innerSubscription == null) {
// We are not subscribed to any sequence, and none are coming anymore
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
}));

return new Subscription() {
@Override
public void unsubscribe() {
synchronized (nextSequences) {
if (innerSubscription != null)
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
}
}
};
}
}

Expand Down Expand Up @@ -445,7 +462,7 @@ public void testConcatConcurrentWithInfinity() {


/**
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
* Test unsubscribing the concatenated Observable in a single thread.
*/
@Test
public void testConcatUnsubscribe() {
Expand All @@ -459,20 +476,13 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observable<String> concat = Observable.create(concat(w1, w2));
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
Thread t = new Thread() {
@Override
public void run() {
// NB: this statement does not complete until after "six" has been delivered.
s1.wrap(concat.subscribe(aObserver));
}
};
t.start();

try {
// Subscribe
s1.wrap(concat.subscribe(aObserver));
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
callOnce.await();
// NB: This statement has no effect, since s1 cannot possibly
// wrap anything until "six" has been delivered, which cannot
// happen until we okToContinue.countDown()
// Unsubcribe
s1.unsubscribe();
//Unblock the observable to continue.
okToContinue.countDown();
Expand All @@ -488,10 +498,9 @@ public void run() {
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
// NB: you might hope that five and six are not delivered, but see above.
inOrder.verify(aObserver, times(1)).onNext("five");
inOrder.verify(aObserver, times(1)).onNext("six");
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verify(aObserver, never()).onNext("five");
inOrder.verify(aObserver, never()).onNext("six");
inOrder.verify(aObserver, never()).onCompleted();

}

Expand Down

0 comments on commit cfa7155

Please sign in to comment.