From 97e3e19eaa17a399e1afcd5e50467a6e99bc9105 Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Thu, 25 Oct 2018 13:06:30 -0400 Subject: [PATCH 1/2] Dissect tag on parsing error Before when a parsing error occurred the events was returned untouched and an error was logged, if you don't look at your logs you have no the idea that the tokenizer was not able to match your string. Instead, when a parsing error occurs in the Dissect processor, we will now add a tag named 'dissect_parsing_error' to the 'log.flags' field. With that information, you are now able to reprocess your data or do filtering on the UI. Fixes: #8123 --- CHANGELOG.asciidoc | 1 + libbeat/beat/event.go | 3 ++ libbeat/processors/dissect/processor.go | 10 ++++ libbeat/processors/dissect/processor_test.go | 56 ++++++++++++++++++++ 4 files changed, 70 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 697df6cff62..e4f8ba36f50 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -128,6 +128,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add Beats Central Management {pull}8559[8559] - Allow Bus to buffer events in case listeners are not configured. {pull}8527[8527] - Enable `host` and `cloud` metadata processors by default. {pull}8596[8596] +- Dissect will now tag event on parsing error. {pull}8751[8751] *Auditbeat* diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 8b25349eb1e..1191dff23d8 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -24,6 +24,9 @@ import ( "github.com/elastic/beats/libbeat/common" ) +// FlagField fields used to keep information or errors when events are parsed. +const FlagField = "log.flags" + // Event is the common event format shared by all beats. // Every event must have a timestamp and provide encodable Fields in `Fields`. // The `Meta`-fields can be used to pass additional meta-data to the outputs. diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index 613ea38dc10..9388337ee72 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -27,6 +27,8 @@ import ( "github.com/elastic/beats/libbeat/processors" ) +const tagParsingError = "dissect_parsing_error" + type processor struct { config config } @@ -60,6 +62,14 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { m, err := p.config.Tokenizer.Dissect(s) if err != nil { + if err := common.AddTagsWithKey( + event.Fields, + beat.FlagField, + []string{tagParsingError}, + ); err != nil { + return event, errors.Wrap(err, "cannot add new tag the event") + } + return event, err } diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 132dae07872..91cf58e6d28 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -176,3 +176,59 @@ func TestFieldAlreadyExist(t *testing.T) { }) } } + +func TestErrorTagging(t *testing.T) { + t.Run("when the parsing fails add a tag", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{ok} - %{notvalid}", + }) + + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": "hello world"}} + event, err := processor.Run(&e) + + if !assert.Error(t, err) { + return + } + + tags, err := event.GetValue(beat.FlagField) + if !assert.NoError(t, err) { + return + } + + assert.Contains(t, tags, tagParsingError) + }) + + t.Run("when the parsing is succesful do not add a tag", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{ok} %{valid}", + }) + + if !assert.NoError(t, err) { + return + } + + processor, err := newProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: common.MapStr{"message": "hello world"}} + event, err := processor.Run(&e) + + if !assert.NoError(t, err) { + return + } + + _, err = event.GetValue(beat.FlagField) + assert.Error(t, err) + }) +} From 5e4fc09b999098e331e35cb645ddf77df00e122c Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Fri, 26 Oct 2018 08:07:47 -0400 Subject: [PATCH 2/2] flag instead of tag --- CHANGELOG.asciidoc | 2 +- libbeat/processors/dissect/processor.go | 6 +++--- libbeat/processors/dissect/processor_test.go | 10 +++++----- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e4f8ba36f50..8d887c63d70 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -128,7 +128,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Add Beats Central Management {pull}8559[8559] - Allow Bus to buffer events in case listeners are not configured. {pull}8527[8527] - Enable `host` and `cloud` metadata processors by default. {pull}8596[8596] -- Dissect will now tag event on parsing error. {pull}8751[8751] +- Dissect will now flag event on parsing error. {pull}8751[8751] *Auditbeat* diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index 9388337ee72..eb6f29240b2 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/libbeat/processors" ) -const tagParsingError = "dissect_parsing_error" +const flagParsingError = "dissect_parsing_error" type processor struct { config config @@ -65,9 +65,9 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { if err := common.AddTagsWithKey( event.Fields, beat.FlagField, - []string{tagParsingError}, + []string{flagParsingError}, ); err != nil { - return event, errors.Wrap(err, "cannot add new tag the event") + return event, errors.Wrap(err, "cannot add new flag the event") } return event, err diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 91cf58e6d28..7f7fbbb1b44 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -177,8 +177,8 @@ func TestFieldAlreadyExist(t *testing.T) { } } -func TestErrorTagging(t *testing.T) { - t.Run("when the parsing fails add a tag", func(t *testing.T) { +func TestErrorFlagging(t *testing.T) { + t.Run("when the parsing fails add a flag", func(t *testing.T) { c, err := common.NewConfigFrom(map[string]interface{}{ "tokenizer": "%{ok} - %{notvalid}", }) @@ -199,15 +199,15 @@ func TestErrorTagging(t *testing.T) { return } - tags, err := event.GetValue(beat.FlagField) + flags, err := event.GetValue(beat.FlagField) if !assert.NoError(t, err) { return } - assert.Contains(t, tags, tagParsingError) + assert.Contains(t, flags, flagParsingError) }) - t.Run("when the parsing is succesful do not add a tag", func(t *testing.T) { + t.Run("when the parsing is succesful do not add a flag", func(t *testing.T) { c, err := common.NewConfigFrom(map[string]interface{}{ "tokenizer": "%{ok} %{valid}", })