Skip to content

Commit

Permalink
Merge branch 'take-test' of git://github.com/johngmyers/RxJava into p…
Browse files Browse the repository at this point in the history
…ull-212-take-merge
  • Loading branch information
benjchristensen committed Mar 31, 2013
2 parents 669f8a7 + 983e142 commit 54fcb94
Show file tree
Hide file tree
Showing 4 changed files with 671 additions and 141 deletions.
13 changes: 8 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationTake;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand All @@ -54,7 +56,6 @@
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1779,7 +1780,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTake.takeWhile(items, predicate));
return create(OperationTakeWhile.takeWhile(items, predicate));
}

/**
Expand Down Expand Up @@ -1811,16 +1812,18 @@ public Boolean call(T t) {
* @return
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTake.takeWhileWithIndex(items, predicate));
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
}

public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
{
@Override
public Boolean call(T t, Integer integer) {
public Boolean call(T t, Integer integer)
{
return (Boolean) _f.call(t, integer);
}
}));
Expand Down
233 changes: 97 additions & 136 deletions rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;
import rx.subjects.Subject;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.testing.TrustedObservableTester.assertTrustedObservable;

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*/
Expand All @@ -43,61 +48,17 @@ public final class OperationTake {
* @return
*/
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
return takeWhileWithIndex(items, OperationTake.<T> numPredicate(num));
}

/**
* Returns a specified number of contiguous values from the start of an observable sequence.
*
* @param items
* @param predicate
* a function to test each source element for a condition
* @return
*/
public static <T> Func1<Observer<T>, Subscription> takeWhile(final Observable<T> items, final Func1<T, Boolean> predicate) {
return takeWhileWithIndex(items, OperationTake.<T> skipIndex(predicate));
}

/**
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
*
* @param items
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return
*/
public static <T> Func1<Observer<T>, Subscription> takeWhileWithIndex(final Observable<T> items, final Func2<T, Integer, Boolean> predicate) {
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> observer) {
return new TakeWhile<T>(items, predicate).call(observer);
return new Take<T>(items, num).call(observer);
}

};
}

private static <T> Func2<T, Integer, Boolean> numPredicate(final int num) {
return new Func2<T, Integer, Boolean>() {

@Override
public Boolean call(T input, Integer index) {
return index < num;
}

};
}

private static <T> Func2<T, Integer, Boolean> skipIndex(final Func1<T, Boolean> underlying) {
return new Func2<T, Integer, Boolean>() {
@Override
public Boolean call(T input, Integer index) {
return underlying.call(input);
}
};
}

/**
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
* <p>
Expand All @@ -109,19 +70,41 @@ public Boolean call(T input, Integer index) {
*
* @param <T>
*/
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
private static class Take<T> implements Func1<Observer<T>, Subscription> {
private final AtomicInteger counter = new AtomicInteger();
private final Observable<T> items;
private final Func2<T, Integer, Boolean> predicate;
private final int num;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();

private TakeWhile(Observable<T> items, Func2<T, Integer, Boolean> predicate) {
private Take(Observable<T> items, int num) {
this.items = items;
this.predicate = predicate;
this.num = num;
}

@Override
public Subscription call(Observer<T> observer) {
if (num < 1) {
items.subscribe(new Observer<T>()
{
@Override
public void onCompleted()
{
}

@Override
public void onError(Exception e)
{
}

@Override
public void onNext(T args)
{
}
}).unsubscribe();
observer.onCompleted();
return Subscriptions.empty();
}

return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}

Expand All @@ -134,20 +117,28 @@ public ItemObserver(Observer<T> observer) {

@Override
public void onCompleted() {
observer.onCompleted();
if (counter.getAndSet(num) < num) {
observer.onCompleted();
}
}

@Override
public void onError(Exception e) {
observer.onError(e);
if (counter.getAndSet(num) < num) {
observer.onError(e);
}
}

@Override
public void onNext(T args) {
if (predicate.call(args, counter.getAndIncrement())) {
final int count = counter.incrementAndGet();
if (count <= num) {
observer.onNext(args);
} else {
observer.onCompleted();
if (count == num) {
observer.onCompleted();
}
}
if (count >= num) {
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
subscription.unsubscribe();
}
Expand All @@ -160,65 +151,9 @@ public void onNext(T args) {
public static class UnitTest {

@Test
public void testTakeWhile1() {
Observable<Integer> w = Observable.toObservable(1, 2, 3);
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer input) {
return input < 3;
}
}));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onNext(3);
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeWhileOnSubject1() {
Subject<Integer> s = Subject.create();
Observable<Integer> w = (Observable<Integer>)s;
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer input) {
return input < 3;
}
}));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
take.subscribe(aObserver);

s.onNext(1);
s.onNext(2);
s.onNext(3);
s.onNext(4);
s.onNext(5);
s.onCompleted();

verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(2);
verify(aObserver, never()).onNext(3);
verify(aObserver, never()).onNext(4);
verify(aObserver, never()).onNext(5);
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeWhile2() {
public void testTake1() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
@Override
public Boolean call(String input, Integer index) {
return index < 2;
}
}));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 2)));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Expand All @@ -231,33 +166,59 @@ public Boolean call(String input, Integer index) {
}

@Test
public void testTake1() {
public void testTake2() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(take(w, 2));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onNext("three");
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTake2() {
Observable<String> w = Observable.toObservable("one", "two", "three");
Observable<String> take = Observable.create(take(w, 1));
public void testTakeDoesntLeakErrors() {
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onNext("one");
observer.onError(new Exception("test failed"));
return Subscriptions.empty();
}
});
Observable.create(assertTrustedObservable(take(source, 1))).last();
}

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, times(1)).onNext("one");
verify(aObserver, never()).onNext("two");
verify(aObserver, never()).onNext("three");
verify(aObserver, never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
@Test
public void testTakeZeroDoesntLeakError() {
final AtomicBoolean subscribed = new AtomicBoolean(false);
final AtomicBoolean unSubscribed = new AtomicBoolean(false);
Observable<String> source = Observable.create(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
subscribed.set(true);
observer.onError(new Exception("test failed"));
return new Subscription()
{
@Override
public void unsubscribe()
{
unSubscribed.set(true);
}
};
}
});
Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok");
assertTrue("source subscribed", subscribed.get());
assertTrue("source unsubscribed", unSubscribed.get());
}

@Test
Expand All @@ -267,7 +228,7 @@ public void testUnsubscribeAfterTake() {

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Observable<String> take = Observable.create(take(w, 1));
Observable<String> take = Observable.create(assertTrustedObservable(take(w, 1)));
take.subscribe(aObserver);

// wait for the Observable to complete
Expand Down
Loading

0 comments on commit 54fcb94

Please sign in to comment.