-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML] Inference API Rate limiter #106330
[ML] Inference API Rate limiter #106330
Conversation
Pinging @elastic/ml-core (Team:ML) |
throw new IllegalArgumentException("Accumulated tokens limit must be greater than or equal to 0"); | ||
} | ||
|
||
if (newAccumulatedTokensLimit == Double.POSITIVE_INFINITY) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (newAccumulatedTokensLimit == Double.POSITIVE_INFINITY) { | |
if (Double.isInfinite(newAccumulatedTokensLimit)) { |
return Double.POSITIVE_INFINITY; | ||
} | ||
|
||
return Double.NEGATIVE_INFINITY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the accumulateTokens code:
var newTokens = tokensPerNanos * elapsedTimeNanos;
accumulatedTokens = Math.min(accumulatedTokensLimit, newTokens);
Math.min(a_positive_number, Double.NEGATIVE_INFINITY)
returns Double.NEGATIVE_INFINITY
. If accumulatedTokens
becomes a -ve number I think that could cause errors.
One option is to return 0 ( not +ve or -ve infinity). Using ChronoUnit.MICRO or ChronoUnit.MILLIS reduces the chance of an arithmetic overflow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll switch it to 0 and use micros instead.
|
||
private static double nanosBetweenExact(Instant start, Instant end) { | ||
try { | ||
return ChronoUnit.NANOS.between(start, end); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TemporalUnit.between() returns a long not double
private void accumulateTokens() { | ||
var now = Instant.now(clock); | ||
if (now.isAfter(nextTokenAvailability)) { | ||
var elapsedTimeNanos = nanosBetweenExact(nextTokenAvailability, now); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called in the ctor via setRate()
at which point nextTokenAvailability == Instant.MIN
. Because the calculated elapsedTimeNanos
is high the class will be initialised with accumulatedTokens == accumulatedTokensLimit
.
That seems reasonable to me, or at least as good as initialising accumulatedTokens
to 0. Just want to check that is the intention
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that was intentional. My thinking was that the first request can move forward without having to wait for tokens to accumulate if the limit was set to a positive number. If we always want it to start as 0 that's fine with me too though.
@elasticmachine run elasticsearch-ci/part-1 |
@elasticmachine merge upstream |
private void accumulateTokens() { | ||
var now = Instant.now(clock); | ||
if (now.isAfter(nextTokenAvailability)) { | ||
var elapsedTimeNanos = microsBetweenExact(nextTokenAvailability, now); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var elapsedTimeNanos = microsBetweenExact(nextTokenAvailability, now); | |
var elapsedTimeMicros = microsBetweenExact(nextTokenAvailability, now); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ugh sorry for all the missed nanos
find replace 🤦♂️
var now = Instant.now(clock); | ||
if (now.isAfter(nextTokenAvailability)) { | ||
var elapsedTimeNanos = microsBetweenExact(nextTokenAvailability, now); | ||
var newTokens = tokensPerMicros * elapsedTimeNanos; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var newTokens = tokensPerMicros * elapsedTimeNanos; | |
var newTokens = tokensPerMicros * elapsedTimeMicros; |
|
||
accumulatedTokensLimit = newAccumulatedTokensLimit; | ||
|
||
var unitsInNanos = newUnit.toMicros(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var unitsInNanos = newUnit.toMicros(1); | |
var unitsInMicros = newUnit.toMicros(1); |
accumulatedTokensLimit = newAccumulatedTokensLimit; | ||
|
||
var unitsInNanos = newUnit.toMicros(1); | ||
tokensPerMicros = newTokensPerTimeUnit / unitsInNanos; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokensPerMicros = newTokensPerTimeUnit / unitsInNanos; | |
tokensPerMicros = newTokensPerTimeUnit / unitsInMicros; |
if (now.isAfter(nextTokenAvailability)) { | ||
var elapsedTimeNanos = microsBetweenExact(nextTokenAvailability, now); | ||
var newTokens = tokensPerMicros * elapsedTimeNanos; | ||
accumulatedTokens = Math.min(accumulatedTokensLimit, newTokens); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this include the previously accumulated tokens?
accumulatedTokens = Math.min(accumulatedTokensLimit, newTokens); | |
accumulatedTokens = Math.min(accumulatedTokensLimit, accumulatedTokens + newTokens); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep thanks for that.
…search into ml-token-bucket
@elasticmachine merge upstream |
@elasticmachine run elasticsearch-ci/part-3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@elasticmachine merge upstream |
@elasticmachine merge upstream |
This PR might be relevant to this issue #106877 |
This PR adds a
RateLimiter
class. It is currently unused but will be leveraged once the queuing and threading of the external services is refactored.It implements the token bucket algorithm: https://en.wikipedia.org/wiki/Token_bucket