Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #17735 to 7.7: Fix issue 17734 to retry on rate-limit error in the Filebeat httpjson input. #17839

Merged
merged 2 commits into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix default index pattern in IBM MQ filebeat dashboard. {pull}17146[17146]
- Fix `elasticsearch.gc` fileset to not collect _all_ logs when Elasticsearch is running in Docker. {issue}13164[13164] {issue}16583[16583] {pull}17164[17164]
- Fixed a mapping exception when ingesting CEF logs that used the spriv or dpriv extensions. {issue}17216[17216] {pull}17220[17220]
- Fix issue 17734 to retry on rate-limit error in the Filebeat httpjson input. {issue}17734[17734] {pull}17735[17735]

*Heartbeat*

Expand Down
81 changes: 68 additions & 13 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"

Expand All @@ -36,14 +38,8 @@ func testSetup(t *testing.T) {
})
}

func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
func createServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
req, err := ioutil.ReadAll(r.Body)
defer r.Body.Close()
Expand Down Expand Up @@ -72,6 +68,44 @@ func runTest(t *testing.T, isTLS bool, m map[string]interface{}, run func(input
w.Write(b)
}
}))
}

func createCustomServer(newServer func(handler http.Handler) *httptest.Server) *httptest.Server {
var isRetry bool
return newServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if !isRetry {
w.Header().Set("X-Rate-Limit-Limit", "0")
w.Header().Set("X-Rate-Limit-Remaining", "0")
w.Header().Set("X-Rate-Limit-Reset", strconv.FormatInt(time.Now().Unix(), 10))
w.WriteHeader(http.StatusTooManyRequests)
w.Write([]byte{})
isRetry = true
} else {
message := map[string]interface{}{
"hello": "world",
"embedded": map[string]string{
"hello": "world",
},
}
b, _ := json.Marshal(message)
w.WriteHeader(http.StatusOK)
w.Write(b)
}
}))
}

func runTest(t *testing.T, isTLS bool, testRateLimitRetry bool, m map[string]interface{}, run func(input *HttpjsonInput, out *stubOutleter, t *testing.T)) {
testSetup(t)
// Create an http test server according to whether TLS is used
var newServer = httptest.NewServer
if isTLS {
newServer = httptest.NewTLSServer
}
ts := createServer(newServer)
if testRateLimitRetry {
ts = createCustomServer(newServer)
}
defer ts.Close()
m["url"] = ts.URL
cfg := common.MustNewConfigFrom(m)
Expand Down Expand Up @@ -337,7 +371,7 @@ func TestGET(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -359,7 +393,28 @@ func TestGetHTTPS(t *testing.T) {
"interval": 0,
"ssl.verification_mode": "none",
}
runTest(t, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, true, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

events, ok := out.waitForEvents(1)
if !ok {
t.Fatalf("Expected 1 events, but got %d.", len(events))
}
input.Stop()

if err := group.Wait(); err != nil {
t.Fatal(err)
}
})
}

func TestRateLimitRetry(t *testing.T) {
m := map[string]interface{}{
"http_method": "GET",
"interval": 0,
}
runTest(t, false, true, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -381,7 +436,7 @@ func TestPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -403,7 +458,7 @@ func TestRepeatedPOST(t *testing.T) {
"http_request_body": map[string]interface{}{"test": "abc", "testNested": map[string]interface{}{"testNested1": 123}},
"interval": 10 ^ 9,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
group, _ := errgroup.WithContext(context.Background())
group.Go(input.run)

Expand All @@ -424,7 +479,7 @@ func TestRunStop(t *testing.T) {
"http_method": "GET",
"interval": 0,
}
runTest(t, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
runTest(t, false, false, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
input.Run()
input.Stop()
input.Run()
Expand Down
6 changes: 6 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
}
if msg.StatusCode != http.StatusOK {
in.log.Debugw("HTTP request failed", "http.response.status_code", msg.StatusCode, "http.response.body", string(responseData))
if msg.StatusCode == http.StatusTooManyRequests {
if err = in.applyRateLimit(ctx, header, in.config.RateLimit); err != nil {
return err
}
continue
}
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
Expand Down