diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 1a72f61501..010071e92f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,6 +49,7 @@ import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; +import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; @@ -2341,6 +2342,34 @@ public Observable takeUntil(Observable other) { return OperationTakeUntil.takeUntil(this, other); } + /** + * Returns an Observable that bypasses all items from the source Observable as long as the specified + * condition holds true. Emits all further source items as soon as the condition becomes false. + * @param predicate + * A function to test each item emitted from the source Observable for a condition. + * It receives the emitted item as first parameter and the index of the emitted item as + * second parameter. + * @return an Observable that emits all items from the source Observable as soon as the condition + * becomes false. + * @see MSDN: Observable.SkipWhile + */ + public Observable skipWhileWithIndex(Func2 predicate) { + return create(OperationSkipWhile.skipWhileWithIndex(this, predicate)); + } + + /** + * Returns an Observable that bypasses all items from the source Observable as long as the specified + * condition holds true. Emits all further source items as soon as the condition becomes false. + * @param predicate + * A function to test each item emitted from the source Observable for a condition. + * @return an Observable that emits all items from the source Observable as soon as the condition + * becomes false. + * @see MSDN: Observable.SkipWhile + */ + public Observable skipWhile(Func1 predicate) { + return create(OperationSkipWhile.skipWhile(this, predicate)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java b/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java new file mode 100644 index 0000000000..8277f80fc2 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipWhile.java @@ -0,0 +1,193 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.Observable.create; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; +import rx.util.functions.Func2; + +/** + * Skips any emitted source items as long as the specified condition holds true. Emits all further source items + * as soon as the condition becomes false. + */ +public final class OperationSkipWhile { + public static OnSubscribeFunc skipWhileWithIndex(Observable source, Func2 predicate) { + return new SkipWhile(source, predicate); + } + + public static OnSubscribeFunc skipWhile(Observable source, final Func1 predicate) { + return new SkipWhile(source, new Func2() { + @Override + public Boolean call(T value, Integer index) { + return predicate.call(value); + } + }); + } + + private static class SkipWhile implements OnSubscribeFunc { + private final Observable source; + private final Func2 predicate; + private final AtomicBoolean skipping = new AtomicBoolean(true); + private final AtomicInteger index = new AtomicInteger(0); + + SkipWhile(Observable source, Func2 pred) { + this.source = source; + this.predicate = pred; + } + + public Subscription onSubscribe(Observer observer) { + return source.subscribe(new SkipWhileObserver(observer)); + } + + private class SkipWhileObserver implements Observer { + private final Observer observer; + + public SkipWhileObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T next) { + if (!skipping.get()) { + observer.onNext(next); + } else { + try { + if (!predicate.call(next, index.getAndIncrement())) { + skipping.set(false); + observer.onNext(next); + } else { + } + } catch(Throwable t) { + observer.onError(t); + } + } + } + + } + + } + + public static class UnitTest { + @SuppressWarnings("unchecked") + Observer w = mock(Observer.class); + + private static final Func1 LESS_THAN_FIVE = new Func1() { + @Override + public Boolean call(Integer v) { + if (v == 42) throw new RuntimeException("that's not the answer to everything!"); + return v < 5; + } + }; + + private static final Func2 INDEX_LESS_THAN_THREE = new Func2() { + @Override + public Boolean call(Integer value, Integer index) { + return index < 3; + } + }; + + @Test + public void testSkipWithIndex() { + Observable src = Observable.from(1, 2, 3, 4, 5); + create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(4); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipEmpty() { + Observable src = Observable.empty(); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSkipEverything() { + Observable src = Observable.from(1, 2, 3, 4, 3, 2, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onError(any(Throwable.class)); + verify(w, times(1)).onCompleted(); + } + + @Test + public void testSkipNothing() { + Observable src = Observable.from(5, 3, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onNext(3); + inOrder.verify(w, times(1)).onNext(1); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipSome() { + Observable src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onNext(3); + inOrder.verify(w, times(1)).onNext(1); + inOrder.verify(w, times(1)).onNext(5); + inOrder.verify(w, times(1)).onCompleted(); + inOrder.verify(w, never()).onError(any(Throwable.class)); + } + + @Test + public void testSkipError() { + Observable src = Observable.from(1, 2, 42, 5, 3, 1); + create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w); + + InOrder inOrder = inOrder(w); + inOrder.verify(w, never()).onNext(anyInt()); + inOrder.verify(w, never()).onCompleted(); + inOrder.verify(w, times(1)).onError(any(RuntimeException.class)); + } + } +}