From ea575d0911b6b98b1b4ab537a97c6c56e26f96d9 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 26 Sep 2013 09:40:18 +0800 Subject: [PATCH 1/3] Merge branch 'master', remote-tracking branch 'origin' From eba034e2f98ff160103dda26247509bb9cc95026 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 10:24:56 +0800 Subject: [PATCH 2/3] Fixed the issue of takeLast(items, 0) --- .../java/rx/operators/OperationTakeLast.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 46ff21e9e2..3ae2e7fc41 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observable.OnSubscribeFunc; +import rx.subscriptions.Subscriptions; import rx.Observer; import rx.Subscription; @@ -59,6 +60,26 @@ private static class TakeLast implements OnSubscribeFunc { } public Subscription onSubscribe(Observer observer) { + if(count == 0) { + items.subscribe(new Observer() { + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(T args) { + } + + }).unsubscribe(); + observer.onCompleted(); + return Subscriptions.empty(); + } + return subscription.wrap(items.subscribe(new ItemObserver(observer))); } @@ -140,6 +161,19 @@ public void testTakeLast2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeLastWithZeroCount() { + Observable w = Observable.from("one"); + Observable take = Observable.create(takeLast(w, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + } } From f7ae906f19c6b1c608feb7fe3b0d40b6100fcf6d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 16:26:56 +0800 Subject: [PATCH 3/3] Fixed the issue about null values --- .../java/rx/operators/OperationTakeLast.java | 106 ++++++++++++------ 1 file changed, 74 insertions(+), 32 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 3ae2e7fc41..e83f5dc432 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -15,18 +15,22 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; import org.mockito.InOrder; import rx.Observable; import rx.Observable.OnSubscribeFunc; -import rx.subscriptions.Subscriptions; import rx.Observer; import rx.Subscription; @@ -60,33 +64,21 @@ private static class TakeLast implements OnSubscribeFunc { } public Subscription onSubscribe(Observer observer) { - if(count == 0) { - items.subscribe(new Observer() { - - @Override - public void onCompleted() { - } - - @Override - public void onError(Throwable e) { - } - - @Override - public void onNext(T args) { - } - - }).unsubscribe(); - observer.onCompleted(); - return Subscriptions.empty(); + if (count < 0) { + throw new IndexOutOfBoundsException( + "count could not be negative"); } - return subscription.wrap(items.subscribe(new ItemObserver(observer))); } private class ItemObserver implements Observer { - private LinkedBlockingDeque deque = new LinkedBlockingDeque(count); + /** + * Store the last count elements until now. + */ + private Deque deque = new LinkedList(); private final Observer observer; + private final ReentrantLock lock = new ReentrantLock(); public ItemObserver(Observer observer) { this.observer = observer; @@ -94,11 +86,14 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - Iterator reverse = deque.descendingIterator(); - while (reverse.hasNext()) { - observer.onNext(reverse.next()); + try { + for (T value : deque) { + observer.onNext(value); + } + observer.onCompleted(); + } catch (Throwable e) { + observer.onError(e); } - observer.onCompleted(); } @Override @@ -107,9 +102,27 @@ public void onError(Throwable e) { } @Override - public void onNext(T args) { - while (!deque.offerFirst(args)) { - deque.removeLast(); + public void onNext(T value) { + if (count == 0) { + // If count == 0, we do not need to put value into deque and + // remove it at once. We can ignore the value directly. + return; + } + lock.lock(); + try { + deque.offerLast(value); + if (deque.size() > count) { + // Now deque has count + 1 elements, so the first + // element in the deque definitely does not belong + // to the last count elements of the source + // sequence. We can drop it now. + deque.removeFirst(); + } + } catch (Throwable e) { + observer.onError(e); + subscription.unsubscribe(); + } finally { + lock.unlock(); } } @@ -174,6 +187,35 @@ public void testTakeLastWithZeroCount() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeLastWithNull() { + Observable w = Observable.from("one", null, "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onNext(null); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLastWithNegativeCount() { + Observable w = Observable.from("one"); + Observable take = Observable.create(takeLast(w, -1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onError( + any(IndexOutOfBoundsException.class)); + verify(aObserver, never()).onCompleted(); + } + } }