Skip to content

Commit

Permalink
[Filebeat][httpjson] Add date_cursor to httpjson input (elastic#19483)
Browse files Browse the repository at this point in the history
* Add date_cursor to httpjson input

* Add changelog entry

* Fix tests

* Default to UTC date

* Add date_cursor validations and better error message

* Run fmt update
  • Loading branch information
marc-gr authored and melchiormoulin committed Oct 14, 2020
1 parent 01fe5a3 commit 5199aed
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Okta module now requires objects instead of JSON strings for the `http_headers`, `http_request_body`, `pagination`, `rate_limit`, and `ssl` variables. {pull}18953[18953]
- Adds oauth support for httpjson input. {issue}18415[18415] {pull}18892[18892]
- Adds `split_events_by` option to httpjson input. {pull}19246[19246]
- Adds `date_cursor` option to httpjson input. {pull}19483[19483]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ require (
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20200630154851-b2d8b0336632
golang.org/x/tools v0.0.0-20200701041122-1837592efa10
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.29.1
Expand Down
53 changes: 53 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"regexp"
"strings"
"text/template"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -35,6 +36,7 @@ type config struct {
RetryWaitMax time.Duration `config:"retry.wait_max"`
TLS *tlscommon.Config `config:"ssl"`
URL string `config:"url" validate:"required"`
DateCursor *DateCursor `config:"date_cursor"`
}

// Pagination contains information about httpjson pagination settings
Expand Down Expand Up @@ -65,6 +67,54 @@ type RateLimit struct {
Remaining string `config:"remaining"`
}

// DateCursor holds the settings of the `date_cursor` option. When enabled,
// a date taken from the last response is sent back as a URL query parameter
// on the next request so that polling resumes where it left off.
type DateCursor struct {
	// Enabled toggles the cursor. When it is left unset, a configured
	// date_cursor section counts as enabled (see IsEnabled).
	Enabled *bool `config:"enabled"`
	// Field is the key looked up in the response to obtain the next cursor value.
	Field string `config:"field" validate:"required"`
	// URLField is the name of the URL query parameter that carries the cursor.
	URLField string `config:"url_field" validate:"required"`
	// ValueTemplate optionally renders the query parameter value; it is
	// executed with the formatted date string as its data (`{{.}}`).
	ValueTemplate *Template `config:"value_template"`
	// DateFormat is the Go time layout of the cursor date. Empty means
	// time.RFC3339 (see GetDateFormat).
	DateFormat string `config:"date_format"`
	// InitialInterval is subtracted from the current time to build the
	// cursor value of the very first request.
	InitialInterval time.Duration `config:"initial_interval"`
}

// Template wraps a text/template so that it can be unpacked directly from a
// configuration string.
type Template struct {
	*template.Template
}

// Unpack parses in as a text/template and stores the result in t. It returns
// the parse error, leaving t untouched, when in is not a valid template.
func (t *Template) Unpack(in string) error {
	tpl, err := template.New("tpl").Parse(in)
	if err != nil {
		return err
	}

	*t = Template{Template: tpl}

	return nil
}

// IsEnabled reports whether the date cursor is active: the section must be
// present (non-nil receiver) and `enabled` must be either unset or true.
func (dc *DateCursor) IsEnabled() bool {
	return dc != nil && (dc.Enabled == nil || *dc.Enabled)
}

// GetDateFormat returns the configured date layout, falling back to
// time.RFC3339 when none is set. Like IsEnabled, it is safe to call on a nil
// receiver.
func (dc *DateCursor) GetDateFormat() string {
	if dc == nil || dc.DateFormat == "" {
		return time.RFC3339
	}
	return dc.DateFormat
}

// Validate checks that date_format, when set, is a layout whose output can be
// parsed back with the same layout.
func (dc *DateCursor) Validate() error {
	if dc == nil || dc.DateFormat == "" {
		return nil
	}
	// Round-trip a fixed instant (the layout reference time, in UTC) instead
	// of time.Now(): the verdict must not depend on the wall clock or on the
	// local time zone. With the current time, an ambiguous layout such as
	// "1111" would be accepted only at certain dates.
	ref := time.Date(2006, time.January, 2, 15, 4, 5, 0, time.UTC)
	if _, err := time.Parse(dc.DateFormat, ref.Format(dc.DateFormat)); err != nil {
		return errors.New("invalid configuration: date_format is not a valid date layout")
	}
	return nil
}

func (c *config) Validate() error {
switch strings.ToUpper(c.HTTPMethod) {
case "GET", "POST":
Expand All @@ -81,6 +131,9 @@ func (c *config) Validate() error {
}
}
if c.Pagination != nil {
if c.DateCursor.IsEnabled() {
return errors.Errorf("invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms")
}
if c.Pagination.Header != nil {
if c.Pagination.RequestField != "" || c.Pagination.IDField != "" || len(c.Pagination.ExtraBodyContent) > 0 {
return errors.Errorf("invalid configuration: both pagination.header and pagination.req_field or pagination.id_field or pagination.extra_body_content cannot be set simultaneously")
Expand Down
27 changes: 27 additions & 0 deletions x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/pkg/errors"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -350,6 +351,32 @@ func TestConfigOauth2Validation(t *testing.T) {
"url": "localhost",
},
},
{
name: "date_cursor must fail in combination with pagination",
expectedErr: "invalid configuration: date_cursor cannnot be set in combination with other pagination mechanisms accessing config",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo"},
"pagination": map[string]interface{}{
"header": map[string]interface{}{"field_name": "foo", "regex_pattern": "bar"},
},
"url": "localhost",
},
},
{
name: "date_cursor.date_format will fail if invalid",
expectedErr: "invalid configuration: date_format is not a valid date layout accessing 'date_cursor'",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": "1234"},
"url": "localhost",
},
},
{
name: "date_cursor must work with a valid date_format",
input: map[string]interface{}{
"date_cursor": map[string]interface{}{"field": "foo", "url_field": "foo", "date_format": time.RFC3339},
"url": "localhost",
},
},
}

for _, c := range cases {
Expand Down
48 changes: 48 additions & 0 deletions x-pack/filebeat/input/httpjson/httpjson_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
Expand Down Expand Up @@ -727,3 +728,50 @@ func TestArrayWithSplitResponse(t *testing.T) {
}
})
}

// TestCursor checks the date_cursor flow end to end: the first request uses
// "now - initial_interval" rendered through value_template, and after the
// response is processed the cursor advances to the @timestamp taken from it.
func TestCursor(t *testing.T) {
	m := map[string]interface{}{
		"http_method":                  "GET",
		"date_cursor.field":            "@timestamp",
		"date_cursor.url_field":        "$filter",
		"date_cursor.value_template":   "alertCreationTime ge {{.}}",
		"date_cursor.initial_interval": "10m",
		"date_cursor.date_format":      "2006-01-02T15:04:05Z",
	}

	// Freeze the clock for this test only; timeNow is package-level state, so
	// it is restored on exit to avoid leaking the fixed time into other tests.
	origTimeNow := timeNow
	defer func() { timeNow = origTimeNow }()
	timeNow = func() time.Time {
		return time.Date(2002, time.October, 2, 15, 10, 0, 0, time.UTC)
	}

	const (
		expectedQuery           = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A00Z"
		expectedNextCursorValue = "2002-10-02T15:00:01Z"
		expectedNextQuery       = "%24filter=alertCreationTime+ge+2002-10-02T15%3A00%3A01Z"
	)
	var gotQuery string
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		gotQuery = r.URL.Query().Encode()
		w.Write([]byte(`[{"@timestamp":"2002-10-02T15:00:00Z"},{"@timestamp":"2002-10-02T15:00:01Z"}]`))
	}))

	runTest(t, ts, m, func(input *HttpjsonInput, out *stubOutleter, t *testing.T) {
		group, _ := errgroup.WithContext(context.Background())
		group.Go(input.run)

		events, ok := out.waitForEvents(2)
		if !ok {
			t.Fatalf("Expected 2 events, but got %d.", len(events))
		}
		input.Stop()

		if err := group.Wait(); err != nil {
			t.Fatal(err)
		}

		assert.Equal(t, expectedQuery, gotQuery)
		assert.Equal(t, expectedNextCursorValue, input.nextCursorValue)
		assert.Equal(t, fmt.Sprintf("%s?%s", ts.URL, expectedNextQuery), input.getURL())
	})
}
73 changes: 69 additions & 4 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"sync"
Expand All @@ -37,6 +38,9 @@ const (

var userAgent = useragent.UserAgent("Filebeat")

// timeNow is the clock used to compute the initial date cursor value. It is a
// variable rather than a direct time.Now call so tests can swap in a fixed time.
var timeNow = time.Now

func init() {
err := input.Register(inputName, NewInput)
if err != nil {
Expand All @@ -55,6 +59,8 @@ type HttpjsonInput struct {
workerCancel context.CancelFunc // Used to signal that the worker should stop.
workerOnce sync.Once // Guarantees that the worker goroutine is only started once.
workerWg sync.WaitGroup // Waits on worker goroutine.

nextCursorValue string
}

// RequestInfo struct has the information for generating an HTTP request
Expand Down Expand Up @@ -343,6 +349,7 @@ func createRequestInfoFromBody(m common.MapStr, idField string, requestField str

// processHTTPRequest processes HTTP request, and handles pagination if enabled
func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Client, ri *RequestInfo) error {
ri.URL = in.getURL()
for {
req, err := in.createHTTPRequest(ctx, ri)
if err != nil {
Expand Down Expand Up @@ -407,8 +414,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
in.log.Debug("http.response.body is not a valid JSON object", string(responseData))
return errors.Errorf("http.response.body is not a valid JSON object, but a %T", obj)
}

if mm != nil && in.config.Pagination != nil && in.config.Pagination.IsEnabled() {
if mm != nil && in.config.Pagination.IsEnabled() {
if in.config.Pagination.Header != nil {
// Pagination control using HTTP Header
url, err := getNextLinkFromHeader(header, in.config.Pagination.Header.FieldName, in.config.Pagination.Header.RegexPattern)
Expand All @@ -427,7 +433,7 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
continue
} else {
// Pagination control using HTTP Body fields
ri, err := createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri)
ri, err = createRequestInfoFromBody(common.MapStr(mm), in.config.Pagination.IDField, in.config.Pagination.RequestField, common.MapStr(in.config.Pagination.ExtraBodyContent), in.config.Pagination.URL, ri)
if err != nil {
return err
}
Expand All @@ -441,10 +447,70 @@ func (in *HttpjsonInput) processHTTPRequest(ctx context.Context, client *http.Cl
continue
}
}
if mm != nil && in.config.DateCursor.IsEnabled() {
in.advanceCursor(common.MapStr(mm))
}
return nil
}
}

// getURL returns the URL for the next request. Without an enabled
// date_cursor it is the configured URL unchanged. Otherwise the cursor value
// is set as the date_cursor.url_field query parameter: the value is the last
// cursor taken from a response or, on the first request,
// "now - initial_interval" in UTC formatted with the cursor date format. When
// value_template is set, the date is rendered through it first.
//
// If the configured URL cannot be parsed or the template fails to execute,
// the configured URL is returned as is and a warning is logged.
func (in *HttpjsonInput) getURL() string {
	cursor := in.config.DateCursor
	if !cursor.IsEnabled() {
		return in.config.URL
	}

	dateStr := in.nextCursorValue
	if dateStr == "" {
		// No cursor yet: start initial_interval before the current time.
		dateStr = timeNow().UTC().Add(-cursor.InitialInterval).Format(cursor.GetDateFormat())
	}

	// Named parsedURL so the imported net/url package is not shadowed.
	parsedURL, err := url.Parse(in.config.URL)
	if err != nil {
		in.log.Warnf("date_cursor: unable to parse url, cursor will not be applied: %v", err)
		return in.config.URL
	}

	value := dateStr
	if cursor.ValueTemplate != nil {
		var buf bytes.Buffer
		if err := cursor.ValueTemplate.Execute(&buf, dateStr); err != nil {
			in.log.Warnf("date_cursor: unable to execute value_template, cursor will not be applied: %v", err)
			return in.config.URL
		}
		value = buf.String()
	}

	q := parsedURL.Query()
	q.Set(cursor.URLField, value)
	parsedURL.RawQuery = q.Encode()

	return parsedURL.String()
}

// advanceCursor reads date_cursor.field from m and, when it holds a string
// that parses with the cursor date format, stores it as the value for the
// next request. In every other case the cursor is left unchanged and a
// warning is logged so that a stuck cursor can be diagnosed.
func (in *HttpjsonInput) advanceCursor(m common.MapStr) {
	v, err := m.GetValue(in.config.DateCursor.Field)
	if err != nil {
		in.log.Warnf("date_cursor field: %q", err)
		return
	}

	s, ok := v.(string)
	if !ok {
		in.log.Warn("date_cursor field must be a string, cursor will not advance")
		return
	}

	// Only accept values that match the configured layout; the raw string is
	// what gets sent back, so an unparsable one would corrupt later requests.
	layout := in.config.DateCursor.GetDateFormat()
	if _, err := time.Parse(layout, s); err != nil {
		in.log.Warnf("date_cursor field does not match date_format %q, cursor will not advance: %v", layout, err)
		return
	}
	in.nextCursorValue = s
}

func (in *HttpjsonInput) run() error {
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()
Expand All @@ -455,7 +521,6 @@ func (in *HttpjsonInput) run() error {
}

ri := &RequestInfo{
URL: in.URL,
ContentMap: common.MapStr{},
Headers: in.HTTPHeaders,
}
Expand Down

0 comments on commit 5199aed

Please sign in to comment.