Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement][filebeat/decode_cef] Add option to ignore missing values #40268

Merged
merged 10 commits into from
Jul 23, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Improve robustness and error reporting from packetbeat default route testing. {pull}39757[39757]
- Move x-pack/filebeat/input/salesforce jwt import to v5. {pull}39823[39823]
- Drop x-pack/filebeat/input dependency on github.com/lestrrat-go/jwx/v2. {pull}39968[39968]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]

==== Deprecated

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]

*Heartbeat*

Expand Down
7 changes: 7 additions & 0 deletions x-pack/filebeat/processors/decode_cef/cef/cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ func (e *Event) Unpack(data string, opts ...Option) error {

// Mark the data type and do the actual conversion.
field.Type = mapping.Type

if settings.removeEmptyValues && field.String == "" {
// Drop the key because the value is empty field.
delete(e.Extensions, key)
continue
}

field.Interface, err = toType(field.String, mapping.Type, &settings)
if err != nil {
// Drop the key because the field value is invalid.
Expand Down
19 changes: 19 additions & 0 deletions x-pack/filebeat/processors/decode_cef/cef/cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const (

truncatedHeader = "CEF:0|SentinelOne|Mgmt|activityID=1111111111111111111 activityType=3505 siteId=None siteName=None accountId=1222222222222222222 accountName=foo-bar mdr notificationScope=ACCOUNT"

noValueInExtension = `CEF:26|security|threat=manager|1.0|100|trojan successfully stopped|10|src= dst=12.121.122.82 spt=`

// Found by fuzzing but minimised by hand.
fuzz0 = `CEF:0|a=\\ b|`
fuzz1 = `CEF:0|\|a=|b=`
Expand All @@ -84,6 +86,7 @@ var testMessages = []string{
tabMessage,
escapedMessage,
truncatedHeader,
noValueInExtension,
fuzz0,
fuzz1,
fuzz2,
Expand Down Expand Up @@ -161,6 +164,22 @@ func TestEventUnpack(t *testing.T) {
}, e.Extensions)
})

t.Run("noValueInExtension", func(t *testing.T) {
var e Event
err := e.Unpack(noValueInExtension, WithRemoveEmptyValues())
assert.NoError(t, err)
assert.Equal(t, 26, e.Version)
assert.Equal(t, "security", e.DeviceVendor)
assert.Equal(t, "threat=manager", e.DeviceProduct)
assert.Equal(t, "1.0", e.DeviceVersion)
assert.Equal(t, "100", e.DeviceEventClassID)
assert.Equal(t, "trojan successfully stopped", e.Name)
assert.Equal(t, "10", e.Severity)
assert.Equal(t, map[string]*Field{
"dst": IPField("12.121.122.82"),
}, e.Extensions)
})

t.Run("equalsSignInHeader", func(t *testing.T) {
var e Event
err := e.Unpack(equalsSignInHeader)
Expand Down
14 changes: 13 additions & 1 deletion x-pack/filebeat/processors/decode_cef/cef/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,20 @@ type Option interface {
// Settings for unpacking messages.
type Settings struct {
fullExtensionNames bool
removeEmptyValues bool
timezone *time.Location
}

timezone *time.Location
type withRemoveEmptyValues struct{}

func (w withRemoveEmptyValues) Apply(s *Settings) {
s.removeEmptyValues = true
}

// WithRemoveEmptyValues causes CEF extension keys without values to be
// dropped.
func WithRemoveEmptyValues() Option {
return withRemoveEmptyValues{}
}

type withFullExtensionNames struct{}
Expand Down
15 changes: 8 additions & 7 deletions x-pack/filebeat/processors/decode_cef/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package decode_cef
import "github.com/elastic/beats/v7/libbeat/common/cfgtype"

type config struct {
Field string `config:"field"` // Source field containing the CEF message.
TargetField string `config:"target_field"` // Target field for the CEF object.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message.
ID string `config:"id"` // Instance ID for debugging purposes.
ECS bool `config:"ecs"` // Generate ECS fields.
Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset.
Field string `config:"field"` // Source field containing the CEF message.
TargetField string `config:"target_field"` // Target field for the CEF object.
IgnoreMissing bool `config:"ignore_missing"` // Ignore missing source field.
IgnoreFailure bool `config:"ignore_failure"` // Ignore failures when the source field does not contain a CEF message. Parse errors do not cause failures, but are added to error.message.
IgnoreEmptyValues bool `config:"ignore_empty_values"` // Ignore CEF extensions with empty values
ID string `config:"id"` // Instance ID for debugging purposes.
ECS bool `config:"ecs"` // Generate ECS fields.
Timezone *cfgtype.Timezone `config:"timezone"` // Timezone used when parsing timestamps that do not contain a time zone or offset.
}

func defaultConfig() config {
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/processors/decode_cef/decode_cef.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {

// If the version < 0 after parsing then none of the data is valid so return here.
var ce cef.Event
if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location())); ce.Version < 0 && err != nil {
if err = ce.Unpack(cefData, cef.WithFullExtensionNames(), cef.WithTimezone(p.Timezone.Location()), cef.WithRemoveEmptyValues()); ce.Version < 0 && err != nil {
if p.IgnoreFailure {
return event, nil
}
Expand Down Expand Up @@ -141,7 +141,6 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

//nolint:errcheck // All errors are from mapstr puts.
func toCEFObject(cefEvent *cef.Event) mapstr.M {
// Add CEF header fields.
cefObject := mapstr.M{"version": strconv.Itoa(cefEvent.Version)}
Expand Down Expand Up @@ -180,32 +179,35 @@ func toCEFObject(cefEvent *cef.Event) mapstr.M {
return cefObject
}

//nolint:errcheck // All errors are from mapstr puts.
func writeCEFHeaderToECS(cefEvent *cef.Event, event *beat.Event) {
if cefEvent.DeviceVendor != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.vendor", cefEvent.DeviceVendor)
}
if cefEvent.DeviceProduct != "" {
// TODO: observer.product is not officially part of ECS.
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.product", cefEvent.DeviceProduct)
}
if cefEvent.DeviceVersion != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("observer.version", cefEvent.DeviceVersion)
}
if cefEvent.DeviceEventClassID != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("event.code", cefEvent.DeviceEventClassID)
}
if cefEvent.Name != "" {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("message", cefEvent.Name)
}
if cefEvent.Severity != "" {
if sev, ok := cefSeverityToNumber(cefEvent.Severity); ok {
//nolint:errcheck // All errors are from mapstr puts.
event.PutValue("event.severity", sev)
}
}
}

//nolint:errcheck // All errors are from mapstr puts.
func appendErrorMessage(m mapstr.M, msg string) error {
const field = "error.message"
list, _ := m.GetValue(field)
Expand Down
27 changes: 27 additions & 0 deletions x-pack/filebeat/processors/decode_cef/decode_cef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ func TestProcessorRun(t *testing.T) {
"source.user.name": "admin",
},
},
"empty_field_values": {
config: func() config {
c := defaultConfig()
c.TargetField = ""
c.IgnoreEmptyValues = true
return c
},
message: "CEF:1|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|3|src= suser= target=admin msg=User signed in from 2001:db8::5",
fields: mapstr.M{
"version": "1",
"device.event_class_id": "600",
"device.product": "Deep Security Manager",
"device.vendor": "Trend Micro",
"device.version": "1.2.3",
"name": "User Signed In",
"severity": "3",
"event.severity": 3,
"extensions.message": "User signed in from 2001:db8::5",
"extensions.target": "admin",
// ECS
"event.code": "600",
"message": "User signed in from 2001:db8::5",
"observer.product": "Deep Security Manager",
"observer.vendor": "Trend Micro",
"observer.version": "1.2.3",
},
},
"parse_errors": {
message: "CEF:0|Trend Micro|Deep Security Manager|1.2.3|600|User Signed In|Low|msg=User signed in with =xyz",
fields: mapstr.M{
Expand Down
19 changes: 10 additions & 9 deletions x-pack/filebeat/processors/decode_cef/docs/decode_cef.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ The `decode_cef` processor has the following configuration settings.
.Decode CEF options
[options="header"]
|======
| Name | Required | Default | Description |
| `field` | no | message | Source field containing the CEF message to be parsed. |
| `target_field` | no | cef | Target field where the parsed CEF object will be written. |
| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data.
Certain CEF header and extension values will be used to populate ECS fields. |
| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.|
| `ignore_missing` | no | false | Ignore errors when the source field is missing. |
| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. |
| `id` | no | | An identifier for this processor instance. Useful for debugging. |
| Name | Required | Default | Description |
| `field` | no | message | Source field containing the CEF message to be parsed. |
| `target_field` | no | cef | Target field where the parsed CEF object will be written. |
| `ecs` | no | true | Generate Elastic Common Schema (ECS) fields from the CEF data.
Certain CEF header and extension values will be used to populate ECS fields. |
| `timezone` | no | UTC | IANA time zone name (e.g. `America/New_York`) or fixed time offset (e.g. `+0200`) to use when parsing times that do not contain a time zone. `Local` may be specified to use the machine's local time zone.|
| `ignore_missing` | no | false | Ignore errors when the source field is missing. |
| `ignore_failure` | no | false | Ignore failures when the source field does not contain a CEF message. |
| `ignore_empty_values` | no | false | Ignore CEF extensions with empty values (e.g. `spt= type=1`) |
| `id` | no | | An identifier for this processor instance. Useful for debugging. |
|======
Loading