Skip to content

Commit

Permalink
Merge pull request #586 from akarnokd/ConcatFix
Browse files Browse the repository at this point in the history
Fix Concat to allow multiple observers
  • Loading branch information
benjchristensen committed Dec 8, 2013
2 parents d378ef2 + 21f7d52 commit c2709b3
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 8 deletions.
30 changes: 23 additions & 7 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public static <T> OnSubscribeFunc<T> concat(final Iterable<? extends Observable<
}

public static <T> OnSubscribeFunc<T> concat(final Observable<? extends Observable<? extends T>> sequences) {
return new Concat<T>(sequences);
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> t1) {
return new Concat<T>(sequences).onSubscribe(t1);
}
};
}

private static class Concat<T> implements OnSubscribeFunc<T> {
Expand Down Expand Up @@ -121,8 +126,12 @@ public void onNext(Observable<? extends T> nextSequence) {
@Override
public void onError(Throwable e) {
if (completedOrErred.compareAndSet(false, true)) {
if (innerSubscription != null) {
innerSubscription.unsubscribe();
Subscription q;
synchronized (nextSequences) {
q = innerSubscription;
}
if (q != null) {
q.unsubscribe();
}
observer.onError(e);
}
Expand All @@ -131,7 +140,11 @@ public void onError(Throwable e) {
@Override
public void onCompleted() {
allSequencesReceived.set(true);
if (innerSubscription == null) {
Subscription q;
synchronized (nextSequences) {
q = innerSubscription;
}
if (q == null) {
// We are not subscribed to any sequence, and none are coming anymore
if (completedOrErred.compareAndSet(false, true)) {
observer.onCompleted();
Expand All @@ -143,11 +156,14 @@ public void onCompleted() {
return new Subscription() {
@Override
public void unsubscribe() {
Subscription q;
synchronized (nextSequences) {
if (innerSubscription != null)
innerSubscription.unsubscribe();
outerSubscription.unsubscribe();
q = innerSubscription;
}
if (q != null) {
q.unsubscribe();
}
outerSubscription.unsubscribe();
}
};
}
Expand Down
44 changes: 43 additions & 1 deletion rxjava-core/src/test/java/rx/operators/OperationConcatTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationConcat.*;

Expand All @@ -33,6 +32,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.concurrency.TestScheduler;
import rx.subscriptions.BooleanSubscription;

public class OperationConcatTest {
Expand Down Expand Up @@ -556,4 +556,46 @@ public void run() {
return s;
}
}
@Test
public void testMultipleObservers() {
Observer<Object> o1 = mock(Observer.class);
Observer<Object> o2 = mock(Observer.class);

TestScheduler s = new TestScheduler();

Observable<Long> timer = Observable.interval(500, TimeUnit.MILLISECONDS, s).take(2);
Observable<Long> o = Observable.concat(timer, timer);

o.subscribe(o1);
o.subscribe(o2);

InOrder inOrder1 = inOrder(o1);
InOrder inOrder2 = inOrder(o2);

s.advanceTimeBy(500, TimeUnit.MILLISECONDS);

inOrder1.verify(o1, times(1)).onNext(0L);
inOrder2.verify(o2, times(1)).onNext(0L);

s.advanceTimeBy(500, TimeUnit.MILLISECONDS);

inOrder1.verify(o1, times(1)).onNext(1L);
inOrder2.verify(o2, times(1)).onNext(1L);

s.advanceTimeBy(500, TimeUnit.MILLISECONDS);

inOrder1.verify(o1, times(1)).onNext(0L);
inOrder2.verify(o2, times(1)).onNext(0L);

s.advanceTimeBy(500, TimeUnit.MILLISECONDS);

inOrder1.verify(o1, times(1)).onNext(1L);
inOrder2.verify(o2, times(1)).onNext(1L);

inOrder1.verify(o1, times(1)).onCompleted();
inOrder2.verify(o2, times(1)).onCompleted();

verify(o1, never()).onError(any(Throwable.class));
verify(o2, never()).onError(any(Throwable.class));
}
}

0 comments on commit c2709b3

Please sign in to comment.