diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a738c20ac64..b8c0e0c5ef9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -46,6 +46,7 @@ import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; +import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; @@ -1347,6 +1348,22 @@ public static Observable take(final Observable items, final int num) { return _create(OperationTake.take(items, num)); } + /** + * Returns an Observable that emits the last count items emitted by the source + * Observable. + * + * @param items + * the source Observable + * @param count + * the number of items from the end of the sequence emitted by the source + * Observable to emit + * @return an Observable that only emits the last count items emitted by the source + * Observable + */ + public static Observable takeLast(final Observable items, final int count) { + return _create(OperationTakeLast.takeLast(items, count)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -2279,6 +2296,20 @@ public Observable take(final int num) { return take(this, num); } + /** + * Returns an Observable that emits the last count items emitted by the source + * Observable. + * + * @param count + * the number of items from the end of the sequence emitted by the source + * Observable to emit + * @return an Observable that only emits the last count items emitted by the source + * Observable + */ + public Observable takeLast(final int count) { + return takeLast(this, count); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java new file mode 100644 index 00000000000..fef59da6540 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -0,0 +1,174 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; + +import java.util.Iterator; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + * Returns a specified number of contiguous elements from the end of an observable sequence. + */ +public final class OperationTakeLast { + + public static Func1, Subscription> takeLast(final Observable items, final int count) { + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeLast(items, count).call(observer); + } + + }; + } + + private static class TakeLast implements Func1, Subscription> { + private final int count; + private final Observable items; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + TakeLast(final Observable items, final int count) { + this.count = count; + this.items = items; + } + + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + + private LinkedBlockingDeque deque = new LinkedBlockingDeque(count); + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + Iterator reverse = deque.descendingIterator(); + while (reverse.hasNext()) { + observer.onNext(reverse.next()); + } + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + while (!deque.offerFirst(args)) { + deque.removeLast(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeLastEmpty() { + Observable w = Observable.toObservable(); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLast1() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, never()).onNext("one"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLast2() { + Observable w = Observable.toObservable("one"); + Observable take = Observable.create(takeLast(w, 10)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLastOrdering() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(countingWrapper(aObserver)); + verify(aObserver, times(1)).onNext("two_1"); + verify(aObserver, times(1)).onNext("three_2"); + } + + + private static Observer countingWrapper(final Observer underlying) { + return new Observer() { + private final AtomicInteger counter = new AtomicInteger(); + @Override + public void onCompleted() { + underlying.onCompleted(); + } + + @Override + public void onError(Exception e) { + underlying.onCompleted(); + } + + @Override + public void onNext(String args) { + underlying.onNext(args + "_" + counter.incrementAndGet()); + } + }; + } + + } + +}