Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimization - use OperatorTakeLastOne for takeLast(1) #2914

Merged
merged 1 commit into from
Apr 25, 2015

Conversation

davidmoten
Copy link
Collaborator

This is an optimization for takeLast when called with parameter 1. Using OperatorTakeLast carries unnecessary overhead for the takeLast(1) case and a decent throughput improvement (x2) for streams of 100 elements or more is seen in the benchmarks below.

takeLast(1) is used by the following operators which will also demonstrate a throughput improvement:

  • last, lastOrDefault
  • reduce
  • collect
  • count, countLong

Benchmarks comparing using the new OperatorTakeLastOne and OperatorTakeLast:

Benchmark                                                        Mode   Samples        Score  Score error    Units
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne_Few     thrpt         5  2235516.141   129091.019    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne_Many    thrpt         5      103.980        9.233    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne_Some    thrpt         5   984689.481    48560.897    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast_Few        thrpt         5  2187421.223    93550.379    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast_Many       thrpt         5       54.575        2.054    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast_Some       thrpt         5   466892.497     9267.405    ops/s

T t = last;
// release for gc
last = null;
child.onNext(t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never call onNext while holding a lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are calling child.onNext from onCompleted, you need to bounce back any thrown exceptions because throwing onCompleted can't be routed back to onError and child will not receive the exception.

@@ -7771,7 +7771,10 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX operators documentation: TakeLast</a>
*/
public final Observable<T> takeLast(final int count) {
return lift(new OperatorTakeLast<T>(count));
if (count == 1 )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is takeLast(0) meaningful? If so, it can be routed to ignoreElements(). BTW, ignoreElements() can be optimized by making it singleton and be very simple (i.e., its onNext does nothing) instead of filtering with always false.

@davidmoten
Copy link
Collaborator Author

Thanks @akarnokd for the review, I appreciate the lessons!

I've incorporated the changes except for the ignoreElements optimization which I'll leave for another PR.

I wrote this up before you submitted the SingleDelayedProducer in #2901 but had spotted its usefulness. It's very close to being reusable here, just have to handle the empty stream case which is allowed by takeLast. I submitted this anyway because it helps me learn, thanks.

Up to you of course if you want to merge this or wait for #2901 to be merged then get another PR that uses a modified SingleDelayedProducer (might have its name changed if it allows single or none).

Updated benchmarks (that's a nifty trick using Params):

Benchmark                                                  (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast            5  thrpt         5  2233452.067    81630.798    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast          100  thrpt         5   444353.711   224920.714    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLast      1000000  thrpt         5       56.947        2.630    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne         5  thrpt         5  2372577.831    76820.729    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne       100  thrpt         5  1012528.600    75674.064    ops/s
r.o.OperatorTakeLastOnePerf.takeLastOneUsingTakeLastOne   1000000  thrpt         5      108.284        4.965    ops/s

public void onCompleted() {
// CAS loop to atomically change state given that requestMore()
// may be acting concurrently
while (true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If last == ABSENT then there is no need to interact with the backpressure logic since a plain onCompleted can be sent before any request. In this case, I'd also not check isUnsubscribed().

@davidmoten
Copy link
Collaborator Author

Thanks @akarnokd. I've added your suggestions:

  • shortcut to completion when empty
  • test for takeLast(0)

@akarnokd
Copy link
Member

Thanks!

akarnokd added a commit that referenced this pull request Apr 25, 2015
Optimization - use OperatorTakeLastOne for takeLast(1)
@akarnokd akarnokd merged commit 1cff8bf into ReactiveX:1.x Apr 25, 2015
@benjchristensen
Copy link
Member

👍

@benjchristensen benjchristensen mentioned this pull request Apr 30, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants