Skip to content

Commit

Permalink
[Filebeat] httpjson split strings on delimiter (#24022)
Browse files Browse the repository at this point in the history
* [Filebeat] httpjson split strings on delimiter

- adds new split type "string"
- adds new split config option "delimiter"
  • Loading branch information
leehinman authored Feb 12, 2021
1 parent 2c5886a commit 5414972
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Move aws-s3 input to GA. {pull}23631[23631]
- Populate `source.mac` and `destination.mac` for Suricata EVE events. {issue}23706[23706] {pull}23721[23721]
- Added RFC6587 framing option for tcp and unix inputs {issue}23663[23663] {pull}23724[23724]
- Added string splitting for httpjson input {pull}24022[24022]

*Heartbeat*

Expand Down
59 changes: 57 additions & 2 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ filebeat.inputs:
[float]
==== `response.split`

Split operation to apply to the response once it is received. A split can convert a map or an array into multiple events.
Split operation to apply to the response once it is received. A split can convert a map, array, or string into multiple events.

[float]
==== `response.split[].target`
Expand All @@ -509,7 +509,7 @@ Defines the target field upon the split operation will be performed.
[float]
==== `response.split[].type`

Defines the field type of the target. Allowed values: `array`, `map`. Default: `array`.
Defines the field type of the target. Allowed values: `array`, `map`, `string`. `string` requires the use of the `delimiter` options to specify what characters to split the string on. `delimiter` always behaves as if `keep_parent` is set to `true`. Default: `array`.

[float]
==== `response.split[].transforms`
Expand All @@ -529,6 +529,11 @@ NOTE: in this context, `body.*` will be the result of all the previous transform

If set to true, the fields from the parent document (at the same level as `target`) will be kept. Otherwise a new document will be created using `target` as the root. Default: `false`.

[float]
==== `response.split[].delimiter`

Required if using split type of `string`. This is the sub string used to split the string. For example if `delimiter` was "\n" and the string was "line 1\nline 2", then the split would result in "line 1" and "line 2".

[float]
==== `response.split[].key_field`

Expand Down Expand Up @@ -804,6 +809,56 @@ This will output:
]
----

- We have a response with a keys whose value is a string. We want the string to be split on a delimiter and a document for each sub strings.

+
["source","json",subs="attributes"]
----
{
"this": "is kept",
"lines": "Line 1\nLine 2\nLine 3"
}
----

+
The config will look like:

+
["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: httpjson
config_version: 2
interval: 1m
request.url: https://example.com
response.split:
target: body.lines
type: string
delimiter: "\n"
----

+
This will output:

+
["source","json",subs="attributes"]
----
[
{
"this": "is kept",
"lines": "Line 1"
},
{
"this": "is kept",
"lines": "Line 2"
},
{
"this": "is kept",
"lines": "Line 3"
}
]
----

[[cursor]]
[float]
==== `cursor`
Expand Down
22 changes: 14 additions & 8 deletions x-pack/filebeat/input/httpjson/internal/v2/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

const (
splitTypeArr = "array"
splitTypeMap = "map"
splitTypeArr = "array"
splitTypeMap = "map"
splitTypeString = "string"
)

type responseConfig struct {
Expand All @@ -23,12 +24,13 @@ type responseConfig struct {
}

type splitConfig struct {
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
Target string `config:"target" validation:"required"`
Type string `config:"type"`
Transforms transformsConfig `config:"transforms"`
Split *splitConfig `config:"split"`
KeepParent bool `config:"keep_parent"`
KeyField string `config:"key_field"`
DelimiterString string `config:"delimiter"`
}

func (c *responseConfig) Validate() error {
Expand Down Expand Up @@ -58,6 +60,10 @@ func (c *splitConfig) Validate() error {
return fmt.Errorf("key_field can only be used with a %s split type", splitTypeMap)
}
case splitTypeMap:
case splitTypeString:
if c.DelimiterString == "" {
return fmt.Errorf("delimiter required for split type %s", splitTypeString)
}
default:
return fmt.Errorf("invalid split type: %s", c.Type)
}
Expand Down
56 changes: 52 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ package v2
import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

var (
errEmptyField = errors.New("the requested field is empty")
errEmptyRootField = errors.New("the requested root field is empty")
errExpectedSplitArr = errors.New("split was expecting field to be an array")
errExpectedSplitObj = errors.New("split was expecting field to be an object")
errEmptyField = errors.New("the requested field is empty")
errEmptyRootField = errors.New("the requested root field is empty")
errExpectedSplitArr = errors.New("split was expecting field to be an array")
errExpectedSplitObj = errors.New("split was expecting field to be an object")
errExpectedSplitString = errors.New("split was expecting field to be a string")
)

type split struct {
Expand All @@ -28,6 +30,7 @@ type split struct {
keepParent bool
keyField string
isRoot bool
delimiter string
}

func newSplitResponse(cfg *splitConfig, log *logp.Logger) (*split, error) {
Expand Down Expand Up @@ -73,6 +76,7 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
kind: c.Type,
keepParent: c.KeepParent,
keyField: c.KeyField,
delimiter: c.DelimiterString,
transforms: ts,
child: s,
}, nil
Expand Down Expand Up @@ -139,6 +143,26 @@ func (s *split) split(ctx *transformContext, root common.MapStr, ch chan<- maybe
}
}

return nil
case splitTypeString:
vstr, ok := v.(string)
if !ok {
return errExpectedSplitString
}

if len(vstr) == 0 {
if s.isRoot {
return errEmptyRootField
}
ch <- maybeMsg{msg: root}
return errEmptyField
}
for _, substr := range strings.Split(vstr, s.delimiter) {
if err := s.sendMessageSplitString(ctx, root, substr, ch); err != nil {
s.log.Debug(err)
}
}

return nil
}

Expand Down Expand Up @@ -195,3 +219,27 @@ func toMapStr(v interface{}) (common.MapStr, bool) {
}
return common.MapStr{}, false
}

func (s *split) sendMessageSplitString(ctx *transformContext, root common.MapStr, v string, ch chan<- maybeMsg) error {
clone := root.Clone()
_, _ = clone.Put(s.targetInfo.Name, v)

tr := transformable{}
tr.setBody(clone)

var err error
for _, t := range s.transforms {
tr, err = t.run(ctx, tr)
if err != nil {
return err
}
}

if s.child != nil {
return s.child.split(ctx, clone, ch)
}

ch <- maybeMsg{msg: clone}

return nil
}
20 changes: 20 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ func TestSplit(t *testing.T) {
{"baz": "buzz", "splitHere": common.MapStr{"splitMore": common.MapStr{"deepest2": "data"}}},
},
},
{
name: "Split string",
config: &splitConfig{
Target: "body.items",
Type: "string",
DelimiterString: "\n",
},
ctx: emptyTransformContext(),
resp: transformable{
"body": common.MapStr{
"@timestamp": "1234567890",
"items": "Line 1\nLine 2\nLine 3",
},
},
expectedMessages: []common.MapStr{
{"@timestamp": "1234567890", "items": "Line 1"},
{"@timestamp": "1234567890", "items": "Line 2"},
{"@timestamp": "1234567890", "items": "Line 3"},
},
},
}

for _, tc := range cases {
Expand Down

0 comments on commit 5414972

Please sign in to comment.