From 8c12cebe77c7790a9ce0252099a97f994b6e560c Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 8 Mar 2024 21:53:01 +0000 Subject: [PATCH] x-pack/filebeat/input/httpjson: Apply rate limiting to all responses (#38161) (#38236) - Drain the body before rate limiting. - Apply rate limiting to all responses, waiting immediately. Retry if the response was not successful and there was a rate limit wait (even if immediately expired), otherwise return. - Improve names of variables `epoch` and `activeLimit`. Co-authored-by: Dan Kortschak (cherry picked from commit ca07b8e680c9029648fc401d1a25bffff59a2a91) Co-authored-by: Chris Berkhout --- .../filebeat/input/httpjson/rate_limiter.go | 73 +++++++++-------- .../input/httpjson/rate_limiter_test.go | 78 ++++++++++--------- x-pack/filebeat/input/httpjson/request.go | 22 +++--- 3 files changed, 93 insertions(+), 80 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index 30c50ae3f05..d82f5829be8 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -42,35 +42,37 @@ func (r *rateLimiter) execute(ctx context.Context, f func() (*http.Response, err for { resp, err := f() if err != nil { - return nil, fmt.Errorf("failed to read http.response.body: %w", err) + return nil, err } - if r == nil || resp.StatusCode == http.StatusOK { + if r == nil { return resp, nil } - if resp.StatusCode != http.StatusTooManyRequests { - return nil, fmt.Errorf("http request was unsuccessful with a status code %d", resp.StatusCode) + applied, err := r.applyRateLimit(ctx, resp) + if err != nil { + return nil, fmt.Errorf("error applying rate limit: %w", err) } - if err := r.applyRateLimit(ctx, resp); err != nil { - return nil, err + if resp.StatusCode == http.StatusOK || !applied { + return resp, nil } } } -// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response -func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) error { - epoch, err := r.getRateLimit(resp) +// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response. +// It returns a bool indicating whether a limit was reached. +func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) (bool, error) { + limitReached, resumeAt, err := r.getRateLimit(resp) if err != nil { - return err + return limitReached, err } - t := time.Unix(epoch, 0) + t := time.Unix(resumeAt, 0) w := time.Until(t) - if epoch == 0 || w <= 0 { + if resumeAt == 0 || w <= 0 { r.log.Debugf("Rate Limit: No need to apply rate limit.") - return nil + return limitReached, nil } r.log.Debugf("Rate Limit: Wait until %v for the rate limit to reset.", t) timer := time.NewTimer(w) @@ -80,24 +82,25 @@ func (r *rateLimiter) applyRateLimit(ctx context.Context, resp *http.Response) e <-timer.C } r.log.Info("Context done.") - return nil + return limitReached, nil case <-timer.C: r.log.Debug("Rate Limit: time is up.") - return nil + return limitReached, nil } } // getRateLimit gets the rate limit value if specified in the response, -// and returns an int64 value in seconds since unix epoch for rate limit reset time. +// and returns a bool indicating whether a limit was reached, and +// an int64 value in seconds since unix epoch for rate limit reset time. // When there is a remaining rate limit quota, or when the rate limit reset time has expired, it // returns 0 for the epoch value. -func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { +func (r *rateLimiter) getRateLimit(resp *http.Response) (bool, int64, error) { if r == nil { - return 0, nil + return false, 0, nil } if r.remaining == nil { - return 0, nil + return false, 0, nil } tr := transformable{} @@ -106,16 +109,17 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { remaining, _ := r.remaining.Execute(ctx, tr, "rate-limit_remaining", nil, r.log) if remaining == "" { - return 0, errors.New("remaining value is empty") + r.log.Infow("get rate limit", "error", errors.New("remaining value is empty")) + return false, 0, nil } m, err := strconv.ParseInt(remaining, 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err) + return false, 0, fmt.Errorf("failed to parse rate-limit remaining value: %w", err) } // by default, httpjson will continue requests until Limit is 0 // can optionally stop requests "early" - var activeLimit int64 = 0 + var minRemaining int64 = 0 if r.earlyLimit != nil { earlyLimit := *r.earlyLimit if earlyLimit > 0 && earlyLimit < 1 { @@ -123,37 +127,38 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { if limit != "" { l, err := strconv.ParseInt(limit, 10, 64) if err == nil { - activeLimit = l - int64(earlyLimit*float64(l)) + minRemaining = l - int64(earlyLimit*float64(l)) } } } else if earlyLimit >= 1 { - activeLimit = int64(earlyLimit) + minRemaining = int64(earlyLimit) } } - r.log.Debugf("Rate Limit: Using active Early Limit: %f", activeLimit) - if m > activeLimit { - return 0, nil + r.log.Debugf("Rate Limit: Using active Early Limit: %d", minRemaining) + if m > minRemaining { + return false, 0, nil } if r.reset == nil { r.log.Warn("reset rate limit is not set") - return 0, nil + return false, 0, nil } reset, _ := r.reset.Execute(ctx, tr, "rate-limit_reset", nil, r.log) if reset == "" { - return 0, errors.New("reset value is empty") + r.log.Infow("get rate limit", "error", errors.New("reset value is empty")) + return false, 0, nil } - epoch, err := strconv.ParseInt(reset, 10, 64) + resumeAt, err := strconv.ParseInt(reset, 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err) + return false, 0, fmt.Errorf("failed to parse rate-limit reset value: %w", err) } - if timeNow().Unix() > epoch { - return 0, nil + if timeNow().Unix() > resumeAt { + return true, 0, nil } - return epoch, nil + return true, resumeAt, nil } diff --git a/x-pack/filebeat/input/httpjson/rate_limiter_test.go b/x-pack/filebeat/input/httpjson/rate_limiter_test.go index fe928eb4f3d..3fdb73fc44c 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter_test.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter_test.go @@ -16,7 +16,7 @@ import ( ) // Test getRateLimit function with a remaining quota, expect to receive 0, nil. -func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) { +func TestGetRateLimitReturnsFalse0IfRemainingQuota(t *testing.T) { header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") header.Add("X-Rate-Limit-Remaining", "118") @@ -34,12 +34,13 @@ func TestGetRateLimitReturns0IfRemainingQuota(t *testing.T) { log: logp.NewLogger(""), } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 0, epoch) + assert.False(t, applied) + assert.EqualValues(t, 0, resumeAt) } -func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) { +func TestGetRateLimitReturnsTrue0IfResumeAtInPast(t *testing.T) { header := make(http.Header) header.Add("X-Rate-Limit-Limit", "10") header.Add("X-Rate-Limit-Remaining", "0") @@ -57,20 +58,21 @@ func TestGetRateLimitReturns0IfEpochInPast(t *testing.T) { log: logp.NewLogger(""), } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 0, epoch) + assert.True(t, applied) + assert.EqualValues(t, 0, resumeAt) } func TestGetRateLimitReturnsResetValue(t *testing.T) { - epoch := int64(1604582732 + 100) + reset := int64(1604582732 + 100) timeNow = func() time.Time { return time.Unix(1604582732, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Limit", "10") header.Add("X-Rate-Limit-Remaining", "0") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(epoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10)) tplLimit := &valueTpl{} tplReset := &valueTpl{} tplRemaining := &valueTpl{} @@ -84,22 +86,23 @@ func TestGetRateLimitReturnsResetValue(t *testing.T) { log: logp.NewLogger(""), } resp := &http.Response{Header: header} - epoch2, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 1604582832, epoch2) + assert.True(t, applied) + assert.EqualValues(t, reset, resumeAt) } // Test getRateLimit function with a remaining quota, using default early limit -// expect to receive 0, nil. +// expect to receive false, 0, nil. func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) { - resetEpoch := int64(1634579974 + 100) + resetAt := int64(1634579974 + 100) timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") header.Add("X-Rate-Limit-Remaining", "1") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10)) tplLimit := &valueTpl{} tplReset := &valueTpl{} tplRemaining := &valueTpl{} @@ -115,22 +118,23 @@ func TestGetRateLimitReturns0IfEarlyLimit0(t *testing.T) { earlyLimit: earlyLimit, } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 0, epoch) + assert.False(t, applied) + assert.EqualValues(t, 0, resumeAt) } // Test getRateLimit function with a remaining limit, but early limit -// expect to receive Reset Time +// expect to receive true, Reset Time func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) { - resetEpoch := int64(1634579974 + 100) + resetAt := int64(1634579974 + 100) timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") header.Add("X-Rate-Limit-Remaining", "1") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10)) tplLimit := &valueTpl{} tplReset := &valueTpl{} tplRemaining := &valueTpl{} @@ -146,22 +150,23 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimit1(t *testing.T) { earlyLimit: earlyLimit, } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, resetEpoch, epoch) + assert.True(t, applied) + assert.EqualValues(t, resetAt, resumeAt) } // Test getRateLimit function with a remaining quota, using 90% early limit -// expect to receive 0, nil. +// expect to receive false, 0, nil. func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) { - resetEpoch := int64(1634579974 + 100) + resetAt := int64(1634579974 + 100) timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") header.Add("X-Rate-Limit-Remaining", "13") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10)) tplLimit := &valueTpl{} tplReset := &valueTpl{} tplRemaining := &valueTpl{} @@ -177,22 +182,23 @@ func TestGetRateLimitReturns0IfEarlyLimitPercent(t *testing.T) { earlyLimit: earlyLimit, } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 0, epoch) + assert.False(t, applied) + assert.EqualValues(t, 0, resumeAt) } // Test getRateLimit function with a remaining limit, but early limit of 90% -// expect to receive Reset Time +// expect to receive true, Reset Time func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) { - resetEpoch := int64(1634579974 + 100) + resetAt := int64(1634579974 + 100) timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Limit", "120") header.Add("X-Rate-Limit-Remaining", "12") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetAt, 10)) tplLimit := &valueTpl{} tplReset := &valueTpl{} tplRemaining := &valueTpl{} @@ -208,21 +214,22 @@ func TestGetRateLimitReturnsResetValueIfEarlyLimitPercent(t *testing.T) { earlyLimit: earlyLimit, } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, resetEpoch, epoch) + assert.True(t, applied) + assert.EqualValues(t, resetAt, resumeAt) } // Test getRateLimit function when "Limit" header is missing, when using a Percentage early-limit -// expect to receive 0, nil. (default rate-limiting) +// expect to receive false, 0, nil. (default rate-limiting) func TestGetRateLimitWhenMissingLimit(t *testing.T) { - resetEpoch := int64(1634579974 + 100) + reset := int64(1634579974 + 100) timeNow = func() time.Time { return time.Unix(1634579974, 0).UTC() } t.Cleanup(func() { timeNow = time.Now }) header := make(http.Header) header.Add("X-Rate-Limit-Remaining", "1") - header.Add("X-Rate-Limit-Reset", strconv.FormatInt(resetEpoch, 10)) + header.Add("X-Rate-Limit-Reset", strconv.FormatInt(reset, 10)) tplReset := &valueTpl{} tplRemaining := &valueTpl{} earlyLimit := func(i float64) *float64 { return &i }(0.9) @@ -236,7 +243,8 @@ func TestGetRateLimitWhenMissingLimit(t *testing.T) { earlyLimit: earlyLimit, } resp := &http.Response{Header: header} - epoch, err := rateLimit.getRateLimit(resp) + applied, resumeAt, err := rateLimit.getRateLimit(resp) assert.NoError(t, err) - assert.EqualValues(t, 0, epoch) + assert.False(t, applied) + assert.EqualValues(t, 0, resumeAt) } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 5612f2dc641..9e60d22ac49 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -236,19 +236,17 @@ func (rf *requestFactory) collectResponse(ctx context.Context, trCtx *transformC func (c *httpClient) do(ctx context.Context, req *http.Request) (*http.Response, error) { resp, err := c.limiter.execute(ctx, func() (*http.Response, error) { - return c.client.Do(req) + resp, err := c.client.Do(req) + if err == nil { + // Read the whole resp.Body so we can release the connection. + // This implementation is inspired by httputil.DumpResponse + resp.Body, err = drainBody(resp.Body) + } + return resp, err }) if err != nil { return nil, err } - defer resp.Body.Close() - - // Read the whole resp.Body so we can release the connection. - // This implementation is inspired by httputil.DumpResponse - resp.Body, err = drainBody(resp.Body) - if err != nil { - return nil, fmt.Errorf("failed to read response body: %w", err) - } if resp.StatusCode >= http.StatusBadRequest { body, _ := io.ReadAll(resp.Body) @@ -939,6 +937,8 @@ func cloneResponse(source *http.Response) (*http.Response, error) { // // This function is a modified version of drainBody from the http/httputil package. func drainBody(b io.ReadCloser) (r1 io.ReadCloser, err error) { + defer b.Close() + if b == nil || b == http.NoBody { // No copying needed. Preserve the magic sentinel meaning of NoBody. return http.NoBody, nil @@ -946,10 +946,10 @@ func drainBody(b io.ReadCloser) (r1 io.ReadCloser, err error) { var buf bytes.Buffer if _, err = buf.ReadFrom(b); err != nil { - return b, err + return b, fmt.Errorf("failed to read http.response.body: %w", err) } if err = b.Close(); err != nil { - return b, err + return b, fmt.Errorf("failed to close http.response.body: %w", err) } return io.NopCloser(&buf), nil