Skip to content

Commit

Permalink
[filebeat][httpjson] - Separation of global transform contexts and in…
Browse files Browse the repository at this point in the history
…troduction of parent transform context within chains (#33499)

* initial commit for transform context separation and introduction of parent object

* removed ioutil from tests

* updated asciidoc changelog

* added support for dot's in values and expressions for replace_with clause, added doc updates and tests

* added linter ignores for errcheck in test scenario

* updated as per pr suggetions

* added processExpression Tests
  • Loading branch information
ShourieG authored and chrisberkhout committed Jun 1, 2023
1 parent 466b95d commit bda7384
Show file tree
Hide file tree
Showing 8 changed files with 375 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]

*Auditbeat*

Expand Down
62 changes: 57 additions & 5 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ List of transforms to apply to the request before each execution.

Available transforms for request: [`append`, `delete`, `set`].

Can read state from: [`.first_response.*`,`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].
Can read state from: [`.first_response.*`,`.last_response.*`, `.parent_last_response.*` `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].

Can write state to: [`body.*`, `header.*`, `url.*`].

Expand All @@ -547,6 +547,47 @@ filebeat.inputs:
value: '[[now (parseDuration "-1h")]]'
----

NOTE: The clause `.parent_last_response.` should only be used from within chain steps and when pagination exists at the root request level. If pagination
does not exist at the root level, please use the clause `.first_response.` to access parent response object from within chains. You can look at this
<<parent-last-response,example>> below for a better idea.


["source","yaml",subs="attributes",id="parent-last-response"]
filebeat.inputs:
- type: httpjson
enabled: true
id: my-httpjson-id
request.url: http://xyz.com/services/data/v1.0/export_ids/page
request.method: POST
interval: 1h
request.retry.max_attempts: 2
request.retry.wait_min: 5s
request.transforms:
- set:
target: body.page
value: 0
response.request_body_on_pagination: true
response.pagination:
- set:
target: body.page
value: '[[ .last_response.body.page ]]'
fail_on_template_error: true
chain:
- step:
request.url: http://xyz.com/services/data/v1.0/$.exportId/export_ids/$.files[:].id/info
request.method: POST
request.transforms:
- set:
target: body.exportId
value: '[[ .parent_last_response.body.exportId ]]'
replace: $.files[:].id
replace_with: '$.exportId,.parent_last_response.body.exportId'

Here we can see that the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is present for the parent (root) request.
However if `response.pagination` was not present in the parent (root) request, `replace_with` clause should have used `.first_response.body.exportId`. This is
because when pagination does not exist at the parent level `parent_last_response` object is not populated with required values for performance reasons, but the
`first_response` object always stores the very first response in the process chain.

[float]
==== `request.tracer.filename`

Expand Down Expand Up @@ -1141,7 +1182,7 @@ Collect and make events from response in any format supported by httpjson for al

The `replace_with: "pattern,value"` clause is used to replace a fixed pattern string defined in `request.url` with the given value.
The fixed pattern must have a `$.` prefix, for example: `$.xyz`. The `value` may be hard coded or extracted from context variables
like [`.last_response.*`, `.first_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
like [`.last_response.*`, `.first_response.*`, `.parent_last_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
thus providing a lot of flexibility in the logic of chain requests.

Example:
Expand All @@ -1167,7 +1208,7 @@ filebeat.inputs:
- step:
request.url: https://example.com/services/data/v1.0/$.exportId/files
request.method: GET
replace_with: '$.exportId,first_response.body.exportId'
replace_with: '$.exportId,.first_response.body.exportId'
----

Example:
Expand Down Expand Up @@ -1217,8 +1258,19 @@ response_json using exportId as '2212':
----
This behaviour of targeted fixed pattern replacement in the url helps solve various use cases.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the
`replace_with` processor with exact string matching.
**Some useful points to remember:- **

1. If you want the `value` to be treated as an expression to be evaluated for data extraction from context variables, it should always have a
**single '.' (dot) prefix**. Example: `replace_with: '$.exportId,.first_response.body.exportId'`. Anything more or less will have the internal
processor treat it as a hard coded value, `replace_with: '$.exportId,..first_response.body.exportId'` (more than one '.' (dot) as prefix) or
`replace_with:'$.exportId,first_response.body.exportId'` (no '.' dot as prefix)

2. Incomplete `value expressions` will cause an error while processing. Example: `replace_with: '$.exportId,.first_response.'`, `replace_with:
'$.exportId,.last_response.'` etc. These expressions are incomplete because they do not evaluate down to a valid key that can be extracted from
the context variables. The value expression: `.first_response.`, on processing, will result in an array `[first_response ""]` where the key to be
extrated becomes `"" (an empty string)`, which has no definition within any context variable.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching.

[float]
==== `chain[].while`
Expand Down
227 changes: 223 additions & 4 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package httpjson
import (
"context"
"fmt"
"io/ioutil"
"io"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -683,17 +683,227 @@ func TestInput(t *testing.T) {
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,first_response.body.exportId",
"replace_with": "$.exportId,.first_response.body.exportId",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value_1",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/2212/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/2212/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,2212",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value (no dot prefix)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/first_response.body.id/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/first_response.body.id/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,first_response.body.id",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value (more than one dot prefix)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/..first_response.body.id/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/..first_response.body.id/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,..first_response.body.id",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value containing '.' (dots)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/.xyz.2212.abc./1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/.xyz.2212.abc./2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.xyz.2212.abc.",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test global transform context separation with parent_last_response object",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213"}`)
case "/2212/1":
matchBody(w, r, `{"exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
},
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -826,6 +1036,15 @@ func newV2Context() (v2.Context, func()) {
}, cancel
}

//nolint:errcheck // We can safely ignore errors here
func matchBody(w io.Writer, req *http.Request, match, response string) {
body, _ := io.ReadAll(req.Body)
req.Body.Close()
if string(body) == match {
w.Write([]byte(response))
}
}

func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
Expand All @@ -837,7 +1056,7 @@ func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod)
case expectedBody != "":
body, _ := ioutil.ReadAll(r.Body)
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if expectedBody != string(body) {
w.WriteHeader(http.StatusBadRequest)
Expand Down
Loading

0 comments on commit bda7384

Please sign in to comment.