diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 38249d44e6..ead34cba61 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -7771,7 +7771,12 @@ public final Observable takeFirst(Func1 predicate) { * @see ReactiveX operators documentation: TakeLast */ public final Observable takeLast(final int count) { - return lift(new OperatorTakeLast(count)); + if (count == 0) + return ignoreElements(); + else if (count == 1 ) + return lift(OperatorTakeLastOne.instance()); + else + return lift(new OperatorTakeLast(count)); } /** diff --git a/src/main/java/rx/internal/operators/OperatorTakeLastOne.java b/src/main/java/rx/internal/operators/OperatorTakeLastOne.java new file mode 100644 index 0000000000..a9bb7b5d33 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorTakeLastOne.java @@ -0,0 +1,173 @@ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicInteger; + +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; + +public class OperatorTakeLastOne implements Operator { + + private static class Holder { + static final OperatorTakeLastOne INSTANCE = new OperatorTakeLastOne(); + } + + @SuppressWarnings("unchecked") + public static OperatorTakeLastOne instance() { + return (OperatorTakeLastOne) Holder.INSTANCE; + } + + private OperatorTakeLastOne() { + + } + + @Override + public Subscriber call(Subscriber child) { + final ParentSubscriber parent = new ParentSubscriber(child); + child.setProducer(new Producer() { + + @Override + public void request(long n) { + parent.requestMore(n); + } + }); + child.add(parent); + return parent; + } + + private static class ParentSubscriber extends Subscriber { + + private final static int NOT_REQUESTED_NOT_COMPLETED = 0; + private final static int NOT_REQUESTED_COMPLETED = 1; + private final static int REQUESTED_NOT_COMPLETED = 2; + private final static int REQUESTED_COMPLETED = 3; + + /* + * These are the expected state transitions: + * + * NOT_REQUESTED_NOT_COMPLETED --> REQUESTED_NOT_COMPLETED + * | | + * V V + * NOT_REQUESTED_COMPLETED --> REQUESTED_COMPLETED + * + * Once at REQUESTED_COMPLETED we emit the last value if one exists + */ + + // Used as the initial value of last + private static final Object ABSENT = new Object(); + + // the downstream subscriber + private final Subscriber child; + + @SuppressWarnings("unchecked") + // we can get away with this cast at runtime because of type erasure + private T last = (T) ABSENT; + + // holds the current state of the stream so that we can make atomic + // updates to it + private final AtomicInteger state = new AtomicInteger(NOT_REQUESTED_NOT_COMPLETED); + + ParentSubscriber(Subscriber child) { + this.child = child; + } + + void requestMore(long n) { + if (n > 0) { + // CAS loop to atomically change state given that onCompleted() + // or another requestMore() may be acting concurrently + while (true) { + // read the value of state and then try state transitions + // only if the value of state does not change in the + // meantime (in another requestMore() or onCompleted()). If + // the value has changed and we expect to do a transition + // still then we loop and try again. + final int s = state.get(); + if (s == NOT_REQUESTED_NOT_COMPLETED) { + if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, + REQUESTED_NOT_COMPLETED)) { + return; + } + } else if (s == NOT_REQUESTED_COMPLETED) { + if (state.compareAndSet(NOT_REQUESTED_COMPLETED, REQUESTED_COMPLETED)) { + emit(); + return; + } + } else + // already requested so we exit + return; + } + } + } + + @Override + public void onCompleted() { + //shortcut if an empty stream + if (last == ABSENT) { + child.onCompleted(); + return; + } + // CAS loop to atomically change state given that requestMore() + // may be acting concurrently + while (true) { + // read the value of state and then try state transitions + // only if the value of state does not change in the meantime + // (in another requestMore()). If the value has changed and + // we expect to do a transition still then we loop and try + // again. + final int s = state.get(); + if (s == NOT_REQUESTED_NOT_COMPLETED) { + if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, NOT_REQUESTED_COMPLETED)) { + return; + } + } else if (s == REQUESTED_NOT_COMPLETED) { + if (state.compareAndSet(REQUESTED_NOT_COMPLETED, REQUESTED_COMPLETED)) { + emit(); + return; + } + } else + // already completed so we exit + return; + } + } + + /** + * If not unsubscribed then emits last value and completed to the child + * subscriber. + */ + private void emit() { + if (isUnsubscribed()) { + // release for gc + last = null; + return; + } + // Note that last is safely published despite not being volatile + // because a CAS update must have happened in the current thread just before + // emit() was called + T t = last; + // release for gc + last = null; + if (t != ABSENT) { + try { + child.onNext(t); + } catch (Throwable e) { + child.onError(e); + return; + } + } + if (!isUnsubscribed()) + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + last = t; + } + + } + +} diff --git a/src/perf/java/rx/operators/OperatorTakeLastOnePerf.java b/src/perf/java/rx/operators/OperatorTakeLastOnePerf.java new file mode 100644 index 0000000000..43adf76bc5 --- /dev/null +++ b/src/perf/java/rx/operators/OperatorTakeLastOnePerf.java @@ -0,0 +1,39 @@ +package rx.operators; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; + +import rx.internal.operators.OperatorTakeLast; +import rx.internal.operators.OperatorTakeLastOne; +import rx.jmh.InputWithIncrementingInteger; + +public class OperatorTakeLastOnePerf { + + private static final OperatorTakeLast TAKE_LAST = new OperatorTakeLast(1); + + @State(Scope.Thread) + public static class Input extends InputWithIncrementingInteger { + + @Param({ "5", "100", "1000000" }) + public int size; + + @Override + public int getSize() { + return size; + } + + } + + @Benchmark + public void takeLastOneUsingTakeLast(Input input) { + input.observable.lift(TAKE_LAST).subscribe(input.observer); + } + + @Benchmark + public void takeLastOneUsingTakeLastOne(Input input) { + input.observable.lift(OperatorTakeLastOne.instance()).subscribe(input.observer); + } + +} diff --git a/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java b/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java new file mode 100644 index 0000000000..3ca921daf0 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java @@ -0,0 +1,128 @@ +package rx.internal.operators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Subscriber; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.observers.TestSubscriber; + +public class OperatorTakeLastOneTest { + + @Test + public void testLastOfManyReturnsLast() { + TestSubscriber s = new TestSubscriber(); + Observable.range(1, 10).takeLast(1).subscribe(s); + s.assertReceivedOnNext(Arrays.asList(10)); + s.assertNoErrors(); + s.assertTerminalEvent(); + s.assertUnsubscribed(); + } + + @Test + public void testLastOfEmptyReturnsEmpty() { + TestSubscriber s = new TestSubscriber(); + Observable.empty().takeLast(1).subscribe(s); + s.assertReceivedOnNext(Collections.emptyList()); + s.assertNoErrors(); + s.assertTerminalEvent(); + s.assertUnsubscribed(); + } + + @Test + public void testLastOfOneReturnsLast() { + TestSubscriber s = new TestSubscriber(); + Observable.just(1).takeLast(1).subscribe(s); + s.assertReceivedOnNext(Arrays.asList(1)); + s.assertNoErrors(); + s.assertTerminalEvent(); + s.assertUnsubscribed(); + } + + @Test + public void testUnsubscribesFromUpstream() { + final AtomicBoolean unsubscribed = new AtomicBoolean(false); + Action0 unsubscribeAction = new Action0() { + @Override + public void call() { + unsubscribed.set(true); + } + }; + Observable.just(1).doOnUnsubscribe(unsubscribeAction) + .takeLast(1).subscribe(); + assertTrue(unsubscribed.get()); + } + + @Test + public void testLastWithBackpressure() { + MySubscriber s = new MySubscriber(0); + Observable.just(1).takeLast(1).subscribe(s); + assertEquals(0, s.list.size()); + s.requestMore(1); + assertEquals(1, s.list.size()); + } + + @Test + public void testTakeLastZeroProcessesAllItemsButIgnoresThem() { + final AtomicInteger upstreamCount = new AtomicInteger(); + final int num = 10; + int count = Observable.range(1,num).doOnNext(new Action1() { + + @Override + public void call(Integer t) { + upstreamCount.incrementAndGet(); + }}) + .takeLast(0).count().toBlocking().single(); + assertEquals(num, upstreamCount.get()); + assertEquals(0, count); + } + + private static class MySubscriber extends Subscriber { + + private long initialRequest; + + MySubscriber(long initialRequest) { + this.initialRequest = initialRequest; + } + + final List list = new ArrayList(); + + public void requestMore(long n) { + request(n); + } + + @Override + public void onStart() { + request(initialRequest); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(T t) { + list.add(t); + } + + } + +}