Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/httpjson: Apply rate limiting to all responses #38161

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 39 additions & 34 deletions x-pack/filebeat/input/httpjson/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,37 @@ func (r *rateLimiter) execute(ctx context.Context, f func() (*http.Response, err
for {
resp, err := f()
if err != nil {
return nil, fmt.Errorf("failed to read http.response.body: %w", err)
return nil, err
}

if r == nil || resp.StatusCode == http.StatusOK {
if r == nil {
return resp, nil
}

if resp.StatusCode != http.StatusTooManyRequests {
return nil, fmt.Errorf("http request was unsuccessful with a status code %d", resp.StatusCode)
applied, err := r.applyRateLimit(ctx, resp)
if err != nil {
return nil, fmt.Errorf("error applying rate limit: %w", err)
}

if err := r.applyRateLimit(ctx, resp); err != nil {
return nil, err
if resp.StatusCode == http.StatusOK || !applied {
return resp, nil
}
}
}

// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response
func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) error {
epoch, err := r.getRateLimit(resp)
// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response.
// It returns a bool indicating whether a limit was reached.
func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) (bool, error) {
limitReached, resumeAt, err := r.getRateLimit(resp)
if err != nil {
return err
return limitReached, err
}

t := time.Unix(epoch, 0)
t := time.Unix(resumeAt, 0)
w := time.Until(t)
if epoch == 0 || w <= 0 {
if resumeAt == 0 || w <= 0 {
r.log.Debugf("Rate Limit: No need to apply rate limit.")
return nil
return limitReached, nil
}
r.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t)
timer := time.NewTimer(w)
Expand All @@ -80,24 +82,25 @@ func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) e
<-timer.C
}
r.log.Info("Context done.")
return nil
return limitReached, nil
case <-timer.C:
r.log.Debug("Rate Limit: time is up.")
return nil
return limitReached, nil
}
}

// getRateLimit gets the rate limit value if specified in the response,
// and returns an int64 value in seconds since unix epoch for rate limit reset time.
// and returns a bool indicating whether a limit was reached, and
// an int64 value in seconds since unix epoch for rate limit reset time.
// When there is a remaining rate limit quota, or when the rate limit reset time has expired, it
// returns 0 for the epoch value.
func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {
func (r *rateLimiter) getRateLimit(resp *http.Response) (bool, int64, error) {
if r == nil {
return 0, nil
return false, 0, nil
}

if r.remaining == nil {
return 0, nil
return false, 0, nil
}

tr := transformable{}
Expand All @@ -106,54 +109,56 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) {

remaining, _ := r.remaining.Execute(ctx, tr, "rate-limit_remaining", nil, r.log)
if remaining == "" {
return 0, errors.New("remaining value is empty")
r.log.Infow("get rate limit", "error", errors.New("remaining value is empty"))
return false, 0, nil
}
m, err := strconv.ParseInt(remaining, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
return false, 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err)
}

// by default, httpjson will continue requests until Limit is 0
// can optionally stop requests "early"
var activeLimit int64 = 0
var minRemaining int64 = 0
if r.earlyLimit != nil {
earlyLimit := *r.earlyLimit
if earlyLimit > 0 && earlyLimit < 1 {
limit, _ := r.limit.Execute(ctx, tr, "early_limit", nil, r.log)
if limit != "" {
l, err := strconv.ParseInt(limit, 10, 64)
if err == nil {
activeLimit = l - int64(earlyLimit*float64(l))
minRemaining = l - int64(earlyLimit*float64(l))
}
}
} else if earlyLimit >= 1 {
activeLimit = int64(earlyLimit)
minRemaining = int64(earlyLimit)
}
}

r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit)
if m > activeLimit {
return 0, nil
r.log.Debugf("Rate Limit: Using active Early Limit: %d", minRemaining)
if m > minRemaining {
return false, 0, nil
}

if r.reset == nil {
r.log.Warn("reset rate limit is not set")
return 0, nil
return false, 0, nil
}

reset, _ := r.reset.Execute(ctx, tr, "rate-limit_reset", nil, r.log)
if reset == "" {
return 0, errors.New("reset value is empty")
r.log.Infow("get rate limit", "error", errors.New("reset value is empty"))
return false, 0, nil
}

epoch, err := strconv.ParseInt(reset, 10, 64)
resumeAt, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err)
return false, 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err)
}

if timeNow().Unix() > epoch {
return 0, nil
if timeNow().Unix() > resumeAt {
return true, 0, nil
}

return epoch, nil
return true, resumeAt, nil
}
78 changes: 43 additions & 35 deletions x-pack/filebeat/input/httpjson/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// Test getRateLimit function with a remaining quota, expect to receive 0, nil.
func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) {
func TestGetRateLimitReturnsFalse0IfRemainingQuota(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "118")
Expand All @@ -34,12 +34,13 @@ func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) {
func TestGetRateLimitReturnsTrue0IfResumeAtInPast(t *testing.T) {
header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
header.Add("X-Rate-Limit-Remaining", "0")
Expand All @@ -57,20 +58,21 @@ func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.True(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

func TestGetRateLimitReturnsResetValue(t *testing.T) {
epoch := int64(1604582732 + 100)
reset := int64(1604582732 + 100)
timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "10")
header.Add("X-Rate-Limit-Remaining", "0")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -84,22 +86,23 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) {
log: logp.NewLogger(""),
}
resp := &http.Response{Header: header}
epoch2, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 1604582832, epoch2)
assert.True(t, applied)
assert.EqualValues(t, reset, resumeAt)
}

// Test getRateLimit function with a remaining quota, using default early limit
// expect to receive 0, nil.
// expect to receive false, 0, nil.
func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -115,22 +118,23 @@ func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

// Test getRateLimit function with a remaining limit, but early limit
// expect to receive Reset Time
// expect to receive true, Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -146,22 +150,23 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
assert.True(t, applied)
assert.EqualValues(t, resetAt, resumeAt)
}

// Test getRateLimit function with a remaining quota, using 90% early limit
// expect to receive 0, nil.
// expect to receive false, 0, nil.
func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "13")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -177,22 +182,23 @@ func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}

// Test getRateLimit function with a remaining limit, but early limit of 90%
// expect to receive Reset Time
// expect to receive true, Reset Time
func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
resetAt := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Limit", "120")
header.Add("X-Rate-Limit-Remaining", "12")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10))
tplLimit := &valueTpl{}
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
Expand All @@ -208,21 +214,22 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, resetEpoch, epoch)
assert.True(t, applied)
assert.EqualValues(t, resetAt, resumeAt)
}

// Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit
// expect to receive 0, nil. (default rate-limiting)
// expect to receive false, 0, nil. (default rate-limiting)
func TestGetRateLimitWhenMissingLimit(t *testing.T) {
resetEpoch := int64(1634579974 + 100)
reset := int64(1634579974 + 100)
timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() }
t.Cleanup(func() { timeNow = time.Now })

header := make(http.Header)
header.Add("X-Rate-Limit-Remaining", "1")
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10))
header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10))
tplReset := &valueTpl{}
tplRemaining := &valueTpl{}
earlyLimit := func(i float64) *float64 { return &i }(0.9)
Expand All @@ -236,7 +243,8 @@ func TestGetRateLimitWhenMissingLimit(t *testing.T) {
earlyLimit: earlyLimit,
}
resp := &http.Response{Header: header}
epoch, err := rateLimit.getRateLimit(resp)
applied, resumeAt, err := rateLimit.getRateLimit(resp)
assert.NoError(t, err)
assert.EqualValues(t, 0, epoch)
assert.False(t, applied)
assert.EqualValues(t, 0, resumeAt)
}
Loading
Loading