Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization - use OperatorTakeLastOne for takeLast(1) #2914

Merged
merged 1 commit into from
Apr 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7771,7 +7771,12 @@ public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX operators documentation: TakeLast</a>
*/
public final Observable<T> takeLast(final int count) {
return lift(new OperatorTakeLast<T>(count));
if (count == 0)
return ignoreElements();
else if (count == 1 )
return lift(OperatorTakeLastOne.<T>instance());
else
return lift(new OperatorTakeLast<T>(count));
}

/**
Expand Down
173 changes: 173 additions & 0 deletions src/main/java/rx/internal/operators/OperatorTakeLastOne.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;

public class OperatorTakeLastOne<T> implements Operator<T, T> {

private static class Holder {
static final OperatorTakeLastOne<Object> INSTANCE = new OperatorTakeLastOne<Object>();
}

@SuppressWarnings("unchecked")
public static <T> OperatorTakeLastOne<T> instance() {
return (OperatorTakeLastOne<T>) Holder.INSTANCE;
}

private OperatorTakeLastOne() {

}

@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
child.setProducer(new Producer() {

@Override
public void request(long n) {
parent.requestMore(n);
}
});
child.add(parent);
return parent;
}

private static class ParentSubscriber<T> extends Subscriber<T> {

private final static int NOT_REQUESTED_NOT_COMPLETED = 0;
private final static int NOT_REQUESTED_COMPLETED = 1;
private final static int REQUESTED_NOT_COMPLETED = 2;
private final static int REQUESTED_COMPLETED = 3;

/*
* These are the expected state transitions:
*
* NOT_REQUESTED_NOT_COMPLETED --> REQUESTED_NOT_COMPLETED
* | |
* V V
* NOT_REQUESTED_COMPLETED --> REQUESTED_COMPLETED
*
* Once at REQUESTED_COMPLETED we emit the last value if one exists
*/

// Used as the initial value of last
private static final Object ABSENT = new Object();

// the downstream subscriber
private final Subscriber<? super T> child;

@SuppressWarnings("unchecked")
// we can get away with this cast at runtime because of type erasure
private T last = (T) ABSENT;

// holds the current state of the stream so that we can make atomic
// updates to it
private final AtomicInteger state = new AtomicInteger(NOT_REQUESTED_NOT_COMPLETED);

ParentSubscriber(Subscriber<? super T> child) {
this.child = child;
}

void requestMore(long n) {
if (n > 0) {
// CAS loop to atomically change state given that onCompleted()
// or another requestMore() may be acting concurrently
while (true) {
// read the value of state and then try state transitions
// only if the value of state does not change in the
// meantime (in another requestMore() or onCompleted()). If
// the value has changed and we expect to do a transition
// still then we loop and try again.
final int s = state.get();
if (s == NOT_REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED,
REQUESTED_NOT_COMPLETED)) {
return;
}
} else if (s == NOT_REQUESTED_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_COMPLETED, REQUESTED_COMPLETED)) {
emit();
return;
}
} else
// already requested so we exit
return;
}
}
}

@Override
public void onCompleted() {
//shortcut if an empty stream
if (last == ABSENT) {
child.onCompleted();
return;
}
// CAS loop to atomically change state given that requestMore()
// may be acting concurrently
while (true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If last == ABSENT then there is no need to interact with the backpressure logic since a plain onCompleted can be sent before any request. In this case, I'd also not check isUnsubscribed().

// read the value of state and then try state transitions
// only if the value of state does not change in the meantime
// (in another requestMore()). If the value has changed and
// we expect to do a transition still then we loop and try
// again.
final int s = state.get();
if (s == NOT_REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, NOT_REQUESTED_COMPLETED)) {
return;
}
} else if (s == REQUESTED_NOT_COMPLETED) {
if (state.compareAndSet(REQUESTED_NOT_COMPLETED, REQUESTED_COMPLETED)) {
emit();
return;
}
} else
// already completed so we exit
return;
}
}

/**
* If not unsubscribed then emits last value and completed to the child
* subscriber.
*/
private void emit() {
if (isUnsubscribed()) {
// release for gc
last = null;
return;
}
// Note that last is safely published despite not being volatile
// because a CAS update must have happened in the current thread just before
// emit() was called
T t = last;
// release for gc
last = null;
if (t != ABSENT) {
try {
child.onNext(t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never call onNext while holding a lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are calling child.onNext from onCompleted, you need to bounce back any thrown exceptions because throwing onCompleted can't be routed back to onError and child will not receive the exception.

} catch (Throwable e) {
child.onError(e);
return;
}
}
if (!isUnsubscribed())
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
last = t;
}

}

}
39 changes: 39 additions & 0 deletions src/perf/java/rx/operators/OperatorTakeLastOnePerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package rx.operators;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

import rx.internal.operators.OperatorTakeLast;
import rx.internal.operators.OperatorTakeLastOne;
import rx.jmh.InputWithIncrementingInteger;

public class OperatorTakeLastOnePerf {

private static final OperatorTakeLast<Integer> TAKE_LAST = new OperatorTakeLast<Integer>(1);

@State(Scope.Thread)
public static class Input extends InputWithIncrementingInteger {

@Param({ "5", "100", "1000000" })
public int size;

@Override
public int getSize() {
return size;
}

}

@Benchmark
public void takeLastOneUsingTakeLast(Input input) {
input.observable.lift(TAKE_LAST).subscribe(input.observer);
}

@Benchmark
public void takeLastOneUsingTakeLastOne(Input input) {
input.observable.lift(OperatorTakeLastOne.<Integer>instance()).subscribe(input.observer);
}

}
128 changes: 128 additions & 0 deletions src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package rx.internal.operators;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;

public class OperatorTakeLastOneTest {

@Test
public void testLastOfManyReturnsLast() {
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
Observable.range(1, 10).takeLast(1).subscribe(s);
s.assertReceivedOnNext(Arrays.asList(10));
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testLastOfEmptyReturnsEmpty() {
TestSubscriber<Object> s = new TestSubscriber<Object>();
Observable.empty().takeLast(1).subscribe(s);
s.assertReceivedOnNext(Collections.emptyList());
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testLastOfOneReturnsLast() {
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
Observable.just(1).takeLast(1).subscribe(s);
s.assertReceivedOnNext(Arrays.asList(1));
s.assertNoErrors();
s.assertTerminalEvent();
s.assertUnsubscribed();
}

@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
Action0 unsubscribeAction = new Action0() {
@Override
public void call() {
unsubscribed.set(true);
}
};
Observable.just(1).doOnUnsubscribe(unsubscribeAction)
.takeLast(1).subscribe();
assertTrue(unsubscribed.get());
}

@Test
public void testLastWithBackpressure() {
MySubscriber<Integer> s = new MySubscriber<Integer>(0);
Observable.just(1).takeLast(1).subscribe(s);
assertEquals(0, s.list.size());
s.requestMore(1);
assertEquals(1, s.list.size());
}

@Test
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
final AtomicInteger upstreamCount = new AtomicInteger();
final int num = 10;
int count = Observable.range(1,num).doOnNext(new Action1<Integer>() {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also add a test for takeLast(0) to make sure it 'delays' the onCompleted.

@Override
public void call(Integer t) {
upstreamCount.incrementAndGet();
}})
.takeLast(0).count().toBlocking().single();
assertEquals(num, upstreamCount.get());
assertEquals(0, count);
}

private static class MySubscriber<T> extends Subscriber<T> {

private long initialRequest;

MySubscriber(long initialRequest) {
this.initialRequest = initialRequest;
}

final List<T> list = new ArrayList<T>();

public void requestMore(long n) {
request(n);
}

@Override
public void onStart() {
request(initialRequest);
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(T t) {
list.add(t);
}

}

}