Skip to content

Commit

Permalink
Fixed the issue about null values
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Sep 29, 2013
1 parent 02e14dc commit 5200cfc
Showing 1 changed file with 74 additions and 32 deletions.
106 changes: 74 additions & 32 deletions rxjava-core/src/main/java/rx/operators/OperationTakeLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.locks.ReentrantLock;

import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.subscriptions.Subscriptions;
import rx.Observer;
import rx.Subscription;

Expand Down Expand Up @@ -60,45 +64,36 @@ private static class TakeLast<T> implements OnSubscribeFunc<T> {
}

public Subscription onSubscribe(Observer<? super T> observer) {
if(count == 0) {
items.subscribe(new Observer<T>() {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(T args) {
}

}).unsubscribe();
observer.onCompleted();
return Subscriptions.empty();
if (count < 0) {
throw new IndexOutOfBoundsException(
"count could not be negative");
}

return subscription.wrap(items.subscribe(new ItemObserver(observer)));
}

private class ItemObserver implements Observer<T> {

private LinkedBlockingDeque<T> deque = new LinkedBlockingDeque<T>(count);
/**
* Store the last count elements until now.
*/
private Deque<T> deque = new LinkedList<T>();
private final Observer<? super T> observer;
private final ReentrantLock lock = new ReentrantLock();

public ItemObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
Iterator<T> reverse = deque.descendingIterator();
while (reverse.hasNext()) {
observer.onNext(reverse.next());
try {
for (T value : deque) {
observer.onNext(value);
}
observer.onCompleted();
} catch (Throwable e) {
observer.onError(e);
}
observer.onCompleted();
}

@Override
Expand All @@ -107,9 +102,27 @@ public void onError(Throwable e) {
}

@Override
public void onNext(T args) {
while (!deque.offerFirst(args)) {
deque.removeLast();
public void onNext(T value) {
if (count == 0) {
// If count == 0, we do not need to put value into deque and
// remove it at once. We can ignore the value directly.
return;
}
lock.lock();
try {
deque.offerLast(value);
if (deque.size() > count) {
// Now deque has count + 1 elements, so the first
// element in the deque definitely does not belong
// to the last count elements of the source
// sequence. We can drop it now.
deque.removeFirst();
}
} catch (Throwable e) {
observer.onError(e);
subscription.unsubscribe();
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -174,6 +187,35 @@ public void testTakeLastWithZeroCount() {
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeLastWithNull() {
Observable<String> w = Observable.from("one", null, "three");
Observable<String> take = Observable.create(takeLast(w, 2));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, never()).onNext("one");
verify(aObserver, times(1)).onNext(null);
verify(aObserver, times(1)).onNext("three");
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testTakeLastWithNegativeCount() {
Observable<String> w = Observable.from("one");
Observable<String> take = Observable.create(takeLast(w, -1));

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
take.subscribe(aObserver);
verify(aObserver, never()).onNext("one");
verify(aObserver, times(1)).onError(
any(IndexOutOfBoundsException.class));
verify(aObserver, never()).onCompleted();
}

}

}

0 comments on commit 5200cfc

Please sign in to comment.