diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 46ff21e9e20..e83f5dc4328 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -15,11 +15,16 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingDeque; +import java.util.Deque; +import java.util.LinkedList; +import java.util.concurrent.locks.ReentrantLock; import org.junit.Test; import org.mockito.InOrder; @@ -59,13 +64,21 @@ private static class TakeLast implements OnSubscribeFunc { } public Subscription onSubscribe(Observer observer) { + if (count < 0) { + throw new IndexOutOfBoundsException( + "count could not be negative"); + } return subscription.wrap(items.subscribe(new ItemObserver(observer))); } private class ItemObserver implements Observer { - private LinkedBlockingDeque deque = new LinkedBlockingDeque(count); + /** + * Store the last count elements until now. + */ + private Deque deque = new LinkedList(); private final Observer observer; + private final ReentrantLock lock = new ReentrantLock(); public ItemObserver(Observer observer) { this.observer = observer; @@ -73,11 +86,14 @@ public ItemObserver(Observer observer) { @Override public void onCompleted() { - Iterator reverse = deque.descendingIterator(); - while (reverse.hasNext()) { - observer.onNext(reverse.next()); + try { + for (T value : deque) { + observer.onNext(value); + } + observer.onCompleted(); + } catch (Throwable e) { + observer.onError(e); } - observer.onCompleted(); } @Override @@ -86,9 +102,27 @@ public void onError(Throwable e) { } @Override - public void onNext(T args) { - while (!deque.offerFirst(args)) { - deque.removeLast(); + public void onNext(T value) { + if (count == 0) { + // If count == 0, we do not need to put value into deque and + // remove it at once. We can ignore the value directly. + return; + } + lock.lock(); + try { + deque.offerLast(value); + if (deque.size() > count) { + // Now deque has count + 1 elements, so the first + // element in the deque definitely does not belong + // to the last count elements of the source + // sequence. We can drop it now. + deque.removeFirst(); + } + } catch (Throwable e) { + observer.onError(e); + subscription.unsubscribe(); + } finally { + lock.unlock(); } } @@ -140,6 +174,48 @@ public void testTakeLast2() { verify(aObserver, times(1)).onCompleted(); } + @Test + public void testTakeLastWithZeroCount() { + Observable w = Observable.from("one"); + Observable take = Observable.create(takeLast(w, 0)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLastWithNull() { + Observable w = Observable.from("one", null, "three"); + Observable take = Observable.create(takeLast(w, 2)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onNext(null); + verify(aObserver, times(1)).onNext("three"); + verify(aObserver, never()).onError(any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testTakeLastWithNegativeCount() { + Observable w = Observable.from("one"); + Observable take = Observable.create(takeLast(w, -1)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + take.subscribe(aObserver); + verify(aObserver, never()).onNext("one"); + verify(aObserver, times(1)).onError( + any(IndexOutOfBoundsException.class)); + verify(aObserver, never()).onCompleted(); + } + } }