-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
try to transform throttle into a reusable transformation #3409
Conversation
.window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1)) | ||
.throttle(users) | ||
return Iterators.toFlux(usersRepository.list()) | ||
.transform(ReactorUtils.Throttler.<Username>throttler().elements(runningOptions.getUsersPerSecond()).per(Duration.ofSeconds(1))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indent
.throttle(originalFlux) | ||
ImmutableList<Integer> ongoingProcessingUponComputationStart = originalFlux | ||
.transform(ReactorUtils.<Integer>throttle().elements(windowMaxSize).per(windowDuration)) | ||
.flatMap(longRunningOperation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain to me why throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize
still works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it probably doesn't but maybe you have an idea of why if you put this comment? I couldn't find why it broke
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way I understand it is that transform
don't limit downstream concurrency, hence I was really curious to know if that test pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
from https://projectreactor.io/docs/core/release/reference/
How did you saw it's doing anything differently than inline operators?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your operation don't wrap the downstream flatMap request were we do actually perform the operation we want to throttle. If the prossessing is slower than the rate of the throttling you would end up doing more concurrent operations than what you would like, unless you apply some form of limitations to it.
That's what the test enforces.
Also, from what I understand the downstream flatMap needs to be part of the 'extracted operator chain'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A syntax addressing this might be (suggestion)
originalFlux
.transform(ReactorUtils<Integer>throttle()
.elements(windowMaxSize)
.per(windowDuration)
.forOperation(longRunningOperation))
I know it's a draft, but still... No ticket related? |
Preconditions.checkArgument(!duration.isZero(), "'windowDuration' must be strictly positive"); | ||
return flux -> flux | ||
.windowTimeout(elements, duration) | ||
.zipWith(Flux.interval(DELAY, duration), 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you make more clear of 1
, days or minus or second?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
concurrency. if we can make it work, we'll refine the PR
I pushed a fixup with my above proposal. I was surprised to get a buffer overrun error due to the prefect on the interval (that one #3409 (comment)) , hence added one more test to ensure we are "buffer filling proof". Feel free to keep it or discard it. |
|
3ed77dd
to
d2245f5
Compare
There was rebase issue with master, so I rebased and tried to fix the issue |
Can be considered 🍏 test this please |
Log output lost test this please |
Reschedule build: test this please |
Seems unrelated test this please |
test this please |
(be patient) test this please |
...in/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
Outdated
Show resolved
Hide resolved
Still missing a relating ticket though? |
JAMES-3184 seems appropriate |
Merged |
I tried to make it more readable but failed because of reactor/reactor-core#1099
If anybody has an idea ...