Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/httpjson: provide an approach to use complete URL replacements #37486

Merged
merged 1 commit into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Suppress and log max HTTP request retry errors in CEL input. {pull}37160[37160]
- Prevent CEL input from re-entering the eval loop when an evaluation failed. {pull}37161[37161]
- Update CEL extensions library to v1.7.0. {pull}37172[37172]
- Add support for complete URL replacement in HTTPJSON chain steps. {pull}37486[37486]

*Auditbeat*

Expand Down
52 changes: 52 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,30 @@ request_url using file_name as 'file_2': \https://example.com/services/data/v1.0
+
Collect and make events from response in any format supported by httpjson for all calls.

+
Note that since `request.url` must be a valid URL, if an API returns complete URLs in place of an identifier as in the example above, it would not be possible to use the JSON Path syntax. To achieve the desired result in this case an opaque URI syntax can be used. An opaque URI has an arbitrary scheme and opaque text separated by a colon. When the replacement is done, the scheme and colon are stripped from the URI prior to the replacement and the remaining opaque text is used as the replacement target. In the following example, the scheme is "placeholder".

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
enabled: true
# first call
request.url: https://example.com/services/data/v1.0/records
interval: 1h
chain:
# second call
- step:
request.url: placeholder:$.records[:]
request.method: GET
replace: $.records[:]
# third call
- step:
request.url: placeholder:$.file_name
request.method: GET
replace: $.file_name
----

+
[[chain-step-replace_with]]
[float]
Expand Down Expand Up @@ -1478,6 +1502,34 @@ response_json using id as '2':
+
Collect and make events from response in any format supported by httpjson for all calls.

+
Note that since `request.url` must be a valid URL, if an API returns complete URLs in place of an identifier as in the example above, it would not be possible to use the JSON Path syntax. To achieve the desired result in this case an opaque URI syntax can be used. An opaque URI has an arbitrary scheme and opaque text separated by a colon. When the replacement is done, the scheme and colon are stripped from the URI prior to the replacement and the remaining opaque text is used as the replacement target. In the following example, the scheme is "placeholder".

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
enabled: true
# first call
id: my-httpjson-id
request.url: http://example.com/services/data/v1.0/exports
interval: 1h
chain:
# second call
- while:
request.url: placeholder:$.exportId
request.method: GET
replace: $.exportId
until: '[[ eq .last_response.body.status "completed" ]]'
request.retry.max_attempts: 5
request.retry.wait_min: 5s
# third call
- step:
request.url: placeholder:$.files[:]
request.method: GET
replace: $.files[:]
----

NOTE: httpjson chain will only create and ingest events from last call on chained configurations. Also, the current chain only supports the following: all <<request-parameters, request parameters>>, <<response-transforms, response.transforms>> and <<response-split, response.split>>.

[float]
Expand Down
38 changes: 38 additions & 0 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,25 @@
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`},
},
{
name: "simple_naked_Chain_GET_request",
setupServer: newNakedChainTestServer(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 10,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.url": "placeholder:$.records[:]",
"request.method": http.MethodGet,
"replace": "$.records[:]",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{`{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`},
},
{
name: "multiple_Chain_GET_request",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
Expand Down Expand Up @@ -1173,9 +1192,9 @@
</item>
</order>
`
io.ReadAll(r.Body)

Check failure on line 1195 in x-pack/filebeat/input/httpjson/input_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `io.ReadAll` is not checked (errcheck)

Check failure on line 1195 in x-pack/filebeat/input/httpjson/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `io.ReadAll` is not checked (errcheck)
r.Body.Close()
w.Write([]byte(text))

Check failure on line 1197 in x-pack/filebeat/input/httpjson/input_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `w.Write` is not checked (errcheck)

Check failure on line 1197 in x-pack/filebeat/input/httpjson/input_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `w.Write` is not checked (errcheck)
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
Expand Down Expand Up @@ -1419,6 +1438,25 @@
}
}

func newNakedChainTestServer(
newServer func(http.Handler) *httptest.Server,
) func(testing.TB, http.HandlerFunc, map[string]interface{}) {
return func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
var server *httptest.Server
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"records":["`+server.URL+`/1"]}`)
case "/1":
fmt.Fprintln(w, `{"hello":[{"world":"moon"},{"space":[{"cake":"pumpkin"}]}]}`)
}
})
server = httptest.NewServer(r)
config["request.url"] = server.URL
t.Cleanup(server.Close)
}
}

func newChainPaginationTestServer(
newServer func(http.Handler) *httptest.Server,
) func(testing.TB, http.HandlerFunc, map[string]interface{}) {
Expand Down
19 changes: 18 additions & 1 deletion x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,8 +714,25 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra
return n, nil
}

// generateNewUrl returns new url value using replacement from oldUrl with ids
// generateNewUrl returns new url value using replacement from oldUrl with ids.
// If oldUrl is an opaque URL, the scheme: is dropped and the remaining string
// is used as the replacement target. For example
//
// placeholder:$.result[:]
//
// becomes
//
// $.result[:]
//
// which is now the replacement target.
func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
u, err := url.Parse(oldUrl)
if err != nil {
return url.URL{}, err
}
if u.Opaque != "" {
oldUrl = u.Opaque
}
newUrl, err := url.Parse(strings.Replace(oldUrl, replacement, id, 1))
if err != nil {
return url.URL{}, fmt.Errorf("failed to replace value in url: %w", err)
Expand Down
Loading