From 5200cfcd432beaa21e20052ce9e54de1c274f8fa Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 29 Sep 2013 16:26:56 +0800 Subject: [PATCH] Fixed the issue about null values --- .../java/rx/operators/OperationTakeLast.java | 106 ++++++++++++------ 1 file changed, 74 insertions(+), 32 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 3ae2e7fc410..e83f5dc4328 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -15,18 +15,22 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; import org.mockito.InOrder; import rx.Observable; import rx.Observable.OnSubscribeFunc; -import rx.subscriptions.Subscriptions; import rx.Observer; import rx.Subscription; @@ -60,33 +64,21 @@ private static class TakeLast implements OnSubscribeFunc { } public Subscription onSubscribe(Observer observer) { - if(count == 0) { - items.subscribe(new Observer() { - - @Override - public void onCompleted() { - } - - @Override - public void onError(Throwable e) { - } - - @Override - public void onNext(T args) { - } - - }).unsubscribe(); - observer.onCompleted(); - return Subscriptions.empty(); + if (count < 0) { + throw new IndexOutOfBoundsException( + "count could not be negative"); } - return subscription.wrap(items.subscribe(new ItemObserver(observer))); } private class ItemObserver implements Observer { - private LinkedBlockingDeque deque = new LinkedBlockingDeque(count); + /** + * Store the last count elements until now. + */ + private Deque deque = new LinkedList(); private final Observer observer; + private final ReentrantLock lock = new ReentrantLock(); public ItemObserver(Observer observer) { this.observer = observer; @@ -94,11 +86,14 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - Iterator reverse = deque.descendingIterator(); - while (reverse.hasNext()) { - observer.onNext(reverse.next()); + try { + for (T value : deque) { + observer.onNext(value); + } + observer.onCompleted(); + } catch (Throwable e) { + observer.onError(e); } - observer.onCompleted(); } @Override @@ -107,9 +102,27 @@ public void onError(Throwable e) { } @Override - public void onNext(T args) { - while (!deque.offerFirst(args)) { - deque.removeLast(); + public void onNext(T value) { + if (count == 0) { + // If count == 0, we do not need to put value into deque and + // remove it at once. We can ignore the value directly. + return; + } + lock.lock(); + try { + deque.offerLast(value); + if (deque.size() > count) { + // Now deque has count + 1 elements, so the first + // element in the deque definitely does not belong + // to the last count elements of the source + // sequence. We can drop it now. + deque.removeFirst(); + } + } catch (Throwable e) { + observer.onError(e); + subscription.unsubscribe(); + } finally { + lock.unlock(); } } @@ -174,6 +187,35 @@ public void testTakeLastWithZeroCount() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeLastWithNull() { + Observable w = Observable.from("one", null, "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onNext(null); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLastWithNegativeCount() { + Observable w = Observable.from("one"); + Observable take = Observable.create(takeLast(w, -1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onError( + any(IndexOutOfBoundsException.class)); + verify(aObserver, never()).onCompleted(); + } + } }