From f0606c69395bcf28bd36d2aebe9ed82bf58e2f17 Mon Sep 17 00:00:00 2001 From: Vinit Chauhan Date: Tue, 23 Jul 2024 10:20:37 -0400 Subject: [PATCH] filebeat/decode_cef - Add option to ignore empty values (#40268) Added option to ignore empty values in the decode_cef processor. In the decode_cef processor, when there are empty values in the extensions section, we get errors during log parsing. This change provides a flag in decode_cef config to override this default behavior and ignore the fields with empty value. Some example errors that this helps handle are: error in field 'cn1': strconv.ParseInt: parsing "": invalid syntax error in field 'destinationTranslatedAddress': value is not a valid IP address Closes #40236 (cherry picked from commit dd671a6bcefbcf3b99a1e3ac863293839fa7a3cb) --- CHANGELOG-developer.next.asciidoc | 1 + CHANGELOG.next.asciidoc | 1 + .../filebeat/processors/decode_cef/cef/cef.go | 7 +++++ .../processors/decode_cef/cef/cef_test.go | 19 +++++++++++++ .../processors/decode_cef/cef/option.go | 14 +++++++++- .../filebeat/processors/decode_cef/config.go | 15 ++++++----- .../processors/decode_cef/decode_cef.go | 12 +++++---- .../processors/decode_cef/decode_cef_test.go | 27 +++++++++++++++++++ .../decode_cef/docs/decode_cef.asciidoc | 19 ++++++------- 9 files changed, 93 insertions(+), 22 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 92a42ea40fe..a2f11330782 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -196,6 +196,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Improve robustness and error reporting from packetbeat default route testing. {pull}39757[39757] - Move x-pack/filebeat/input/salesforce jwt import to v5. {pull}39823[39823] - Drop x-pack/filebeat/input dependency on github.com/lestrrat-go/jwx/v2. {pull}39968[39968] +- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268] ==== Deprecated diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1eb6aac1add..fb16468bd62 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix high IO and handling of a corrupted registry log file. {pull}35893[35893] - Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075] - Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258] +- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268] *Heartbeat* diff --git a/x-pack/filebeat/processors/decode_cef/cef/cef.go b/x-pack/filebeat/processors/decode_cef/cef/cef.go index d9f65ecf658..296e0e56e9e 100644 --- a/x-pack/filebeat/processors/decode_cef/cef/cef.go +++ b/x-pack/filebeat/processors/decode_cef/cef/cef.go @@ -152,6 +152,13 @@ func (e *Event) Unpack(data string, opts ...Option) error { // Mark the data type and do the actual conversion. field.Type = mapping.Type + + if settings.removeEmptyValues && field.String == "" { + // Drop the key because the value is empty field. + delete(e.Extensions, key) + continue + } + field.Interface, err = toType(field.String, mapping.Type, &settings) if err != nil { // Drop the key because the field value is invalid. diff --git a/x-pack/filebeat/processors/decode_cef/cef/cef_test.go b/x-pack/filebeat/processors/decode_cef/cef/cef_test.go index cc4a4ed786c..ec593ddfa53 100644 --- a/x-pack/filebeat/processors/decode_cef/cef/cef_test.go +++ b/x-pack/filebeat/processors/decode_cef/cef/cef_test.go @@ -58,6 +58,8 @@ const ( truncatedHeader = "CEF:0|SentinelOne|Mgmt|activityID=1111111111111111111 activityType=3505 siteId=None siteName=None accountId=1222222222222222222 accountName=foo-bar mdr notificationScope=ACCOUNT" + noValueInExtension = `CEF:26|security|threat=manager|1.0|100|trojan successfully stopped|10|src= dst=12.121.122.82 spt=` + // Found by fuzzing but minimised by hand. fuzz0 = `CEF:0|a=\\ b|` fuzz1 = `CEF:0|\|a=|b=` @@ -84,6 +86,7 @@ var testMessages = []string{ tabMessage, escapedMessage, truncatedHeader, + noValueInExtension, fuzz0, fuzz1, fuzz2, @@ -161,6 +164,22 @@ func TestEventUnpack(t *testing.T) { }, e.Extensions) }) + t.Run("noValueInExtension", func(t *testing.T) { + var e Event + err := e.Unpack(noValueInExtension, WithRemoveEmptyValues()) + assert.NoError(t, err) + assert.Equal(t, 26, e.Version) + assert.Equal(t, "security", e.DeviceVendor) + assert.Equal(t, "threat=manager", e.DeviceProduct) + assert.Equal(t, "1.0", e.DeviceVersion) + assert.Equal(t, "100", e.DeviceEventClassID) + assert.Equal(t, "trojan successfully stopped", e.Name) + assert.Equal(t, "10", e.Severity) + assert.Equal(t, map[string]*Field{ + "dst": IPField("12.121.122.82"), + }, e.Extensions) + }) + t.Run("equalsSignInHeader", func(t *testing.T) { var e Event err := e.Unpack(equalsSignInHeader) diff --git a/x-pack/filebeat/processors/decode_cef/cef/option.go b/x-pack/filebeat/processors/decode_cef/cef/option.go index 2a2e26dcb88..8923947c4a6 100644 --- a/x-pack/filebeat/processors/decode_cef/cef/option.go +++ b/x-pack/filebeat/processors/decode_cef/cef/option.go @@ -16,8 +16,20 @@ type Option interface { // Settings for unpacking messages. type Settings struct { fullExtensionNames bool + removeEmptyValues bool + timezone *time.Location +} - timezone *time.Location +type withRemoveEmptyValues struct{} + +func (w withRemoveEmptyValues) Apply(s *Settings) { + s.removeEmptyValues = true +} + +// WithRemoveEmptyValues causes CEF extension keys without values to be +// dropped. +func WithRemoveEmptyValues() Option { + return withRemoveEmptyValues{} } type withFullExtensionNames struct{} diff --git a/x-pack/filebeat/processors/decode_cef/config.go b/x-pack/filebeat/processors/decode_cef/config.go index 00139c400e4..fe23ca12904 100644 --- a/x-pack/filebeat/processors/decode_cef/config.go +++ b/x-pack/filebeat/processors/decode_cef/config.go @@ -7,13 +7,14 @@ package decode_cef import "github.com/elastic/beats/v7/libbeat/common/cfgtype" type config struct { - Field string `config:"field"` // Source field containing the CEF message. - TargetField string `config:"target_field"` // Target field for the CEF object. - IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field. - IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message. - ID string `config:"id"` // Instance ID for debugging purposes. - ECS bool `config:"ecs"` // Generate ECS fields. - Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset. + Field string `config:"field"` // Source field containing the CEF message. + TargetField string `config:"target_field"` // Target field for the CEF object. + IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field. + IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message. + IgnoreEmptyValues bool `config:"ignore_empty_values"` // Ignore CEF extensions with empty values + ID string `config:"id"` // Instance ID for debugging purposes. + ECS bool `config:"ecs"` // Generate ECS fields. + Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset. } func defaultConfig() config { diff --git a/x-pack/filebeat/processors/decode_cef/decode_cef.go b/x-pack/filebeat/processors/decode_cef/decode_cef.go index 2e42f846eae..f07f59a44cb 100644 --- a/x-pack/filebeat/processors/decode_cef/decode_cef.go +++ b/x-pack/filebeat/processors/decode_cef/decode_cef.go @@ -88,7 +88,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { // If the version < 0 after parsing then none of the data is valid so return here. var ce cef.Event - if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location())); ce.Version < 0 && err != nil { + if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location()), cef.WithRemoveEmptyValues()); ce.Version < 0 && err != nil { if p.IgnoreFailure { return event, nil } @@ -141,7 +141,6 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } -//nolint:errcheck // All errors are from mapstr puts. func toCEFObject(cefEvent *cef.Event) mapstr.M { // Add CEF header fields. cefObject := mapstr.M{"version": strconv.Itoa(cefEvent.Version)} @@ -180,32 +179,35 @@ func toCEFObject(cefEvent *cef.Event) mapstr.M { return cefObject } -//nolint:errcheck // All errors are from mapstr puts. func writeCEFHeaderToECS(cefEvent *cef.Event, event *beat.Event) { if cefEvent.DeviceVendor != "" { + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("observer.vendor", cefEvent.DeviceVendor) } if cefEvent.DeviceProduct != "" { - // TODO: observer.product is not officially part of ECS. + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("observer.product", cefEvent.DeviceProduct) } if cefEvent.DeviceVersion != "" { + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("observer.version", cefEvent.DeviceVersion) } if cefEvent.DeviceEventClassID != "" { + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("event.code", cefEvent.DeviceEventClassID) } if cefEvent.Name != "" { + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("message", cefEvent.Name) } if cefEvent.Severity != "" { if sev, ok := cefSeverityToNumber(cefEvent.Severity); ok { + //nolint:errcheck // All errors are from mapstr puts. event.PutValue("event.severity", sev) } } } -//nolint:errcheck // All errors are from mapstr puts. func appendErrorMessage(m mapstr.M, msg string) error { const field = "error.message" list, _ := m.GetValue(field) diff --git a/x-pack/filebeat/processors/decode_cef/decode_cef_test.go b/x-pack/filebeat/processors/decode_cef/decode_cef_test.go index e5db6718291..e9bade08de5 100644 --- a/x-pack/filebeat/processors/decode_cef/decode_cef_test.go +++ b/x-pack/filebeat/processors/decode_cef/decode_cef_test.go @@ -59,6 +59,33 @@ func TestProcessorRun(t *testing.T) { "source.user.name": "admin", }, }, + "empty_field_values": { + config: func() config { + c := defaultConfig() + c.TargetField = "" + c.IgnoreEmptyValues = true + return c + }, + message: "CEF:1|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|3|src= suser= target=admin msg=User signed in from 2001:db8::5", + fields: mapstr.M{ + "version": "1", + "device.event_class_id": "600", + "device.product": "Deep Security Manager", + "device.vendor": "Trend Micro", + "device.version": "1.2.3", + "name": "User Signed In", + "severity": "3", + "event.severity": 3, + "extensions.message": "User signed in from 2001:db8::5", + "extensions.target": "admin", + // ECS + "event.code": "600", + "message": "User signed in from 2001:db8::5", + "observer.product": "Deep Security Manager", + "observer.vendor": "Trend Micro", + "observer.version": "1.2.3", + }, + }, "parse_errors": { message: "CEF:0|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|Low|msg=User signed in with =xyz", fields: mapstr.M{ diff --git a/x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc b/x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc index 859d24c49c2..c80bc38d1c3 100644 --- a/x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc +++ b/x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc @@ -37,13 +37,14 @@ The `decode_cef` processor has the following configuration settings. .Decode CEF options [options="header"] |====== -| Name | Required | Default | Description | -| `field` | no | message | Source field containing the CEF message to be parsed. | -| `target_field` | no | cef | Target field where the parsed CEF object will be written. | -| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data. - Certain CEF header and extension values will be used to populate ECS fields. | -| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.| -| `ignore_missing` | no | false | Ignore errors when the source field is missing. | -| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. | -| `id` | no | | An identifier for this processor instance. Useful for debugging. | +| Name | Required | Default | Description | +| `field` | no | message | Source field containing the CEF message to be parsed. | +| `target_field` | no | cef | Target field where the parsed CEF object will be written. | +| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data. + Certain CEF header and extension values will be used to populate ECS fields. | +| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.| +| `ignore_missing` | no | false | Ignore errors when the source field is missing. | +| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. | +| `ignore_empty_values` | no | false | Ignore CEF extensions with empty values (e.g. `spt= type=1`) | +| `id` | no | | An identifier for this processor instance. Useful for debugging. | |======