x-pack/filebeat/input/entityanalytics/provider/okta: Rate limiting hangs #40106

Open
6 of 9 tasks
chrisberkhout opened this issue Jul 4, 2024 · 3 comments
Assignees
Labels
bug Team:Security-Service Integrations Security Service Integrations Team

chrisberkhout commented Jul 4, 2024

Around the time of a rate limit reset, the rate limiting code may set a negative target rate and wait forever before the next request.

The rate comes out negative if the current time is after the reset time returned in the response headers. If a negative rate is set at a time when no request budget has accumulated, the limiter will not recover. How previous events and timing affect the outcome can be seen in this example code.

We can avoid setting a negative rate by changing the `== 0` check to `<= 0` here.

There are some other corrections that can be made to the rate limiting logic, listed below. Beyond correctness, there are improvements that could be made for better operability, fault tolerance and user feedback.

A similar set of changes should be considered in the CEL input and related Mito code (in OktaRateLimit and in DraftRateLimit).

Fix rate limiting logic - #41583

Improve operability - #41977

Improve fault tolerance and user feedback

chrisberkhout added the bug and Team:Security-Service Integrations labels Jul 4, 2024
chrisberkhout self-assigned this Jul 4, 2024
@elasticmachine

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

chrisberkhout commented Jul 4, 2024

Stop requests until reset, rather than allowing one more burst, when `x-rate-limit-remaining: 0`

Setting a negative rate in the distant past avoids generating new tokens for the period from now until `waitUntil` (if the rate were zero, a concurrent caller could still consume the burst). That is true, but it won't clear existing tokens:

```diff
diff --git a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go
index 58495cbcd6..b33c7be7e6 100644
--- a/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go
+++ b/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta/okta.go
@@ -444,6 +444,7 @@ func oktaRateLimit(h http.Header, window time.Duration, limiter *rate.Limiter) e
 		// estimate will be overwritten when we make the next
 		// permissible API request.
 		next := rate.Limit(lim / window.Seconds())
+		limiter.SetLimitAt(time.Time{}, rate.Limit(-1))
 		limiter.SetLimitAt(waitUntil, next)
 		limiter.SetBurstAt(waitUntil, burst)
 		return nil
```

@chrisberkhout

In Go's x/time/rate/rate.go, a reservation is made as follows:

```go
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()

	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: t,
		}
	} else if lim.limit == 0 {
		var ok bool
		if lim.burst >= n {
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: t,
		}
	}

	t, tokens := lim.advance(t)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = t.Add(waitDuration)

		// Update state
		lim.last = t
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}

	return r
}
```

If `t` is before `lim.last`, `lim.advance(t)` returns only the existing tokens and no new ones, because no time has elapsed in which to accumulate them. This case is handled explicitly in `advance()`. `waitDuration` is then set to the time it would take to accumulate the missing tokens.

However, new tokens won't start accumulating until after `lim.last`, so the gap between `t` and `lim.last` also has to be waited out. The correct way to calculate `waitDuration` would be:

```go
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
	if t.Before(lim.last) {
		// Non-accumulating duration: no tokens are added between t and lim.last.
		waitDuration += lim.last.Sub(t)
	}
	waitDuration += lim.limit.durationFromTokens(-tokens)
}
```

The reservation result (ok := n <= lim.burst && waitDuration <= maxFutureReserve) and the time to act (r.timeToAct = t.Add(waitDuration)) depend on having a correct value for waitDuration.

Also, `lim.last` should not be moved back to an earlier time; otherwise the interval between the new and the old value can be counted twice (double accumulation).
