Skip to content

Commit

Permalink
add OperatorTakeLastOne
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Apr 24, 2015
1 parent 5b75d32 commit 0954ee8
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7771,7 +7771,12 @@ public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX operators documentation: TakeLast</a>
*/
public final Observable<T> takeLast(final int count) {
return lift(new OperatorTakeLast<T>(count));
if (count == 0)
return ignoreElements();
else if (count == 1 )
return lift(OperatorTakeLastOne.<T>instance());
else
return lift(new OperatorTakeLast<T>(count));
}

/**
Expand Down
168 changes: 168 additions & 0 deletions src/main/java/rx/internal/operators/OperatorTakeLastOne.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

public class OperatorTakeLastOne<T> implements Operator<T, T> {

private static class Holder {
static final OperatorTakeLastOne<Object> INSTANCE = new OperatorTakeLastOne<Object>();
}

@SuppressWarnings("unchecked")
public static <T> OperatorTakeLastOne<T> instance() {
return (OperatorTakeLastOne<T>) Holder.INSTANCE;
}

private OperatorTakeLastOne() {

}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
child.setProducer(new Producer() {

@Override
public void request(long n) {
parent.requestMore(n);
}
});
child.add(parent);
return parent;
}

private static class ParentSubscriber<T> extends Subscriber<T> {

private final static int NOT_REQUESTED_NOT_COMPLETED = 0;
private final static int NOT_REQUESTED_COMPLETED = 1;
private final static int REQUESTED_NOT_COMPLETED = 2;
private final static int REQUESTED_COMPLETED = 3;

/*
* These are the expected state transitions:
*
* NOT_REQUESTED_NOT_COMPLETED --> REQUESTED_NOT_COMPLETED
* | |
* V V
* NOT_REQUESTED_COMPLETED --> REQUESTED_COMPLETED
*
* Once at REQUESTED_COMPLETED we emit the last value if one exists
*/

// Used as the initial value of last
private static final Object ABSENT = new Object();

// the downstream subscriber
private final Subscriber<? super T> child;

@SuppressWarnings("unchecked")
// we can get away with this cast at runtime because of type erasure
private T last = (T) ABSENT;

// holds the current state of the stream so that we can make atomic
// updates to it
private final AtomicInteger state = new AtomicInteger(NOT_REQUESTED_NOT_COMPLETED);

ParentSubscriber(Subscriber<? super T> child) {
this.child = child;
}

void requestMore(long n) {
if (n > 0) {
// CAS loop to atomically change state given that onCompleted()
// or another requestMore() may be acting concurrently
while (true) {
// read the value of state and then try state transitions
// only if the value of state does not change in the
// meantime (in another requestMore() or onCompleted()). If
// the value has changed and we expect to do a transition
// still then we loop and try again.
final int s = state.get();
if (s == NOT_REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED,
REQUESTED_NOT_COMPLETED)) {
return;
}
} else if (s == NOT_REQUESTED_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_COMPLETED, REQUESTED_COMPLETED)) {
emit();
return;
}
} else
// already requested so we exit
return;
}
}
}

@Override
public void onCompleted() {
// CAS loop to atomically change state given that requestMore()
// may be acting concurrently
while (true) {
// read the value of state and then try state transitions
// only if the value of state does not change in the meantime
// (in another requestMore()). If the value has changed and
// we expect to do a transition still then we loop and try
// again.
final int s = state.get();
if (s == NOT_REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, NOT_REQUESTED_COMPLETED)) {
return;
}
} else if (s == REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(REQUESTED_NOT_COMPLETED, REQUESTED_COMPLETED)) {
emit();
return;
}
} else
// already completed so we exit
return;
}
}

/**
* If not unsubscribed then emits last value and completed to the child
* subscriber.
*/
private void emit() {
if (isUnsubscribed()) {
// release for gc
last = null;
return;
}
// Note that last is safely published despite not being volatile
// because a CAS update must have happened in the current thread just before
// emit() was called
T t = last;
// release for gc
last = null;
if (t != ABSENT) {
try {
child.onNext(t);
} catch (Throwable e) {
child.onError(e);
return;
}
}
if (!isUnsubscribed())
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
last = t;
}

}

}
39 changes: 39 additions & 0 deletions src/perf/java/rx/operators/OperatorTakeLastOnePerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx.operators;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

import rx.internal.operators.OperatorTakeLast;
import rx.internal.operators.OperatorTakeLastOne;
import rx.jmh.InputWithIncrementingInteger;

public class OperatorTakeLastOnePerf {

private static final OperatorTakeLast<Integer> TAKE_LAST = new OperatorTakeLast<Integer>(1);

@State(Scope.Thread)
public static class Input extends InputWithIncrementingInteger {

@Param({ "5", "100", "1000000" })
public int size;

@Override
public int getSize() {
return size;
}

}

@Benchmark
public void takeLastOneUsingTakeLast(Input input) {
input.observable.lift(TAKE_LAST).subscribe(input.observer);
}

@Benchmark
public void takeLastOneUsingTakeLastOne(Input input) {
input.observable.lift(OperatorTakeLastOne.<Integer>instance()).subscribe(input.observer);
}

}
110 changes: 110 additions & 0 deletions src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.TestSubscriber;

public class OperatorTakeLastOneTest {

@Test
public void testLastOfManyReturnsLast() {
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
Observable.range(1, 10).takeLast(1).subscribe(s);
s.assertReceivedOnNext(Arrays.asList(10));
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testLastOfEmptyReturnsEmpty() {
TestSubscriber<Object> s = new TestSubscriber<Object>();
Observable.empty().takeLast(1).subscribe(s);
s.assertReceivedOnNext(Collections.emptyList());
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testLastOfOneReturnsLast() {
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
Observable.just(1).takeLast(1).subscribe(s);
s.assertReceivedOnNext(Arrays.asList(1));
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
Action0 unsubscribeAction = new Action0() {
@Override
public void call() {
unsubscribed.set(true);
}
};
Observable.just(1).doOnUnsubscribe(unsubscribeAction)
.takeLast(1).subscribe();
assertTrue(unsubscribed.get());
}

@Test
public void testLastWithBackpressure() {
MySubscriber<Integer> s = new MySubscriber<Integer>(0);
Observable.just(1).takeLast(1).subscribe(s);
assertEquals(0, s.list.size());
s.requestMore(1);
assertEquals(1, s.list.size());
}

private static class MySubscriber<T> extends Subscriber<T> {

private long initialRequest;

MySubscriber(long initialRequest) {
this.initialRequest = initialRequest;
}

final List<T> list = new ArrayList<T>();

public void requestMore(long n) {
request(n);
}

@Override
public void onStart() {
request(initialRequest);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(T t) {
list.add(t);
}

}

}

0 comments on commit 0954ee8

Please sign in to comment.