Skip to content

Commit

Permalink
Merge pull request #7 from zsxwing/subscribeOn+groupBy
Browse files Browse the repository at this point in the history
Add timeout to CoundDownLatch, ignore InterruptException and fix the tes...
  • Loading branch information
benjchristensen committed Feb 14, 2014
2 parents 54b19be + 0000b2c commit 6863f57
Showing 1 changed file with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;
import org.mockito.InOrder;
Expand All @@ -30,6 +38,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -329,6 +338,8 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr
final CountDownLatch observerReceivedTwo = new CountDownLatch(1);
final CountDownLatch timeoutEmittedOne = new CountDownLatch(1);
final CountDownLatch observerCompleted = new CountDownLatch(1);
final CountDownLatch enteredTimeoutOne = new CountDownLatch(1);
final AtomicBoolean latchTimeout = new AtomicBoolean(false);

final Func1<Integer, Observable<Integer>> timeoutFunc = new Func1<Integer, Observable<Integer>>() {
@Override
Expand All @@ -338,31 +349,23 @@ public Observable<Integer> call(Integer t1) {
return Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
try {
// emulate "unsubscribe" is busy and finishes after timeout.onNext(1)
timeoutEmittedOne.await();
} catch (InterruptedException e) {
// if we are interrupted then we complete (as this can happen when unsubscribed)
observerCompleted.countDown();
e.printStackTrace();
enteredTimeoutOne.countDown();
// force the timeout message be sent after observer.onNext(2)
while (true) {
try {
if (!observerReceivedTwo.await(30, TimeUnit.SECONDS)) {
// CountDownLatch timeout
// There should be something wrong
latchTimeout.set(true);
}
break;
} catch (InterruptedException e) {
// Since we just want to emulate a busy method,
// we ignore the interrupt signal from Scheduler.
}
}));
// force the timeout message be sent after observer.onNext(2)
try {
observerReceivedTwo.await();
} catch (InterruptedException e) {
// if we are interrupted then we complete (as this can happen when unsubscribed)
observerCompleted.countDown();
e.printStackTrace();
}
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(1);
timeoutEmittedOne.countDown();
}
subscriber.onNext(1);
timeoutEmittedOne.countDown();
}
}).subscribeOn(Schedulers.newThread());
} else {
Expand Down Expand Up @@ -401,9 +404,18 @@ public void run() {
PublishSubject<Integer> source = PublishSubject.create();
source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts);
source.onNext(1); // start timeout
try {
if(!enteredTimeoutOne.await(30, TimeUnit.SECONDS)) {
latchTimeout.set(true);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
source.onNext(2); // disable timeout
try {
timeoutEmittedOne.await();
if(!timeoutEmittedOne.await(30, TimeUnit.SECONDS)) {
latchTimeout.set(true);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand All @@ -412,7 +424,11 @@ public void run() {

}).start();

observerCompleted.await();
if(!observerCompleted.await(30, TimeUnit.SECONDS)) {
latchTimeout.set(true);
}

assertFalse("CoundDownLatch timeout", latchTimeout.get());

InOrder inOrder = inOrder(o);
inOrder.verify(o).onNext(1);
Expand Down

0 comments on commit 6863f57

Please sign in to comment.