2.x: add limit() to limit both item count and request amount #5655

Merged on Oct 18, 2017 (3 commits)

akarnokd (Member) commented on Oct 9, 2017

This PR adds the operator limit to limit the number of items as well as the total request amount.

Some asynchronous boundary-like sources, such as network libraries that translate from/to Flowable/Publisher, may not modulate or limit downstream requests and thus the other side may produce items unnecessarily.

The existing alternatives do not cover this case: take(N) is designed to go unbounded if its downstream requests more than N, to improve the performance of local flows, and rebatchRequests(M) will keep requesting once 75% of M has been received.

Note that requests are negotiated on an operator boundary and limit's amount may not be preserved further upstream. For example, source.observeOn(Schedulers.computation()).limit(5) will still request the default (128) elements from the given source.
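
The request-limiting behavior described above can be sketched with the JDK's java.util.concurrent.Flow interfaces instead of RxJava's own classes. This is a minimal illustration, not the FlowableLimit implementation from this PR: the names LimitSketch, LimitSubscriber and requestsSeenBySource are hypothetical, the limit is assumed to be positive, and validation of request amounts is omitted. It shows a downstream that requests Long.MAX_VALUE while the source only ever sees a request for the limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch; assumes limit > 0 and a well-behaved, serialized upstream. */
public class LimitSketch {

    /** Relays at most `limit` items and never requests more than `limit` in total. */
    static final class LimitSubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {
        final Flow.Subscriber<? super T> downstream;
        final AtomicLong requestBudget; // amount that may still be requested upstream
        long remaining;                 // items that may still be delivered downstream
        Flow.Subscription upstream;

        LimitSubscriber(Flow.Subscriber<? super T> downstream, long limit) {
            this.downstream = downstream;
            this.requestBudget = new AtomicLong(limit);
            this.remaining = limit;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);
        }

        @Override public void request(long n) {
            for (;;) {
                long budget = requestBudget.get();
                if (budget == 0L) {
                    return; // the whole limit has already been requested
                }
                long toRequest = Math.min(budget, n);
                if (requestBudget.compareAndSet(budget, budget - toRequest)) {
                    upstream.request(toRequest);
                    return;
                }
            }
        }

        @Override public void cancel() {
            upstream.cancel();
        }

        @Override public void onNext(T item) {
            long r = remaining; // read the field once, as in the reviewed snippet
            if (r > 0L) {
                remaining = --r;
                downstream.onNext(item);
                if (r == 0L) {
                    upstream.cancel();
                    downstream.onComplete();
                }
            }
        }

        @Override public void onError(Throwable t) {
            if (remaining > 0L) { remaining = 0L; downstream.onError(t); }
        }

        @Override public void onComplete() {
            if (remaining > 0L) { remaining = 0L; downstream.onComplete(); }
        }
    }

    /** Runs an unbounded consumer through the limiter; returns the requests the source saw. */
    public static List<Long> requestsSeenBySource(long limit, List<Integer> received) {
        List<Long> requests = new ArrayList<>();
        Flow.Subscriber<Integer> consumer = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        };
        LimitSubscriber<Integer> limiter = new LimitSubscriber<>(consumer, limit);
        // A synchronous counting source that records every request amount it receives.
        limiter.onSubscribe(new Flow.Subscription() {
            int next = 1;
            boolean cancelled;
            @Override public void request(long n) {
                requests.add(n);
                for (long i = 0; i < n && !cancelled; i++) {
                    limiter.onNext(next++);
                }
            }
            @Override public void cancel() { cancelled = true; }
        });
        return requests;
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        System.out.println(requestsSeenBySource(5, received)); // prints [5]
        System.out.println(received);                          // prints [1, 2, 3, 4, 5]
    }
}
```

In this sketch the cap is applied in request(), so the unbounded downstream request never reaches the source, which is the difference from the take(N) behavior described above.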

Related discussion: #5077.
Blog: Java 9 Flow API: taking and skipping - Limiting the request amount

codecov bot commented on Oct 9, 2017

Codecov Report

Merging #5655 into 2.x will decrease coverage by 0.03%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##                2.x    #5655      +/-   ##
============================================
- Coverage     96.24%   96.21%   -0.04%     
+ Complexity     5853     5850       -3     
============================================
  Files           635      636       +1     
  Lines         41651    41703      +52     
  Branches       5768     5778      +10     
============================================
+ Hits          40089    40123      +34     
- Misses          616      623       +7     
- Partials        946      957      +11

Impacted Files                                          Coverage Δ                Complexity Δ
src/main/java/io/reactivex/Flowable.java                100% <100%> (ø)           525 <2> (+2) ⬆️
...vex/internal/operators/flowable/FlowableLimit.java   100% <100%> (ø)           2 <2> (?)
...ernal/operators/flowable/FlowableFlatMapMaybe.java   88.88% <0%> (-7.73%)      2% <0%> (ø)
.../operators/flowable/FlowableBlockingSubscribe.java   91.89% <0%> (-5.41%)      9% <0%> (-1%)
...rnal/subscribers/SinglePostCompleteSubscriber.java   94.87% <0%> (-5.13%)      14% <0%> (-1%)
.../io/reactivex/internal/schedulers/IoScheduler.java   89.24% <0%> (-3.23%)      9% <0%> (ø)
...a/io/reactivex/internal/util/QueueDrainHelper.java   60.28% <0%> (-2.84%)      32% <0%> (-2%)
...activex/internal/observers/QueueDrainObserver.java   61.53% <0%> (-2.57%)      12% <0%> (-1%)
...ternal/operators/completable/CompletableMerge.java   96.42% <0%> (-2.39%)      2% <0%> (ø)
.../operators/observable/ObservableFlatMapSingle.java   88.8% <0%> (-2.24%)       2% <0%> (ø)
... and 26 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update e02cb58...a7e3037.

@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@CheckReturnValue
public final Flowable<T> limit(long count) {

Contributor

I like this strict version of take(), but thinking about API design I'm afraid that this additional operator could add unnecessary confusion for the users.

Maybe an overload of take() with a boolean flag that controls the "strictness" of the upstream requests would work? (Or something better, if you have one in mind.)

Member Author

Possibly.


@Override
public void onNext(T t) {
    long r = remaining;

Contributor

What's the point of the local r? It looks like you could do everything directly on remaining here.

Member Author

I avoid re-reading fields.

Contributor

Hm, it's not volatile, so it should normally be in a CPU register or cache, but yes, a local variable has a slightly better chance of being there.

Although a local variable creates indirection between reads and writes, so overall I think it's neither a win nor a loss in performance :)

There is an interesting question on SO, but the answers seem to compare only the speed of access to a field that is already initialized, without accounting for the overall overhead of the additional variable: https://stackoverflow.com/questions/21613098/java-local-vs-instance-variable-access-speed

Member Author

The JIT may load it into a register but not everything runs on HotSpot.

    remaining = --r;
    actual.onNext(t);
    if (r == 0L) {
        upstream.cancel();

Contributor

What about this:

if (r > 0L) {
  remaining = --r;
  final boolean completed = r == 0L;
  
  if (completed) {
    upstream.cancel();
  }

  actual.onNext(t);

  if (completed) {
    actual.onComplete();
  }
}

This way we cancel the upstream before delivering the onNext notification. That could matter if onNext triggers a long-running chain of computations, which in the current version delays cancellation of the upstream.

Contributor

Though the current implementation of take() also calls onNext before cancelling, so this might be an unwanted behavior change. Up to you; we can move this change to 3.x.

Member Author

Or cancelling first could interrupt the thread and make code in the downstream fail unexpectedly.

Contributor

Good point!

But doesn't it mean that actual.onComplete() should happen before upstream.cancel() then? :trollface:

Member Author

It's way less likely to happen. There were issues around onNext before.

Contributor

OK, although I think the probability is the same here: upstream.cancel() has to be called with either ordering and can possibly cause problems. But with the current order we guarantee delivery of onNext, which is probably good :)
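
The ordering concern in this thread can be illustrated with a small stand-alone sketch. It is hypothetical and not code from the PR: it assumes that cancelling the upstream interrupts the current thread, and it uses Thread.sleep as a stand-in for any interruptible call made inside the downstream's onNext. The class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the two call orderings for the final item. */
public class CancelOrderSketch {

    /** Returns the events observed while delivering the last item under the given ordering. */
    public static List<String> deliverLastItem(boolean cancelBeforeOnNext) {
        List<String> log = new ArrayList<>();

        // Assumption: cancelling this upstream interrupts the thread that calls cancel().
        Runnable upstreamCancel = () -> {
            log.add("upstream.cancel");
            Thread.currentThread().interrupt();
        };

        // Downstream onNext performs an interruptible call (a stand-in for blocking work).
        Runnable downstreamOnNext = () -> {
            try {
                Thread.sleep(1);
                log.add("onNext ok");
            } catch (InterruptedException e) {
                log.add("onNext interrupted");
            }
        };

        if (cancelBeforeOnNext) {
            upstreamCancel.run();   // ordering proposed in the review
            downstreamOnNext.run();
        } else {
            downstreamOnNext.run(); // ordering kept in the PR
            upstreamCancel.run();
        }
        log.add("onComplete");

        Thread.interrupted();       // clear the flag so the caller is unaffected
        return log;
    }

    public static void main(String[] args) {
        System.out.println(deliverLastItem(true));
        // prints [upstream.cancel, onNext interrupted, onComplete]
        System.out.println(deliverLastItem(false));
        // prints [onNext ok, upstream.cancel, onComplete]
    }
}
```

Under that assumption, cancelling first makes the last onNext fail, while the order kept in the PR delivers the item before the interrupt can take effect.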


public class FlowableLimitTest implements LongConsumer, Action {

final List<Long> requests = Collections.synchronizedList(new ArrayList<Long>());

Contributor

nit: I don't think you need synchronization here, I don't see any concurrent access to this list

Member Author

Just in case.


@Override
public void run() throws Exception {
requests.add(-100L);

Contributor

super nit: List<Object>, private static final Object CANCELLED = new Object(), requests.add(CANCELLED)

assertEquals(5, requests.get(0));
assertEquals(CANCELLED, requests.get(1));

Member Author

-100 is fine.


assertEquals(2, requests.size());
assertEquals(5, requests.get(0).intValue());
assertEquals(-100, requests.get(1).intValue());

Collaborator

let's use the constant here?

akarnokd (Member, Author)

Updated. I'm not too fond of a take(long, boolean) overload. I suggest having limit as the name. I think the feature it provides is mainly useful for network/IO library writers and power users.

vanniktech (Collaborator)

I also prefer to have a dedicated method instead of magic Boolean operators.
