Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concat #273

Merged
merged 4 commits into from
May 16, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 154 additions & 80 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.junit.Ignore;
import org.junit.Test;

import org.mockito.InOrder;
Expand All @@ -41,23 +45,10 @@ public final class OperationConcat {

/**
* Combine the observable sequences from the list of Observables into one
* observable sequence without any transformation. If either the outer
* observable sequence without any transformation. If either the outer
* observable or an inner observable calls onError, we will call onError.
*
* <p/>
*
* The outer observable might run on a separate thread from (one of) the
* inner observables; in this case care must be taken to avoid a deadlock.
* The Concat operation may block the outer thread while servicing an inner
* thread in order to ensure a well-defined ordering of elements; therefore
* none of the inner threads must be implemented in a way that might wait on
* the outer thread.
*
* <p/>
*
* Beware that concat(o1,o2).subscribe() is a blocking call from
* which it is impossible to unsubscribe if observables are running on same thread.
*
* @param sequences An observable sequence of elements to project.
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
*/
Expand All @@ -70,73 +61,101 @@ public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<
}

public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> observer) {
return new ConcatSubscription<T>(sequences, observer);
}
};
return new Concat<T>(sequences);
}

private static class ConcatSubscription<T> extends BooleanSubscription {
// Might be updated by an inner thread's onError during the outer
// thread's onNext, then read in the outer thread's onComplete.
final AtomicBoolean innerError = new AtomicBoolean(false);
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
private Observable<Observable<T>> sequences;
private AtomicObservableSubscription innerSubscription = null;

public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
public Concat(Observable<Observable<T>> sequences) {
this.sequences = sequences;
}

public Subscription call(final Observer<T> observer) {
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {

final Observer<T> reusableObserver = new Observer<T>() {
@Override
public void onNext(Observable<T> nextSequence) {
// We will not return from onNext until the inner observer completes.
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
final CountDownLatch latch = new CountDownLatch(1);
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
@Override
public void onNext(T item) {
// Make our best-effort to release resources in the face of unsubscribe.
if (isUnsubscribed()) {
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
} else {
observer.onNext(item);
public void onNext(T item) {
observer.onNext(item);
}
@Override
public void onError(Exception e) {
if (completedOrErred.compareAndSet(false, true)) {
outerSubscription.unsubscribe();
observer.onError(e);
}
}
@Override
public void onCompleted() {
synchronized (nextSequences) {
if (nextSequences.isEmpty()) {
// No new sequences available at the moment
innerSubscription = null;
if (allSequencesReceived.get()) {
// No new sequences are coming, we are finished
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
@Override
public void onError(Exception e) {
outerSubscription.unsubscribe();
innerError.set(true);
observer.onError(e);
latch.countDown();
}
@Override
public void onCompleted() {
} else {
// Continue on to the next sequence
latch.countDown();
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequences.poll().subscribe(this));
}
}
}
};

outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
@Override
public void onNext(Observable<T> nextSequence) {
synchronized (nextSequences) {
if (innerSubscription == null) {
// We are currently not subscribed to any sequence
innerSubscription = new AtomicObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
} else {
// Put this sequence at the end of the queue
nextSequences.add(nextSequence);
}
}));
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Exceptions.propagate(e);
}
}
@Override
public void onError(Exception e) {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
observer.onError(e);
if (completedOrErred.compareAndSet(false, true)) {
if (innerSubscription != null) {
innerSubscription.unsubscribe();
}
observer.onError(e);
}
}
@Override
public void onCompleted() {
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
if (!innerError.get()) {
observer.onCompleted();
allSequencesReceived.set(true);
if (innerSubscription == null) {
// We are not subscribed to any sequence, and none are coming anymore
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
}
}
}
}));

return new Subscription() {
@Override
public void unsubscribe() {
synchronized (nextSequences) {
if (innerSubscription != null)
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
}
}
};
}
}

Expand Down Expand Up @@ -443,9 +462,72 @@ public void testConcatConcurrentWithInfinity() {

}



@Test
public void testConcatUnSubscribeNotBlockingObservables() {

final CountDownLatch okToContinueW1 = new CountDownLatch(1);
final CountDownLatch okToContinueW2 = new CountDownLatch(1);

final TestObservable<String> w1 = new TestObservable<String>(null, okToContinueW1, "one", "two", "three");
final TestObservable<String> w2 = new TestObservable<String>(null, okToContinueW2, "four", "five", "six");

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

@Override
public Subscription call(Observer<Observable<String>> observer) {
// simulate what would happen in an observable
observer.onNext(w1);
observer.onNext(w2);
observer.onCompleted();

return new Subscription() {

@Override
public void unsubscribe() {
}

};
}

});
Observable<String> concat = Observable.create(concat(observableOfObservables));

concat.subscribe(aObserver);

verify(aObserver, times(0)).onCompleted();


//Wait for the thread to start up.
try {
Thread.sleep(25);
w1.t.join();
w2.t.join();
okToContinueW1.countDown();
okToContinueW2.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(1)).onNext("one");
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
inOrder.verify(aObserver, times(1)).onNext("five");
inOrder.verify(aObserver, times(1)).onNext("six");
verify(aObserver, times(1)).onCompleted();


}


/**
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
* Test unsubscribing the concatenated Observable in a single thread.
*/
@Test
public void testConcatUnsubscribe() {
Expand All @@ -459,20 +541,13 @@ public void testConcatUnsubscribe() {
@SuppressWarnings("unchecked")
final Observable<String> concat = Observable.create(concat(w1, w2));
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
Thread t = new Thread() {
@Override
public void run() {
// NB: this statement does not complete until after "six" has been delivered.
s1.wrap(concat.subscribe(aObserver));
}
};
t.start();

try {
// Subscribe
s1.wrap(concat.subscribe(aObserver));
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
callOnce.await();
// NB: This statement has no effect, since s1 cannot possibly
// wrap anything until "six" has been delivered, which cannot
// happen until we okToContinue.countDown()
// Unsubcribe
s1.unsubscribe();
//Unblock the observable to continue.
okToContinue.countDown();
Expand All @@ -488,10 +563,9 @@ public void run() {
inOrder.verify(aObserver, times(1)).onNext("two");
inOrder.verify(aObserver, times(1)).onNext("three");
inOrder.verify(aObserver, times(1)).onNext("four");
// NB: you might hope that five and six are not delivered, but see above.
inOrder.verify(aObserver, times(1)).onNext("five");
inOrder.verify(aObserver, times(1)).onNext("six");
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verify(aObserver, never()).onNext("five");
inOrder.verify(aObserver, never()).onNext("six");
inOrder.verify(aObserver, never()).onCompleted();

}

Expand Down Expand Up @@ -599,7 +673,7 @@ public void run() {
once.countDown();
//Block until the main thread has called unsubscribe.
if (null != okToContinue)
okToContinue.await();
okToContinue.await(1, TimeUnit.SECONDS);
}
if (subscribed)
observer.onCompleted();
Expand Down