Skip to content

Commit

Permalink
[Filebeat][okta] Fix okta pagination (#21797)
Browse files Browse the repository at this point in the history
* Fix okta pagination

* Use cursor storage
  • Loading branch information
marc-gr authored Oct 16, 2020
1 parent 325ee32 commit 80d4209
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 6 deletions.
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/date_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ func newDateCursorFromConfig(config config, log *logp.Logger) *dateCursor {
c.urlField = config.DateCursor.URLField
c.initialInterval = config.DateCursor.InitialInterval
c.dateFormat = config.DateCursor.getDateFormat()
c.valueTpl = config.DateCursor.ValueTemplate.Template
if config.DateCursor.ValueTemplate != nil {
c.valueTpl = config.DateCursor.ValueTemplate.Template
}

return c
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *pagination) nextRequestInfo(ri *requestInfo, response response, lastObj

// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response
func getNextLinkFromHeader(header http.Header, fieldName string, re *regexp.Regexp) (string, error) {
links, ok := header[fieldName]
links, ok := header[http.CanonicalHeaderKey(fieldName)]
if !ok {
return "", fmt.Errorf("field %s does not exist in the HTTP Header", fieldName)
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/pagination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

func TestGetNextLinkFromHeader(t *testing.T) {
header := make(http.Header)
header.Add("Link", "<https://dev-168980.okta.com/api/v1/logs>; rel=\"self\"")
header.Add("link", "<https://dev-168980.okta.com/api/v1/logs>; rel=\"self\"")
header.Add("Link", "<https://dev-168980.okta.com/api/v1/logs?after=1581658181086_1>; rel=\"next\"")
re, _ := regexp.Compile("<([^>]+)>; *rel=\"next\"(?:,|$)")
url, err := getNextLinkFromHeader(header, "Link", re)
Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/input/httpjson/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Pub
return err
}

response.header = resp.Header
responseData, err := ioutil.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read http response: %w", err)
Expand Down Expand Up @@ -165,10 +166,10 @@ func (r *requester) processHTTPRequest(ctx context.Context, publisher cursor.Pub
if err != nil {
return err
}
}

if lastObj != nil && r.dateCursor.enabled {
r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj)))
if lastObj != nil && r.dateCursor.enabled {
r.updateCursorState(ri.url, r.dateCursor.getNextValue(common.MapStr(lastObj)))
}
}

return nil
Expand Down
4 changes: 4 additions & 0 deletions x-pack/filebeat/module/okta/system/config/input.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ ssl: {{ .ssl | tojson }}
url: {{ .url }}
{{ end }}

date_cursor.field: published
date_cursor.url_field: since
date_cursor.initial_interval: {{ .initial_interval }}

{{ else if eq .input "file" }}

type: log
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/module/okta/system/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var:
default: "SSWS"
- name: http_client_timeout
- name: http_method
default: GET
- name: http_headers
- name: http_request_body
- name: interval
Expand All @@ -31,6 +32,8 @@ var:
- name: tags
default: [forwarded]
- name: url
- name: initial_interval
default: 24h

input: config/input.yml
ingest_pipeline: ingest/pipeline.yml
Expand Down

0 comments on commit 80d4209

Please sign in to comment.