From 6b3d63e85df8386218118d2f6360783dcde43adf Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Wed, 15 Mar 2023 14:53:39 +0530 Subject: [PATCH] [filebeat][httpjson] - Fix first_response false positive error by making it a flag based object (#34748) * introduced new config element that helps fix first_response bug by making it a flag based variable * updated asciidoc * added nolint comments for linting errors --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 9 ++-- .../input/httpjson/config_response.go | 1 + x-pack/filebeat/input/httpjson/encoding.go | 2 +- x-pack/filebeat/input/httpjson/input_test.go | 15 +++--- x-pack/filebeat/input/httpjson/pagination.go | 1 + x-pack/filebeat/input/httpjson/request.go | 52 +++++++++++-------- 7 files changed, 48 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7529c44acb7..7496e5f421e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550] - Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605] - Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] +- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748] - Fix errors and panics due to re-used processors {pull}34761[34761] - Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689] diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 7d69c6b195a..92fd6d15936 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -608,8 +608,9 @@ does not exist at the root level, please use the clause `.first_response.` to ac Here we can see that the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is present for the parent (root) request. However if `response.pagination` was not present in the parent (root) request, `replace_with` clause should have used `.first_response.body.exportId`. This is -because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the -`first_response` object always stores the very first response in the process chain. +because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the `first_response` object always stores the very first response in the process chain. + +NOTE: The `first_response` object at the moment can only store flat JSON structures (i.e. no support for JSONS having array at root level, NDJSON or Gzipped JSON), hence it should only be used in scenarios where the this is the case. Splits cannot be performed on `first_response`. It needs to be explicitly enabled by setting the flag `response.save_first_response` to `true` in the httpjson config. [float] ==== `request.tracer.filename` @@ -1293,8 +1294,8 @@ This behaviour of targeted fixed pattern replacement in the url helps solve vari the context variables. The value expression: `.first_response.`, on processing, will result in an array `[first_response ""]` where the key to be extrated becomes `"" (an empty string)`, which has no definition within any context variable. -NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching. - +NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching. The `first_response` object at the moment can only store flat JSON structures (i.e. no support for JSONS having array at root level, NDJSON or Gzipped JSON), hence it should only be used in scenarios where the this is the case. Splits cannot be performed on `first_response`. It needs to be explicitly enabled by setting the flag `response.save_first_response` to `true` in the httpjson config. + [float] ==== `chain[].while` diff --git a/x-pack/filebeat/input/httpjson/config_response.go b/x-pack/filebeat/input/httpjson/config_response.go index 669875a7265..61900c7de5a 100644 --- a/x-pack/filebeat/input/httpjson/config_response.go +++ b/x-pack/filebeat/input/httpjson/config_response.go @@ -21,6 +21,7 @@ type responseConfig struct { Transforms transformsConfig `config:"transforms"` Pagination transformsConfig `config:"pagination"` Split *splitConfig `config:"split"` + SaveFirstResponse bool `config:"save_first_response"` } type splitConfig struct { diff --git a/x-pack/filebeat/input/httpjson/encoding.go b/x-pack/filebeat/input/httpjson/encoding.go index d260bf414a2..cc0a59dbc99 100644 --- a/x-pack/filebeat/input/httpjson/encoding.go +++ b/x-pack/filebeat/input/httpjson/encoding.go @@ -210,7 +210,7 @@ func decodeAsZip(p []byte, dst *response) error { } dst.body = results - if dst.header == nil { //nolint:errorlint // golangci-lint-action #624 + if dst.header == nil { dst.header = http.Header{} } dst.header["X-Zip-Files"] = names diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index fa39f538e4a..5bcd7b157a2 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -730,8 +730,9 @@ func TestInput(t *testing.T) { t.Cleanup(server.Close) }, baseConfig: map[string]interface{}{ - "interval": 1, - "request.method": http.MethodGet, + "interval": 1, + "request.method": http.MethodGet, + "response.save_first_response": true, "chain": []interface{}{ map[string]interface{}{ "step": map[string]interface{}{ @@ -808,8 +809,9 @@ func TestInput(t *testing.T) { t.Cleanup(server.Close) }, baseConfig: map[string]interface{}{ - "interval": 1, - "request.method": http.MethodGet, + "interval": 1, + "request.method": http.MethodGet, + "response.save_first_response": true, "chain": []interface{}{ map[string]interface{}{ "step": map[string]interface{}{ @@ -844,8 +846,9 @@ func TestInput(t *testing.T) { t.Cleanup(server.Close) }, baseConfig: map[string]interface{}{ - "interval": 1, - "request.method": http.MethodGet, + "interval": 1, + "request.method": http.MethodGet, + "response.save_first_response": true, "chain": []interface{}{ map[string]interface{}{ "step": map[string]interface{}{ diff --git a/x-pack/filebeat/input/httpjson/pagination.go b/x-pack/filebeat/input/httpjson/pagination.go index 9e9d8b02d35..c8b8d128d84 100644 --- a/x-pack/filebeat/input/httpjson/pagination.go +++ b/x-pack/filebeat/input/httpjson/pagination.go @@ -138,6 +138,7 @@ func (iter *pageIterator) next() (*response, bool, error) { return nil, false, err } + //nolint:bodyclose // response body is closed through drainBody method resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq) if err != nil { return nil, false, err diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 3a04d0a3d72..9832275a326 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -106,6 +106,7 @@ type requestFactory struct { until *valueTpl chainHTTPClient *httpClient chainResponseProcessor *responseProcessor + saveFirstResponse bool } func newRequestFactory(ctx context.Context, config config, log *logp.Logger) ([]*requestFactory, error) { @@ -114,12 +115,13 @@ func newRequestFactory(ctx context.Context, config config, log *logp.Logger) ([] ts, _ := newBasicTransformsFromConfig(config.Request.Transforms, requestNamespace, log) // regular call requestFactory object rf := &requestFactory{ - url: *config.Request.URL.URL, - method: config.Request.Method, - body: config.Request.Body, - transforms: ts, - log: log, - encoder: registeredEncoders[config.Request.EncodeAs], + url: *config.Request.URL.URL, + method: config.Request.Method, + body: config.Request.Body, + transforms: ts, + log: log, + encoder: registeredEncoders[config.Request.EncodeAs], + saveFirstResponse: config.Response.SaveFirstResponse, } if config.Auth != nil && config.Auth.Basic.isEnabled() { rf.user = config.Auth.Basic.User @@ -292,6 +294,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p chainIndex int ) + //nolint:bodyclose // response body is closed through drainBody method for i, rf := range r.requestFactories { finalResps = nil intermediateResps = nil @@ -302,23 +305,26 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if err != nil { return fmt.Errorf("failed to execute rf.collectResponse: %w", err) } - // store first response in transform context - var bodyMap map[string]interface{} - body, err := io.ReadAll(httpResp.Body) - if err != nil { - return fmt.Errorf("failed to read http response body: %w", err) - } - httpResp.Body = io.NopCloser(bytes.NewReader(body)) - err = json.Unmarshal(body, &bodyMap) - if err != nil { - r.log.Errorf("unable to unmarshal first_response.body: %v", err) - } - firstResponse := response{ - url: *httpResp.Request.URL, - header: httpResp.Header.Clone(), - body: bodyMap, + + if rf.saveFirstResponse { + // store first response in transform context + var bodyMap map[string]interface{} + body, err := io.ReadAll(httpResp.Body) + if err != nil { + return fmt.Errorf("failed to read http response body: %w", err) + } + httpResp.Body = io.NopCloser(bytes.NewReader(body)) + err = json.Unmarshal(body, &bodyMap) + if err != nil { + r.log.Errorf("unable to unmarshal first_response.body: %v", err) + } + firstResponse := response{ + url: *httpResp.Request.URL, + header: httpResp.Header.Clone(), + body: bodyMap, + } + trCtx.updateFirstResponse(firstResponse) } - trCtx.updateFirstResponse(firstResponse) if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) @@ -577,6 +583,8 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t } // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input +// +//nolint:bodyclose // response body is closed through drainBody method func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) { var ( n int