From d0339e5d77710d4e000c8d58afd76593d7bd2ba3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 13 Feb 2013 01:00:19 +0200 Subject: [PATCH 1/5] TakeLast basic implementation --- .../java/rx/operators/OperationTakeLast.java | 139 ++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTakeLast.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java new file mode 100644 index 00000000000..a3116dea583 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -0,0 +1,139 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func1; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +public final class OperationTakeLast { + + public static Func1, Subscription> takeLast(final Observable items, final int count) { + return new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return new TakeLast(items, count).call(observer); + } + + }; + } + + private static class TakeLast implements Func1, Subscription> { + private final int count; + private final Observable items; + private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + + TakeLast(final Observable items, final int count) { + this.count = count; + this.items = items; + } + + public Subscription call(Observer observer) { + return subscription.wrap(items.subscribe(new ItemObserver(observer))); + } + + private class ItemObserver implements Observer { + + private LinkedBlockingDeque deque = new LinkedBlockingDeque(count); + private final Observer observer; + + public ItemObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + Iterator reverse = deque.descendingIterator(); + while (reverse.hasNext()) { + observer.onNext(reverse.next()); + } + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(T args) { + while (!deque.offerFirst(args)) { + deque.removeLast(); + } + } + + } + + } + + public static class UnitTest { + + @Test + public void testTakeLastEmpty() { + Observable w = Observable.toObservable(); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext(any(String.class)); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLast1() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("two"); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, never()).onNext("one"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLast2() { + Observable w = Observable.toObservable("one"); + Observable take = Observable.create(takeLast(w, 10)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + } + + } + +} \ No newline at end of file From da1ab613c9acba303e70589b242f906ffdad001a Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 13 Feb 2013 01:06:35 +0200 Subject: [PATCH 2/5] Added takeLast to Observable --- rxjava-core/src/main/java/rx/Observable.java | 30 +++++++++++++++++++ .../java/rx/operators/OperationTakeLast.java | 5 ++-- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0df09cb3c7f..d234a73ba25 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1347,6 +1347,22 @@ public static Observable take(final Observable items, final int num) { return _create(OperationTake.take(items, num)); } + /** + * Returns an Observable that emits the last count items emitted by the source + * Observable. + * + * @param items + * the source Observable + * @param count + * the number of items from the end of the sequence emitted by the source + * Observable to emit + * @return an Observable that only emits the last count items emitted by the source + * Observable + */ + public static Observable takeLast(final Observable items, final int count) { + return _create(OperationTakeLast.takeLast(items, count)); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -2235,6 +2251,20 @@ public Observable take(final int num) { return take(this, num); } + /** + * Returns an Observable that emits the last count items emitted by the source + * Observable. + * + * @param count + * the number of items from the end of the sequence emitted by the source + * Observable to emit + * @return an Observable that only emits the last count items emitted by the source + * Observable + */ + public Observable takeLast(final int count) { + return takeLast(this, count); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index a3116dea583..57e336739e2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -23,13 +23,14 @@ import rx.util.functions.Func1; import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; +/** + * Returns a specified number of contiguous elements from the end of an observable sequence. + */ public final class OperationTakeLast { public static Func1, Subscription> takeLast(final Observable items, final int count) { From a59b069d6f52f7c537b1b314240829acdf4c5ee9 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 13 Feb 2013 01:17:00 +0200 Subject: [PATCH 3/5] Fixed import --- rxjava-core/src/main/java/rx/Observable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d234a73ba25..fbb001dc538 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -46,6 +46,7 @@ import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; +import rx.operators.OperationTakeLast; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; From 6275de4179523c39a685386ff055b6bc50f172eb Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 13 Feb 2013 01:17:18 +0200 Subject: [PATCH 4/5] Added ordering unit test --- .../java/rx/operators/OperationTakeLast.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 57e336739e2..2e8547fd7b3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; @@ -135,6 +136,39 @@ public void testTakeLast2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeLastOrdering() { + Observable w = Observable.toObservable("one", "two", "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(countingWrapper(aObserver)); + verify(aObserver, times(1)).onNext("two_1"); + verify(aObserver, times(1)).onNext("three_2"); + } + + + private static Observer countingWrapper(final Observer underlying) { + return new Observer() { + private final AtomicInteger counter = new AtomicInteger(); + @Override + public void onCompleted() { + underlying.onCompleted(); + } + + @Override + public void onError(Exception e) { + underlying.onCompleted(); + } + + @Override + public void onNext(String args) { + underlying.onNext(args + "_" + counter.incrementAndGet()); + } + }; + } + } } \ No newline at end of file From cc4789dcd88c76edc3d55df0b6486fd736f044e7 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 13 Feb 2013 01:18:23 +0200 Subject: [PATCH 5/5] styling: new line at the end of the file --- rxjava-core/src/main/java/rx/operators/OperationTakeLast.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 2e8547fd7b3..fef59da6540 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -171,4 +171,4 @@ public void onNext(String args) { } -} \ No newline at end of file +}