Skip to content

Commit

Permalink
[filebeat][httpjson] - Fix first_response false positive error by mak…
Browse files Browse the repository at this point in the history
…ing it a flag based object (#34748)

* introduced new config element that helps fix first_response bug by making it a flag based variable

* updated asciidoc

* added nolint comments for linting errors
  • Loading branch information
ShourieG authored Mar 15, 2023
1 parent 710bcfb commit 6b3d63e
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]
- Fix for httpjson first_response object throwing false positive errors by making it a flag based object {issue}34747[34747] {pull}34748[34748]
- Fix errors and panics due to re-used processors {pull}34761[34761]
- Add missing Basic Authentication support to CEL input {issue}34609[34609] {pull}34689[34689]

Expand Down
9 changes: 5 additions & 4 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,9 @@ does not exist at the root level, please use the clause `.first_response.` to ac

Here we can see that the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is present for the parent (root) request.
However if `response.pagination` was not present in the parent (root) request, `replace_with` clause should have used `.first_response.body.exportId`. This is
because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the
`first_response` object always stores the very first response in the process chain.
because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the `first_response` object always stores the very first response in the process chain.

NOTE: The `first_response` object at the moment can only store flat JSON structures (i.e. no support for JSONS having array at root level, NDJSON or Gzipped JSON), hence it should only be used in scenarios where the this is the case. Splits cannot be performed on `first_response`. It needs to be explicitly enabled by setting the flag `response.save_first_response` to `true` in the httpjson config.

[float]
==== `request.tracer.filename`
Expand Down Expand Up @@ -1293,8 +1294,8 @@ This behaviour of targeted fixed pattern replacement in the url helps solve vari
the context variables. The value expression: `.first_response.`, on processing, will result in an array `[first_response ""]` where the key to be
extrated becomes `"" (an empty string)`, which has no definition within any context variable.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching. The `first_response` object at the moment can only store flat JSON structures (i.e. no support for JSONS having array at root level, NDJSON or Gzipped JSON), hence it should only be used in scenarios where the this is the case. Splits cannot be performed on `first_response`. It needs to be explicitly enabled by setting the flag `response.save_first_response` to `true` in the httpjson config.
[float]
==== `chain[].while`

Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/httpjson/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type responseConfig struct {
Transforms transformsConfig `config:"transforms"`
Pagination transformsConfig `config:"pagination"`
Split *splitConfig `config:"split"`
SaveFirstResponse bool `config:"save_first_response"`
}

type splitConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func decodeAsZip(p []byte, dst *response) error {
}

dst.body = results
if dst.header == nil { //nolint:errorlint // golangci-lint-action #624
if dst.header == nil {
dst.header = http.Header{}
}
dst.header["X-Zip-Files"] = names
Expand Down
15 changes: 9 additions & 6 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,9 @@ func TestInput(t *testing.T) {
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"interval": 1,
"request.method": http.MethodGet,
"response.save_first_response": true,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
Expand Down Expand Up @@ -808,8 +809,9 @@ func TestInput(t *testing.T) {
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"interval": 1,
"request.method": http.MethodGet,
"response.save_first_response": true,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
Expand Down Expand Up @@ -844,8 +846,9 @@ func TestInput(t *testing.T) {
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"interval": 1,
"request.method": http.MethodGet,
"response.save_first_response": true,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (iter *pageIterator) next() (*response, bool, error) {
return nil, false, err
}

//nolint:bodyclose // response body is closed through drainBody method
resp, err := iter.pagination.httpClient.do(iter.stdCtx, httpReq)
if err != nil {
return nil, false, err
Expand Down
52 changes: 30 additions & 22 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type requestFactory struct {
until *valueTpl
chainHTTPClient *httpClient
chainResponseProcessor *responseProcessor
saveFirstResponse bool
}

func newRequestFactory(ctx context.Context, config config, log *logp.Logger) ([]*requestFactory, error) {
Expand All @@ -114,12 +115,13 @@ func newRequestFactory(ctx context.Context, config config, log *logp.Logger) ([]
ts, _ := newBasicTransformsFromConfig(config.Request.Transforms, requestNamespace, log)
// regular call requestFactory object
rf := &requestFactory{
url: *config.Request.URL.URL,
method: config.Request.Method,
body: config.Request.Body,
transforms: ts,
log: log,
encoder: registeredEncoders[config.Request.EncodeAs],
url: *config.Request.URL.URL,
method: config.Request.Method,
body: config.Request.Body,
transforms: ts,
log: log,
encoder: registeredEncoders[config.Request.EncodeAs],
saveFirstResponse: config.Response.SaveFirstResponse,
}
if config.Auth != nil && config.Auth.Basic.isEnabled() {
rf.user = config.Auth.Basic.User
Expand Down Expand Up @@ -292,6 +294,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
chainIndex int
)

//nolint:bodyclose // response body is closed through drainBody method
for i, rf := range r.requestFactories {
finalResps = nil
intermediateResps = nil
Expand All @@ -302,23 +305,26 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
if err != nil {
return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
// store first response in transform context
var bodyMap map[string]interface{}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read http response body: %w", err)
}
httpResp.Body = io.NopCloser(bytes.NewReader(body))
err = json.Unmarshal(body, &bodyMap)
if err != nil {
r.log.Errorf("unable to unmarshal first_response.body: %v", err)
}
firstResponse := response{
url: *httpResp.Request.URL,
header: httpResp.Header.Clone(),
body: bodyMap,

if rf.saveFirstResponse {
// store first response in transform context
var bodyMap map[string]interface{}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read http response body: %w", err)
}
httpResp.Body = io.NopCloser(bytes.NewReader(body))
err = json.Unmarshal(body, &bodyMap)
if err != nil {
r.log.Errorf("unable to unmarshal first_response.body: %v", err)
}
firstResponse := response{
url: *httpResp.Request.URL,
header: httpResp.Header.Clone(),
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
}
trCtx.updateFirstResponse(firstResponse)

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
Expand Down Expand Up @@ -577,6 +583,8 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
}

// processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
//
//nolint:bodyclose // response body is closed through drainBody method
func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, response *http.Response, chainIndex int, log *logp.Logger) (int, error) {
var (
n int
Expand Down

0 comments on commit 6b3d63e

Please sign in to comment.