From 5414972f435d8c03813ff79049c0ac0ffab77304 Mon Sep 17 00:00:00 2001 From: Lee Hinman <57081003+leehinman@users.noreply.github.com> Date: Fri, 12 Feb 2021 13:27:03 -0600 Subject: [PATCH] [Filebeat] httpjson split strings on delimiter (#24022) * [Filebeat] httpjson split strings on delimiter - adds new split type "string" - adds new split config option "delimiter" --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-httpjson.asciidoc | 59 ++++++++++++++++++- .../httpjson/internal/v2/config_response.go | 22 ++++--- .../input/httpjson/internal/v2/split.go | 56 ++++++++++++++++-- .../input/httpjson/internal/v2/split_test.go | 20 +++++++ 5 files changed, 144 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cdfce9ba5aa..9305d523f7f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -824,6 +824,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Move aws-s3 input to GA. {pull}23631[23631] - Populate `source.mac` and `destination.mac` for Suricata EVE events. {issue}23706[23706] {pull}23721[23721] - Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724] +- Added string splitting for httpjson input {pull}24022[24022] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 64247b517f8..56b022d3e9a 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -499,7 +499,7 @@ filebeat.inputs: [float] ==== `response.split` -Split operation to apply to the response once it is received. A split can convert a map or an array into multiple events. +Split operation to apply to the response once it is received. A split can convert a map, array, or string into multiple events. [float] ==== `response.split[].target` @@ -509,7 +509,7 @@ Defines the target field upon the split operation will be performed. [float] ==== `response.split[].type` -Defines the field type of the target. Allowed values: `array`, `map`. Default: `array`. +Defines the field type of the target. Allowed values: `array`, `map`, `string`. `string` requires the use of the `delimiter` options to specify what characters to split the string on. `delimiter` always behaves as if `keep_parent` is set to `true`. Default: `array`. [float] ==== `response.split[].transforms` @@ -529,6 +529,11 @@ NOTE: in this context, `body.*` will be the result of all the previous transform If set to true, the fields from the parent document (at the same level as `target`) will be kept. Otherwise a new document will be created using `target` as the root. Default: `false`. +[float] +==== `response.split[].delimiter` + +Required if using split type of `string`. This is the sub string used to split the string. For example if `delimiter` was "\n" and the string was "line 1\nline 2", then the split would result in "line 1" and "line 2". + [float] ==== `response.split[].key_field` @@ -804,6 +809,56 @@ This will output: ] ---- +- We have a response with a keys whose value is a string. We want the string to be split on a delimiter and a document for each sub strings. + ++ +["source","json",subs="attributes"] +---- +{ + "this": "is kept", + "lines": "Line 1\nLine 2\nLine 3" +} +---- + ++ +The config will look like: + ++ +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: httpjson + config_version: 2 + interval: 1m + request.url: https://example.com + response.split: + target: body.lines + type: string + delimiter: "\n" +---- + ++ +This will output: + ++ +["source","json",subs="attributes"] +---- +[ + { + "this": "is kept", + "lines": "Line 1" + }, + { + "this": "is kept", + "lines": "Line 2" + }, + { + "this": "is kept", + "lines": "Line 3" + } +] +---- + [[cursor]] [float] ==== `cursor` diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config_response.go b/x-pack/filebeat/input/httpjson/internal/v2/config_response.go index be265409ba8..0bb51910387 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config_response.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config_response.go @@ -10,8 +10,9 @@ import ( ) const ( - splitTypeArr = "array" - splitTypeMap = "map" + splitTypeArr = "array" + splitTypeMap = "map" + splitTypeString = "string" ) type responseConfig struct { @@ -23,12 +24,13 @@ type responseConfig struct { } type splitConfig struct { - Target string `config:"target" validation:"required"` - Type string `config:"type"` - Transforms transformsConfig `config:"transforms"` - Split *splitConfig `config:"split"` - KeepParent bool `config:"keep_parent"` - KeyField string `config:"key_field"` + Target string `config:"target" validation:"required"` + Type string `config:"type"` + Transforms transformsConfig `config:"transforms"` + Split *splitConfig `config:"split"` + KeepParent bool `config:"keep_parent"` + KeyField string `config:"key_field"` + DelimiterString string `config:"delimiter"` } func (c *responseConfig) Validate() error { @@ -58,6 +60,10 @@ func (c *splitConfig) Validate() error { return fmt.Errorf("key_field can only be used with a %s split type", splitTypeMap) } case splitTypeMap: + case splitTypeString: + if c.DelimiterString == "" { + return fmt.Errorf("delimiter required for split type %s", splitTypeString) + } default: return fmt.Errorf("invalid split type: %s", c.Type) } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/split.go b/x-pack/filebeat/input/httpjson/internal/v2/split.go index 823b57c39fb..9cb686e63ad 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/split.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/split.go @@ -7,16 +7,18 @@ package v2 import ( "errors" "fmt" + "strings" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) var ( - errEmptyField = errors.New("the requested field is empty") - errEmptyRootField = errors.New("the requested root field is empty") - errExpectedSplitArr = errors.New("split was expecting field to be an array") - errExpectedSplitObj = errors.New("split was expecting field to be an object") + errEmptyField = errors.New("the requested field is empty") + errEmptyRootField = errors.New("the requested root field is empty") + errExpectedSplitArr = errors.New("split was expecting field to be an array") + errExpectedSplitObj = errors.New("split was expecting field to be an object") + errExpectedSplitString = errors.New("split was expecting field to be a string") ) type split struct { @@ -28,6 +30,7 @@ type split struct { keepParent bool keyField string isRoot bool + delimiter string } func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) { @@ -73,6 +76,7 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) { kind: c.Type, keepParent: c.KeepParent, keyField: c.KeyField, + delimiter: c.DelimiterString, transforms: ts, child: s, }, nil @@ -139,6 +143,26 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe } } + return nil + case splitTypeString: + vstr, ok := v.(string) + if !ok { + return errExpectedSplitString + } + + if len(vstr) == 0 { + if s.isRoot { + return errEmptyRootField + } + ch <- maybeMsg{msg: root} + return errEmptyField + } + for _, substr := range strings.Split(vstr, s.delimiter) { + if err := s.sendMessageSplitString(ctx, root, substr, ch); err != nil { + s.log.Debug(err) + } + } + return nil } @@ -195,3 +219,27 @@ func toMapStr(v interface{}) (common.MapStr, bool) { } return common.MapStr{}, false } + +func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error { + clone := root.Clone() + _, _ = clone.Put(s.targetInfo.Name, v) + + tr := transformable{} + tr.setBody(clone) + + var err error + for _, t := range s.transforms { + tr, err = t.run(ctx, tr) + if err != nil { + return err + } + } + + if s.child != nil { + return s.child.split(ctx, clone, ch) + } + + ch <- maybeMsg{msg: clone} + + return nil +} diff --git a/x-pack/filebeat/input/httpjson/internal/v2/split_test.go b/x-pack/filebeat/input/httpjson/internal/v2/split_test.go index c6ad4a6170c..2c53d0fcbe1 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/split_test.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/split_test.go @@ -334,6 +334,26 @@ func TestSplit(t *testing.T) { {"baz": "buzz", "splitHere": common.MapStr{"splitMore": common.MapStr{"deepest2": "data"}}}, }, }, + { + name: "Split string", + config: &splitConfig{ + Target: "body.items", + Type: "string", + DelimiterString: "\n", + }, + ctx: emptyTransformContext(), + resp: transformable{ + "body": common.MapStr{ + "@timestamp": "1234567890", + "items": "Line 1\nLine 2\nLine 3", + }, + }, + expectedMessages: []common.MapStr{ + {"@timestamp": "1234567890", "items": "Line 1"}, + {"@timestamp": "1234567890", "items": "Line 2"}, + {"@timestamp": "1234567890", "items": "Line 3"}, + }, + }, } for _, tc := range cases {