Skip to content

Commit

Permalink
Merge pull request ReactiveX#413 from zsxwing/take-last
Browse files Browse the repository at this point in the history
Fixed the issues of takeLast(items, 0) and null values
  • Loading branch information
benjchristensen committed Oct 9, 2013
2 parents eaff4d5 + 5200cfc commit 3956ca2
Showing 1 changed file with 88 additions and 12 deletions.
100 changes: 88 additions & 12 deletions rxjava-core/src/main/java/rx/operators/OperationTakeLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;
import org.mockito.InOrder;
Expand Down Expand Up @@ -59,25 +64,36 @@ private static class TakeLast<T> implements OnSubscribeFunc<T> {
}

public Subscription onSubscribe(Observer<? super T> observer) {
if (count < 0) {
throw new IndexOutOfBoundsException(
"count could not be negative");
}
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}

private class ItemObserver implements Observer<T> {

private LinkedBlockingDeque<T> deque = new LinkedBlockingDeque<T>(count);
/**
* Store the last count elements until now.
*/
private Deque<T> deque = new LinkedList<T>();
private final Observer<? super T> observer;
private final ReentrantLock lock = new ReentrantLock();

public ItemObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
Iterator<T> reverse = deque.descendingIterator();
while (reverse.hasNext()) {
observer.onNext(reverse.next());
try {
for (T value : deque) {
observer.onNext(value);
}
observer.onCompleted();
} catch (Throwable e) {
observer.onError(e);
}
observer.onCompleted();
}

@Override
Expand All @@ -86,9 +102,27 @@ public void onError(Throwable e) {
}

@Override
public void onNext(T args) {
while (!deque.offerFirst(args)) {
deque.removeLast();
public void onNext(T value) {
if (count == 0) {
// If count == 0, we do not need to put value into deque and
// remove it at once. We can ignore the value directly.
return;
}
lock.lock();
try {
deque.offerLast(value);
if (deque.size() > count) {
// Now deque has count + 1 elements, so the first
// element in the deque definitely does not belong
// to the last count elements of the source
// sequence. We can drop it now.
deque.removeFirst();
}
} catch (Throwable e) {
observer.onError(e);
subscription.unsubscribe();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -140,6 +174,48 @@ public void testTakeLast2() {
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeLastWithZeroCount() {
Observable<String> w = Observable.from("one");
Observable<String> take = Observable.create(takeLast(w, 0));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, never()).onNext("one");
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeLastWithNull() {
Observable<String> w = Observable.from("one", null, "three");
Observable<String> take = Observable.create(takeLast(w, 2));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, never()).onNext("one");
verify(aObserver, times(1)).onNext(null);
verify(aObserver, times(1)).onNext("three");
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeLastWithNegativeCount() {
Observable<String> w = Observable.from("one");
Observable<String> take = Observable.create(takeLast(w, -1));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, never()).onNext("one");
verify(aObserver, times(1)).onError(
any(IndexOutOfBoundsException.class));
verify(aObserver, never()).onCompleted();
}

}

}

0 comments on commit 3956ca2

Please sign in to comment.