Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update filebeat httpjson input to support pagination via Header and Okta module #16354

Merged
merged 21 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,44 @@ import (

// Config contains information about httpjson configuration
type config struct {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
APIKey string `config:"api_key"`
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
HTTPHeaders common.MapStr `config:"http_headers"`
HTTPMethod string `config:"http_method" validate:"required"`
HTTPRequestBody common.MapStr `config:"http_request_body"`
Interval time.Duration `config:"interval"`
JSONObjects string `config:"json_objects_array"`
Pagination *Pagination `config:"pagination"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
APIKey string `config:"api_key"`
AuthenticationScheme string `config:"authentication_scheme"`
HTTPClientTimeout time.Duration `config:"http_client_timeout"`
HTTPHeaders common.MapStr `config:"http_headers"`
HTTPMethod string `config:"http_method" validate:"required"`
HTTPRequestBody common.MapStr `config:"http_request_body"`
Interval time.Duration `config:"interval"`
JSONObjects string `config:"json_objects_array"`
NoHTTPBody bool `config:"no_http_body"`
Pagination *Pagination `config:"pagination"`
RateLimit *RateLimit `config:"rate_limit"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
}

// Pagination contains information about httpjson pagination settings.
// Pagination is only attempted when this section is configured and
// IsEnabled is true.
type Pagination struct {
// IsEnabled switches pagination on or off.
IsEnabled bool `config:"enabled"`
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
// ExtraBodyContent, when set, is merged into the request content map
// before the next page is requested (body-field pagination only).
ExtraBodyContent common.MapStr `config:"extra_body_content"`
// Header, when set, selects pagination driven by an HTTP response header
// instead of by fields of the response body.
Header *Header `config:"header"`
// IDField is the key looked up in the response body to continue
// pagination; when the lookup fails, pagination is treated as finished.
IDField string `config:"id_field"`
// RequestField: NOTE(review) only its non-empty check is visible here;
// how the value is used in the follow-up request should be confirmed
// against the hidden part of processHTTPRequest.
RequestField string `config:"req_field"`
// URL: NOTE(review) not read by the code visible here; presumably the URL
// used for follow-up page requests - confirm.
URL string `config:"url"`
}

// Header contains the settings for pagination that is driven by an HTTP
// response header (for example an RFC 5988 style "Link" header).
type Header struct {
	// FieldName is the name of the response header that carries the link to
	// the next page.
	FieldName string `config:"field_name"`
	// RegexPattern is the regular expression applied to each value of that
	// header. Its first capture group is taken as the URL of the next page.
	RegexPattern string `config:"regex_pattern"`
}

// RateLimit contains the names of the HTTP response headers that carry
// rate-limit information.
type RateLimit struct {
// Limit: NOTE(review) not read by the code visible here; presumably the name
// of the header carrying the total request quota - confirm.
Limit string `config:"limit"`
// Reset is the name of the header whose value is parsed as a base-10 Unix
// timestamp in seconds; applyRateLimit sleeps until that time once no
// requests remain.
Reset string `config:"reset"`
// Remaining is the name of the header whose value is parsed as the base-10
// number of requests still allowed. When it is empty, applyRateLimit does
// nothing.
Remaining string `config:"remaining"`
}

func (c *config) Validate() error {
switch strings.ToUpper(c.HTTPMethod) {
case "GET":
Expand Down
162 changes: 134 additions & 28 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -128,6 +131,9 @@ func (in *httpjsonInput) Run() {
func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo) (*http.Request, error) {
b, _ := json.Marshal(ri.ContentMap)
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
body := bytes.NewReader(b)
if in.config.NoHTTPBody {
body = bytes.NewReader([]byte{})
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
}
req, err := http.NewRequest(in.config.HTTPMethod, ri.URL, body)
if err != nil {
return nil, err
Expand All @@ -137,7 +143,11 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", userAgent)
if in.config.APIKey != "" {
req.Header.Set("Authorization", in.config.APIKey)
if in.config.AuthenticationScheme != "" {
req.Header.Set("Authorization", fmt.Sprintf("%s %s", in.config.AuthenticationScheme, in.config.APIKey))
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
} else {
req.Header.Set("Authorization", in.config.APIKey)
}
}
for k, v := range ri.Headers {
switch vv := v.(type) {
Expand All @@ -149,6 +159,78 @@ func (in *httpjsonInput) createHTTPRequest(ctx context.Context, ri *requestInfo)
return req, nil
}

// processEvent processes an array of events
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
func (in *httpjsonInput) processEventArray(events []interface{}) (map[string]interface{}, error) {
var m map[string]interface{}
for _, t := range events {
switch v := t.(type) {
case map[string]interface{}:
m = v
d, err := json.Marshal(v)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal %+v", v)
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return nil, errors.New("function OnEvent returned false")
}
default:
return nil, errors.Errorf("invalid JSON object")
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
}
}
return m, nil
}

// getNextLinkFromHeader retrieves the next URL for pagination from the HTTP Header of the response.
//
// fieldName is the name of the header to inspect. It is looked up as given
// and, failing that, in its canonical form, so "link" finds "Link".
// regexPattern is compiled on every call and must contain at least one
// capture group; the first group of the match is returned as the next URL.
//
// A header value is only used when the pattern matches it exactly once;
// values with zero or several matches are skipped. If no value qualifies the
// function returns "" and a nil error, which callers treat as "no next page".
//
// An error is returned when the pattern does not compile, when the header is
// absent, or when the pattern matches but has no capture group.
func getNextLinkFromHeader(header http.Header, fieldName string, regexPattern string) (string, error) {
	re, err := regexp.Compile(regexPattern)
	if err != nil {
		return "", err
	}

	links, ok := header[fieldName]
	if !ok {
		// net/http stores response headers under their canonical key, so a
		// configured name such as "link" must be canonicalized to be found.
		links, ok = header[http.CanonicalHeaderKey(fieldName)]
	}
	if !ok {
		return "", fmt.Errorf("field %s does not exist in the HTTP Header", fieldName)
	}

	for _, link := range links {
		matches := re.FindAllStringSubmatch(link, -1)
		if len(matches) != 1 {
			continue
		}
		// Index 0 is the whole match; without a capture group there is no
		// index 1 and indexing it would panic.
		if len(matches[0]) < 2 {
			return "", fmt.Errorf("regex pattern %q has no capture group for the next URL", regexPattern)
		}
		return matches[0][1], nil
	}
	return "", nil
}

// applyRateLimit applies appropriate rate limit if specified in the HTTP Header of the response
func (in *httpjsonInput) applyRateLimit(header http.Header, rateLimit *RateLimit) error {
if rateLimit != nil {
if rateLimit.Remaining != "" {
remaining := header.Get(rateLimit.Remaining)
if remaining == "" {
return errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Remaining)
}
m, err := strconv.ParseInt(remaining, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse rate-limit remaining value")
}
in.log.Debugf("Rate Limit: The number of allowed remaining requests is %d.", m)
if m == 0 {
reset := header.Get(rateLimit.Reset)
if reset == "" {
return errors.Errorf("field %s does not exist in the HTTP Header, or is empty", rateLimit.Reset)
}
epoch, err := strconv.ParseInt(reset, 10, 64)
if err != nil {
return errors.Wrapf(err, "failed to parse rate-limit reset value")
}
t := time.Unix(epoch, 0)
in.log.Debugw("Rate Limit: Wait until %v for the rate limit to reset.", t)
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Until(t))
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return nil
}

// processHTTPRequest processes HTTP request, and handles pagination if enabled
func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *requestInfo) error {
for {
Expand All @@ -161,6 +243,7 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
return errors.Wrapf(err, "failed to execute http client.Do")
}
responseData, err := ioutil.ReadAll(msg.Body)
header := msg.Header
msg.Body.Close()
if err != nil {
return errors.Wrapf(err, "failed to read http.response.body")
Expand All @@ -170,47 +253,68 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
return errors.Errorf("http request was unsuccessful with a status code %d", msg.StatusCode)
}
var m, v interface{}
var mm map[string]interface{}
err = json.Unmarshal(responseData, &m)
if err != nil {
in.log.Debugw("failed to unmarshal http.response.body", string(responseData))
return errors.Wrapf(err, "failed to unmarshal http.response.body")
}
switch mmap := m.(type) {
switch obj := m.(type) {
// Top level Array
case []interface{}:
mm, err = in.processEventArray(obj)
if err != nil {
return err
}
case map[string]interface{}:
if in.config.JSONObjects == "" {
ok := in.outlet.OnEvent(makeEvent(string(responseData)))
if !ok {
return errors.New("function OnEvent returned false")
mm, err = in.processEventArray([]interface{}{obj})
if err != nil {
return err
}
} else {
v, err = common.MapStr(mmap).GetValue(in.config.JSONObjects)
v, err = common.MapStr(mm).GetValue(in.config.JSONObjects)
if err != nil {
return err
}
switch ts := v.(type) {
case []interface{}:
for _, t := range ts {
switch tv := t.(type) {
case map[string]interface{}:
d, err := json.Marshal(tv)
if err != nil {
return errors.Wrapf(err, "failed to marshal json_objects_array")
}
ok := in.outlet.OnEvent(makeEvent(string(d)))
if !ok {
return errors.New("function OnEvent returned false")
}
default:
return errors.New("invalid json_objects_array configuration")
}
mm, err = in.processEventArray(ts)
if err != nil {
return err
}
default:
return errors.New("invalid json_objects_array configuration")
return errors.Errorf("content of %s is not a valid array", in.config.JSONObjects)
}
}
if in.config.Pagination != nil && in.config.Pagination.IsEnabled {
v, err = common.MapStr(mmap).GetValue(in.config.Pagination.IDField)
default:
in.log.Debugw("http.response.body is not valid JSON", string(responseData))
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("http.response.body is not valid JSON")
}

if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
if in.config.Pagination.Header != nil {
// Pagination control using HTTP Header
url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern)
if err != nil {
in.log.Info("Successfully processed HTTP request. Pagination finished.")
return errors.Wrapf(err, "failed to retrieve the next URL for pagination")
}
if ri.URL == url || url == "" {
in.log.Info("Pagination finished.")
return nil
}
ri.URL = url
err = in.applyRateLimit(header, in.config.RateLimit)
if err != nil {
return err
}
in.log.Info("Continuing with pagination to URL: ", ri.URL)
continue
} else {
// Pagination control using HTTP Body fields
v, err = common.MapStr(mm).GetValue(in.config.Pagination.IDField)
if err != nil {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
in.log.Info("Pagination finished.")
return nil
}
if in.config.Pagination.RequestField != "" {
Expand All @@ -229,13 +333,15 @@ func (in *httpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
if in.config.Pagination.ExtraBodyContent != nil {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
ri.ContentMap.Update(common.MapStr(in.config.Pagination.ExtraBodyContent))
}
err = in.applyRateLimit(header, in.config.RateLimit)
if err != nil {
alakahakai marked this conversation as resolved.
Show resolved Hide resolved
return err
}
in.log.Info("Continuing with pagination to URL: ", ri.URL)
continue
}
return nil
default:
in.log.Debugw("http.response.body is not valid JSON", string(responseData))
return errors.New("http.response.body is not valid JSON")
}
return nil
}
}

Expand Down