Skip to content

Commit

Permalink
implemented skipWhile and skipWhileWithIndex (ReactiveX#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmhofer committed Sep 7, 2013
1 parent 176280e commit 73a4ed2
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 0 deletions.
29 changes: 29 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipWhile;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
Expand Down Expand Up @@ -2341,6 +2342,34 @@ public <E> Observable<T> takeUntil(Observable<? extends E> other) {
return OperationTakeUntil.takeUntil(this, other);
}

/**
* Returns an Observable that bypasses all items from the source Observable as long as the specified
* condition holds true. Emits all further source items as soon as the condition becomes false.
* @param predicate
* A function to test each item emitted from the source Observable for a condition.
* It receives the emitted item as first parameter and the index of the emitted item as
* second parameter.
* @return an Observable that emits all items from the source Observable as soon as the condition
* becomes false.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211631%28v=vs.103%29.aspx">MSDN: Observable.SkipWhile</a>
*/
public Observable<T> skipWhileWithIndex(Func2<? super T, Integer, Boolean> predicate) {
return create(OperationSkipWhile.skipWhileWithIndex(this, predicate));
}

/**
* Returns an Observable that bypasses all items from the source Observable as long as the specified
* condition holds true. Emits all further source items as soon as the condition becomes false.
* @param predicate
* A function to test each item emitted from the source Observable for a condition.
* @return an Observable that emits all items from the source Observable as soon as the condition
* becomes false.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229685%28v=vs.103%29.aspx">MSDN: Observable.SkipWhile</a>
*/
public Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
return create(OperationSkipWhile.skipWhile(this, predicate));
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down
193 changes: 193 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.Observable.create;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Skips any emitted source items as long as the specified condition holds true. Emits all further source items
* as soon as the condition becomes false.
*/
public final class OperationSkipWhile {
public static <T> OnSubscribeFunc<T> skipWhileWithIndex(Observable<? extends T> source, Func2<? super T, Integer, Boolean> predicate) {
return new SkipWhile<T>(source, predicate);
}

public static <T> OnSubscribeFunc<T> skipWhile(Observable<? extends T> source, final Func1<? super T, Boolean> predicate) {
return new SkipWhile<T>(source, new Func2<T, Integer, Boolean>() {
@Override
public Boolean call(T value, Integer index) {
return predicate.call(value);
}
});
}

private static class SkipWhile<T> implements OnSubscribeFunc<T> {
private final Observable<? extends T> source;
private final Func2<? super T, Integer, Boolean> predicate;
private final AtomicBoolean skipping = new AtomicBoolean(true);
private final AtomicInteger index = new AtomicInteger(0);

SkipWhile(Observable<? extends T> source, Func2<? super T, Integer, Boolean> pred) {
this.source = source;
this.predicate = pred;
}

public Subscription onSubscribe(Observer<? super T> observer) {
return source.subscribe(new SkipWhileObserver(observer));
}

private class SkipWhileObserver implements Observer<T> {
private final Observer<? super T> observer;

public SkipWhileObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T next) {
if (!skipping.get()) {
observer.onNext(next);
} else {
try {
if (!predicate.call(next, index.getAndIncrement())) {
skipping.set(false);
observer.onNext(next);
} else {
}
} catch(Throwable t) {
observer.onError(t);
}
}
}

}

}

public static class UnitTest {
@SuppressWarnings("unchecked")
Observer<Integer> w = mock(Observer.class);

private static final Func1<Integer, Boolean> LESS_THAN_FIVE = new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer v) {
if (v == 42) throw new RuntimeException("that's not the answer to everything!");
return v < 5;
}
};

private static final Func2<Integer, Integer, Boolean> INDEX_LESS_THAN_THREE = new Func2<Integer, Integer, Boolean>() {
@Override
public Boolean call(Integer value, Integer index) {
return index < 3;
}
};

@Test
public void testSkipWithIndex() {
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5);
create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(4);
inOrder.verify(w, times(1)).onNext(5);
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testSkipEmpty() {
Observable<Integer> src = Observable.empty();
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}

@Test
public void testSkipEverything() {
Observable<Integer> src = Observable.from(1, 2, 3, 4, 3, 2, 1);
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
verify(w, never()).onNext(anyInt());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onCompleted();
}

@Test
public void testSkipNothing() {
Observable<Integer> src = Observable.from(5, 3, 1);
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(5);
inOrder.verify(w, times(1)).onNext(3);
inOrder.verify(w, times(1)).onNext(1);
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testSkipSome() {
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5);
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, times(1)).onNext(5);
inOrder.verify(w, times(1)).onNext(3);
inOrder.verify(w, times(1)).onNext(1);
inOrder.verify(w, times(1)).onNext(5);
inOrder.verify(w, times(1)).onCompleted();
inOrder.verify(w, never()).onError(any(Throwable.class));
}

@Test
public void testSkipError() {
Observable<Integer> src = Observable.from(1, 2, 42, 5, 3, 1);
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);

InOrder inOrder = inOrder(w);
inOrder.verify(w, never()).onNext(anyInt());
inOrder.verify(w, never()).onCompleted();
inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
}
}
}

0 comments on commit 73a4ed2

Please sign in to comment.