From 09ca0f4cf67a314366d2023525858c1b8e8efa65 Mon Sep 17 00:00:00 2001
From: Monica Sarbu
Date: Mon, 7 Mar 2016 22:36:53 +0100
Subject: [PATCH] Add support for include_fields and drop_fields

include_fields - removes all the fields from the event except the
configured fields
drop_fields - removes only the configured fields

Both actions take a list of fields as argument. Each entry can name a
map or a single value, for example: "proc", "proc.cpu.total_p". In case
a map is passed as an argument, all the fields inside the map are
considered.

Each event MUST contain the fields ["@timestamp", "type"], and they
cannot be removed from the event.

When multiple filtering rules are defined, we start with a copy of the
event and apply the filtering rules one by one to the event:

event -> filter rule 1 -> event 1 -> filter rule 2 -> event 2 ...

where event is the initial event, event 1 is the event that results
from applying "filter rule 1" and is in turn the input of
"filter rule 2", and so on.

Problem encountered: the generic filtering expects to receive an event
with known types, meaning that it contains MapStr and other primitive
types, not arbitrary structures. In case the event contains a structure
that is unknown to libbeat (it is defined inside the Beat), we do a
marshal & unmarshal to convert it to a MapStr.

Configuration:
If include_fields=None or not defined, then all the fields are exported
If include_fields=[], then only the mandatory fields are exported
If drop_fields=[], then all the fields are exported

The default values are:
include_fields=None
drop_fields=[]
---
 filebeat/tests/system/config/filebeat.yml.j2 |  14 +
 filebeat/tests/system/test_filtering.py      |  53 ++++
 libbeat/beat/beat.go                         |  17 +-
 libbeat/common/event.go                      |  86 ++++++
 libbeat/common/event_test.go                 | 123 +++++++++
 libbeat/common/mapstr.go                     | 105 ++++++++
 libbeat/common/mapstr_test.go                | 129 +++++++++
 libbeat/filter/config.go                     |  17 ++
 libbeat/filter/filter.go                     | 165 ++++++++++++
 libbeat/filter/filter_test.go                | 249 ++++++++++++++++++
 libbeat/publisher/client.go                  |  39 ++-
 libbeat/publisher/publish.go                 |  18 +-
 libbeat/publisher/publish_test.go            |   9 -
 libbeat/tests/system/beat/beat.py            |  10 +-
 packetbeat/beater/packetbeat.go              |   3 +
 packetbeat/config/config.go                  |   1 -
 .../tests/system/config/packetbeat.yml.j2    |  73 ++---
 packetbeat/tests/system/test_0007_tags.py    |   1 +
 .../tests/system/test_0060_filtering.py      | 124 +++++++++
 topbeat/tests/system/config/topbeat.yml.j2   |  13 +
 topbeat/tests/system/test_filtering.py       | 156 +++++++++++
 21 files changed, 1345 insertions(+), 60 deletions(-)
 create mode 100644 filebeat/tests/system/test_filtering.py
 create mode 100644 libbeat/common/event.go
 create mode 100644 libbeat/common/event_test.go
 create mode 100644 libbeat/filter/config.go
 create mode 100644 libbeat/filter/filter.go
 create mode 100644 libbeat/filter/filter_test.go
 create mode 100644 packetbeat/tests/system/test_0060_filtering.py
 create mode 100644 topbeat/tests/system/test_filtering.py

diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index f0d08701011..fcd5e375104 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -124,4 +124,18 @@ output:
     rotate_every_kb: 1000
     #number_of_files: 7

+{% if filter_enabled %}
+
+filter:
+
+  {%- if drop_fields %}
+  - drop_fields:
+      fields: {{drop_fields}}
+  {%- endif %}
+  {%- if include_fields is not none %}
+  - include_fields:
+      fields: {{include_fields}}
+  {%- endif %}
+{% endif %}
+
 # vim: set ft=jinja:
diff --git a/filebeat/tests/system/test_filtering.py b/filebeat/tests/system/test_filtering.py
new file mode 100644
index 00000000000..f7d6561e7a9
--- /dev/null
+++ b/filebeat/tests/system/test_filtering.py
@@ -0,0 +1,53 @@
+from filebeat import BaseTest
+import os
+
+"""
+Contains tests for filtering.
+"""
+
+
+class Test(BaseTest):
+    def test_dropfields(self):
+        """
+        Check drop_fields filtering action
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/test.log",
+            filter_enabled=True,
+            drop_fields=["beat"],
+            include_fields=None,
+        )
+        with open(self.working_dir + "/test.log", "w") as f:
+            f.write("test message\n")
+
+        filebeat = self.start_beat()
+        self.wait_until(lambda: self.output_has(lines=1))
+        filebeat.check_kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+        assert "beat.name" not in output
+        assert "message" in output
+
+    def test_include_fields(self):
+        """
+        Check include_fields filtering action
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/test.log",
+            filter_enabled=True,
+            include_fields=["source", "offset", "message"]
+        )
+        with open(self.working_dir + "/test.log", "w") as f:
+            f.write("test message\n")
+
+        filebeat = self.start_beat()
+        self.wait_until(lambda: self.output_has(lines=1))
+        filebeat.check_kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+        assert "beat.name" not in output
+        assert "message" in output
diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go
index 66543300d6b..904d270e017 100644
--- a/libbeat/beat/beat.go
+++ b/libbeat/beat/beat.go
@@ -32,6 +32,7 @@ import (
 	"github.com/urso/ucfg"

 	"github.com/elastic/beats/libbeat/cfgfile"
+	"github.com/elastic/beats/libbeat/filter"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/publisher"
 	"github.com/elastic/beats/libbeat/service"
@@ -84,6 +85,7 @@ type BeatConfig struct {
 	Output  map[string]*ucfg.Config
 	Logging logp.Logging
 	Shipper publisher.ShipperConfig
+	Filter  []filter.FilterConfig
 }

 var printVersion *bool
@@ -149,7 +151,7 @@ func (b *Beat) Start() error {
 	// Additional command line args are used to overwrite config options
 	err, exit := b.CommandLineSetup()
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to load the command line setup: %v", err)
 	}

 	if exit {
@@ -159,13 +161,13 @@ func (b *Beat) Start() error {
 	// Loads base config
 	err = b.LoadConfig()
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to load the config: %v", err)
 	}

 	// Configures beat
 	err = b.BT.Config(b)
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to load the beat config: %v", err)
 	}

 	b.setState(ConfigState)
@@ -229,13 +231,20 @@ func (b *Beat) LoadConfig() error {

 	pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper)
 	if err != nil {
-		return fmt.Errorf("error Initialising publisher: %v\n", err)
+		return fmt.Errorf("error initializing publisher: %v", err)
+	}
+
+	filters, err := filter.New(b.Config.Filter)
+	if err != nil {
+		return fmt.Errorf("error initializing filters: %v", err)
 	}

 	b.Publisher = pub
+	pub.RegisterFilter(filters)
 	b.Events = pub.Client()

 	logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version)
+	logp.Info("Filters: %v", filters)

 	return nil
 }
diff --git a/libbeat/common/event.go b/libbeat/common/event.go
new file mode 100644
index 00000000000..4d85aa0ab16
--- /dev/null
+++ b/libbeat/common/event.go
@@ -0,0 +1,86 @@
+package common
+
+import (
+	"encoding/json"
+	"reflect"
+	"time"
+
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+func MarshallUnmarshall(v interface{}) (MapStr, error) {
+	// decode and encode JSON
+	marshaled, err := json.Marshal(v)
+	if err != nil {
+		logp.Warn("marshal err: %v", err)
+		return nil, err
+	}
+	var v1 MapStr
+	err = json.Unmarshal(marshaled, &v1)
+	if err != nil {
+		logp.Warn("unmarshal err: %v", err)
+		return nil, err
+	}
+
+	return v1, nil
+}
+
+func ConvertToGenericEvent(v MapStr) MapStr {
+
+	for key, value := range v {
+
+		switch value.(type) {
+		case Time, *Time:
+			continue
+		case time.Location, *time.Location:
+			continue
+		case MapStr:
+			v[key] = ConvertToGenericEvent(value.(MapStr))
+			continue
+		case *MapStr:
+			v[key] = ConvertToGenericEvent(*value.(*MapStr))
+			continue
+		default:
+
+			typ := reflect.TypeOf(value)
+
+			if typ.Kind() == reflect.Ptr {
+				typ = typ.Elem()
+			}
+
+			switch typ.Kind() {
+			case reflect.Bool:
+			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+			case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
+			case reflect.Uintptr:
+			case reflect.Float32, reflect.Float64:
+			case reflect.Complex64, reflect.Complex128:
+			case reflect.String:
+			case reflect.UnsafePointer:
+			case reflect.Array, reflect.Slice:
+			//case reflect.Chan:
+			//case reflect.Func:
+			//case reflect.Interface:
+			case reflect.Map:
+				anothermap, err := MarshallUnmarshall(value)
+				if err != nil {
+					logp.Warn("failed to marshal & unmarshal map %v", key)
+					continue
+				}
+				v[key] = anothermap
+
+			case reflect.Struct:
+				anothermap, err := MarshallUnmarshall(value)
+				if err != nil {
+					logp.Warn("failed to marshal & unmarshal struct %v", key)
+					continue
+				}
+				v[key] = anothermap
+			default:
+				logp.Warn("unknown type %v", typ)
+				continue
+			}
+		}
+	}
+	return v
+}
diff --git a/libbeat/common/event_test.go b/libbeat/common/event_test.go
new file mode 100644
index 00000000000..b8bc2dd9d2b
--- /dev/null
+++ b/libbeat/common/event_test.go
@@ -0,0 +1,123 @@
+package common
+
+import (
+	"testing"
+
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestConvertNestedMapStr(t *testing.T) {
+	logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
+
+	type io struct {
+		Input  MapStr
+		Output MapStr
+	}
+
+	type String string
+
+	tests := []io{
+		io{
+			Input: MapStr{
+				"key": MapStr{
+					"key1": "value1",
+				},
+			},
+			Output: MapStr{
+				"key": MapStr{
+					"key1": "value1",
+				},
+			},
+		},
+		io{
+			Input: MapStr{
+				"key": MapStr{
+					"key1": String("value1"),
+				},
+			},
+			Output: MapStr{
+				"key": MapStr{
+					"key1": String("value1"),
+				},
+			},
+		},
+		io{
+			Input: MapStr{
+				"key": MapStr{
+					"key1": []string{"value1", "value2"},
+				},
+			},
+			Output: MapStr{
+				"key": MapStr{
+					"key1": []string{"value1", "value2"},
+				},
+			},
+		},
+		io{
+			Input: MapStr{
+				"key": MapStr{
+					"key1": []String{"value1", "value2"},
+				},
+			},
+			Output: MapStr{
+				"key": MapStr{
+					"key1": []String{"value1", "value2"},
+				},
+			},
+		},
+		io{
+			Input: MapStr{
+				"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
+			},
+			Output: MapStr{
+				"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
+			},
+		},
+	}
+
+	for _, test := range tests {
+		assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input))
+	}
+
+}
+
+func TestConvertNestedStruct(t *testing.T) {
+	logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
+
+	type io struct {
+		Input  MapStr
+		Output MapStr
+	}
+
+	type TestStruct struct {
+		A string
+		B int
+	}
+
+	tests := []io{
+		io{
+			Input: MapStr{
+				"key": MapStr{
+					"key1": TestStruct{
+						A: 
"hello", + B: 5, + }, + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": MapStr{ + "A": "hello", + "B": float64(5), + }, + }, + }, + }, + } + + for _, test := range tests { + assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input)) + } + +} diff --git a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index e32a6473d81..d934fb34225 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" ) @@ -57,6 +58,110 @@ func (m MapStr) Update(d MapStr) { } } +func (m MapStr) Delete(key string) error { + keyParts := strings.Split(key, ".") + keysLen := len(keyParts) + + mapp := m + for i := 0; i < keysLen-1; i++ { + keyPart := keyParts[i] + + if _, ok := mapp[keyPart]; ok { + mapp, ok = mapp[keyPart].(MapStr) + if !ok { + return fmt.Errorf("unexpected type of %s key", keyPart) + } + } else { + return fmt.Errorf("unknown key %s", keyPart) + } + } + delete(mapp, keyParts[keysLen-1]) + return nil +} + +func (m MapStr) CopyFieldsTo(to MapStr, key string) error { + + keyParts := strings.Split(key, ".") + keysLen := len(keyParts) + + toPointer := to + fromPointer := m + + for i := 0; i < keysLen-1; i++ { + keyPart := keyParts[i] + var success bool + + if _, ok := fromPointer[keyPart]; ok { + if _, already := toPointer[keyPart]; !already { + toPointer[keyPart] = MapStr{} + } + + fromPointer, success = fromPointer[keyPart].(MapStr) + if !success { + return fmt.Errorf("Unexpected type of %s key", keyPart) + } + + toPointer, success = toPointer[keyPart].(MapStr) + if !success { + return fmt.Errorf("Unexpected type of %s key", keyPart) + } + } else { + return nil + } + } + + if _, ok := fromPointer[keyParts[keysLen-1]]; ok { + toPointer[keyParts[keysLen-1]] = fromPointer[keyParts[keysLen-1]] + } else { + return nil + } + return nil +} + +func (m MapStr) Clone() MapStr { + result := MapStr{} + + for k, v := range m { + mapstr, ok := v.(MapStr) + if ok { + v = mapstr.Clone() + } + result[k] = v + } + + return result +} + +func (m MapStr) HasKey(key string) (bool, error) { + keyParts := strings.Split(key, ".") + keyPartsLen := len(keyParts) + + mapp := m + for i := 0; i < keyPartsLen; i++ { + keyPart := keyParts[i] + + if _, ok := mapp[keyPart]; ok { + if i < keyPartsLen-1 { + mapp, ok = mapp[keyPart].(MapStr) + if !ok { + return false, fmt.Errorf("Unknown type of %s key", keyPart) + } + } + } else { + return false, nil + } + } + return true, nil +} + +func (m MapStr) StringToPrint() string { + json, err := json.MarshalIndent(m, "", " ") + if err != nil { + return fmt.Sprintf("Not valid json: %v", err) + } + return string(json) +} + // Checks if a timestamp field exists and if it doesn't it adds // one by using the injected now() function as a time source. 
func (m MapStr) EnsureTimestampField(now func() time.Time) error { diff --git a/libbeat/common/mapstr_test.go b/libbeat/common/mapstr_test.go index 57b0f5ae3eb..65f5fc897f7 100644 --- a/libbeat/common/mapstr_test.go +++ b/libbeat/common/mapstr_test.go @@ -43,6 +43,126 @@ func TestMapStrUnion(t *testing.T) { assert.Equal(c, MapStr{"a": 1, "b": 3, "c": 4}) } +func TestMapStrCopyFieldsTo(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "a": MapStr{ + "a1": 2, + "a2": 3, + }, + "b": 2, + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + c := MapStr{} + + err := m.CopyFieldsTo(c, "dd") + assert.Equal(nil, err) + assert.Equal(MapStr{}, c) + + err = m.CopyFieldsTo(c, "a") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}}, c) + + err = m.CopyFieldsTo(c, "c.c1") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1}}, c) + + err = m.CopyFieldsTo(c, "b") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1}, "b": 2}, c) + + err = m.CopyFieldsTo(c, "c.c3.c32") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1, "c3": MapStr{"c32": 2}}, "b": 2}, c) +} + +func TestMapStrDelete(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + + err := m.Delete("c.c2") + assert.Equal(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c31": 1, "c32": 2}}}, m) + + err = m.Delete("c.c2.c21") + assert.NotEqual(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c31": 1, "c32": 2}}}, m) + + err = m.Delete("c.c3.c31") + assert.Equal(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c32": 2}}}, m) + + err = m.Delete("c") + assert.Equal(nil, err) + assert.Equal(MapStr{}, m) +} + +func TestHasKey(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + + hasKey, err := m.HasKey("c.c2") + assert.Equal(nil, err) + assert.Equal(true, hasKey) + + hasKey, err = m.HasKey("c.c4") + assert.Equal(nil, err) + assert.Equal(false, hasKey) + + hasKey, err = m.HasKey("c.c3.c32") + assert.Equal(nil, err) + assert.Equal(true, hasKey) + + hasKey, err = m.HasKey("dd") + assert.Equal(nil, err) + assert.Equal(false, hasKey) + +} + +func TestClone(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + } + + c := m.Clone() + assert.Equal(MapStr{"c31": 1, "c32": 2}, c["c3"]) +} + func TestEnsureTimestampField(t *testing.T) { type io struct { @@ -188,6 +308,15 @@ func TestString(t *testing.T) { } } +// Smoke test. The method has no observable outputs so this +// is only verifying there are no panics. 
+func TestStringToPrint(t *testing.T) { + m := MapStr{} + + assert.Equal(t, "{}", m.StringToPrint()) + assert.Equal(t, true, len(m.StringToPrint()) > 0) +} + func TestMergeFields(t *testing.T) { type io struct { UnderRoot bool diff --git a/libbeat/filter/config.go b/libbeat/filter/config.go new file mode 100644 index 00000000000..db3937175f4 --- /dev/null +++ b/libbeat/filter/config.go @@ -0,0 +1,17 @@ +package filter + +type DropFieldsConfig struct { + Fields []string `config:"fields"` +} + +type IncludeFieldsConfig struct { + Fields []string `config:"fields"` +} + +type FilterConfig struct { + DropFields *DropFieldsConfig `config:"drop_fields"` + IncludeFields *IncludeFieldsConfig `config:"include_fields"` +} + +// fields that should be always exported +var MandatoryExportedFields = []string{"@timestamp", "type"} diff --git a/libbeat/filter/filter.go b/libbeat/filter/filter.go new file mode 100644 index 00000000000..7911526e71c --- /dev/null +++ b/libbeat/filter/filter.go @@ -0,0 +1,165 @@ +package filter + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type FilterCondition struct { +} + +type FilterRule interface { + Filter(event common.MapStr) (common.MapStr, error) + String() string +} + +/* extends FilterRule */ +type IncludeFields struct { + Fields []string + // condition +} + +/* extend FilterRule */ +type DropFields struct { + Fields []string + // condition +} + +type FilterList struct { + filters []FilterRule +} + +/* FilterList methods */ +func New(config []FilterConfig) (*FilterList, error) { + + Filters := &FilterList{} + Filters.filters = []FilterRule{} + + logp.Debug("filter", "configuration %v", config) + for _, filterConfig := range config { + if filterConfig.DropFields != nil { + Filters.Register(NewDropFields(filterConfig.DropFields.Fields)) + } + + if filterConfig.IncludeFields != nil { + Filters.Register(NewIncludeFields(filterConfig.IncludeFields.Fields)) + } + } + + logp.Debug("filter", "filters: %v", Filters) + return Filters, nil +} + +func (filters *FilterList) Register(filter FilterRule) { + filters.filters = append(filters.filters, filter) + logp.Debug("filter", "Register filter: %v", filter) +} + +func (filters *FilterList) Get(index int) FilterRule { + return filters.filters[index] +} + +// Applies a sequence of filtering rules and returns the filtered event +func (filters *FilterList) Filter(event common.MapStr) common.MapStr { + + // clone the event at first, before starting filtering + filtered := event.Clone() + var err error + + for _, filter := range filters.filters { + filtered, err = filter.Filter(filtered) + if err != nil { + logp.Err("fail to apply filtering rule %s: %s", filter, err) + } + } + + return filtered +} + +func (filters *FilterList) String() string { + s := []string{} + + for _, filter := range filters.filters { + + s = append(s, filter.String()) + } + return strings.Join(s, ", ") +} + +/* IncludeFields methods */ +func NewIncludeFields(fields []string) *IncludeFields { + + /* add read only fields if they are not yet */ + for _, readOnly := range MandatoryExportedFields { + found := false + for _, field := range fields { + if readOnly == field { + found = true + } + } + if !found { + fields = append(fields, readOnly) + } + } + + return &IncludeFields{Fields: fields} +} + +func (f *IncludeFields) Filter(event common.MapStr) (common.MapStr, error) { + + filtered := common.MapStr{} + + for _, field := range f.Fields { + hasKey, err := event.HasKey(field) + if err != 
nil {
+			return filtered, fmt.Errorf("failed to check the key %s: %v", field, err)
+		}
+
+		if hasKey {
+			errorOnCopy := event.CopyFieldsTo(filtered, field)
+			if errorOnCopy != nil {
+				return filtered, fmt.Errorf("failed to copy key %s: %v", field, errorOnCopy)
+			}
+		}
+	}
+
+	return filtered, nil
+}
+
+func (f *IncludeFields) String() string {
+	return "include_fields=" + strings.Join(f.Fields, ", ")
+}
+
+/* DropFields methods */
+func NewDropFields(fields []string) *DropFields {
+
+	/* remove the read only fields from the drop list; they are always
+	   exported and must never be deleted from an event */
+	kept := fields[:0]
+	for _, field := range fields {
+		mandatory := false
+		for _, readOnly := range MandatoryExportedFields {
+			if readOnly == field {
+				mandatory = true
+				break
+			}
+		}
+		if !mandatory {
+			kept = append(kept, field)
+		}
+	}
+	return &DropFields{Fields: kept}
+}
+
+func (f *DropFields) Filter(event common.MapStr) (common.MapStr, error) {
+
+	for _, field := range f.Fields {
+		err := event.Delete(field)
+		if err != nil {
+			return event, fmt.Errorf("failed to delete key %s: %v", field, err)
+		}
+	}
+	return event, nil
+}
+
+func (f *DropFields) String() string {
+	return "drop_fields=" + strings.Join(f.Fields, ", ")
+}
diff --git a/libbeat/filter/filter_test.go b/libbeat/filter/filter_test.go
new file mode 100644
index 00000000000..40c35521daf
--- /dev/null
+++ b/libbeat/filter/filter_test.go
@@ -0,0 +1,249 @@
+package filter
+
+import (
+	"testing"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestIncludeFields(t *testing.T) {
+
+	if testing.Verbose() {
+		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
+	}
+
+	Filters := FilterList{}
+
+	Filters.Register(NewIncludeFields([]string{"proc.cpu.total_p", "proc.mem", "dd"}))
+
+	event := common.MapStr{
+		"@timestamp": "2016-01-24T18:35:19.308Z",
+		"beat": common.MapStr{
+			"hostname": "mar",
+			"name":     "my-shipper-1",
+		},
+		"count": 1,
+		"proc": common.MapStr{
+			"cpu": common.MapStr{
+				"start_time": "Jan14",
+				"system":     26027,
+				"total":      79390,
+				"total_p":    0,
+				"user":       53363,
+			},
+			"cmdline": "/sbin/launchd",
+			"mem": common.MapStr{
+				"rss":   11194368,
+				"rss_p": 0,
+				"share": 0,
+				"size":  2555572224,
+			},
+		},
+		"type": "process",
+	}
+
+	filteredEvent := Filters.Filter(event)
+
+	expectedEvent := common.MapStr{
+		"@timestamp": "2016-01-24T18:35:19.308Z",
+		"proc": common.MapStr{
+			"cpu": common.MapStr{
+				"total_p": 0,
+			},
+			"mem": common.MapStr{
+				"rss":   11194368,
+				"rss_p": 0,
+				"share": 0,
+				"size":  2555572224,
+			},
+		},
+		"type": "process",
+	}
+
+	assert.Equal(t, expectedEvent, filteredEvent)
+}
+
+func TestIncludeFields1(t *testing.T) {
+
+	if testing.Verbose() {
+		logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})
+	}
+
+	Filters := FilterList{}
+
+	Filters.Register(NewIncludeFields([]string{"proc.cpu.total_ddd"}))
+
+	event := common.MapStr{
+		"@timestamp": "2016-01-24T18:35:19.308Z",
+		"beat": common.MapStr{
+			"hostname": "mar",
+			"name":     "my-shipper-1",
+		},
+		"count": 1,
+
+		"proc": common.MapStr{
+			"cpu": common.MapStr{
+				"start_time": "Jan14",
+				"system":     26027,
+				"total":      79390,
+				"total_p":    0,
+				"user":       53363,
+			},
+			"cmdline": "/sbin/launchd",
+			"mem": common.MapStr{
+				"rss":   11194368,
+				"rss_p": 0,
+				"share": 0,
+				"size":  2555572224,
+			},
+		},
+		"type": "process",
+	}
+
+	filteredEvent := Filters.Filter(event)
+
+	expectedEvent := common.MapStr{
+		"@timestamp": "2016-01-24T18:35:19.308Z",
+		"type":       "process",
+	}
+
+	assert.Equal(t, expectedEvent, filteredEvent)
+}
+
+func TestDropFields(t *testing.T) {
+
+	Filters := FilterList{}
+
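+	// "dd" is not present in the event (deleting an unknown top-level
+	// key is a no-op), and NewDropFields never drops the mandatory
+	// "@timestamp" and "type" keys.
+	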
Filters.Register(NewDropFields([]string{"proc.cpu.start_time", "mem", "proc.cmdline", "beat", "dd"})) + + event := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + }, + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + "type": "process", + } + + filteredEvent := Filters.Filter(event) + + expectedEvent := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "count": 1, + "proc": common.MapStr{ + "cpu": common.MapStr{ + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + }, + "type": "process", + } + + assert.Equal(t, expectedEvent, filteredEvent) +} + +func TestMultipleIncludeFields(t *testing.T) { + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + Filters := FilterList{} + + Filters.Register(NewIncludeFields([]string{"proc"})) + Filters.Register(NewIncludeFields([]string{"proc.cpu.start_time", "proc.cpu.total_p", + "proc.mem.rss_p", "proc.cmdline"})) + + event1 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + }, + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + "type": "process", + } + + event2 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + "fs": common.MapStr{ + "device_name": "devfs", + "total": 198656, + "used": 198656, + "used_p": 1, + "free": 0, + "avail": 0, + "files": 677, + "free_files": 0, + "mount_point": "/dev", + }, + "type": "process", + } + + expected1 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "total_p": 0, + }, + "cmdline": "/sbin/launchd", + }, + + "type": "process", + } + + expected2 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "type": "process", + } + + actual1 := Filters.Filter(event1) + actual2 := Filters.Filter(event2) + + assert.Equal(t, expected1, actual1) + assert.Equal(t, expected2, actual2) +} diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index 21e08784c8f..fe6cdb03214 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -110,20 +110,40 @@ func newClient(pub *PublisherType) *client { } func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool { + c.annotateEvent(event) + publishEvent := c.filterEvent(event) + if publishEvent == nil { + return false + } + ctx, client := c.getClient(opts) publishedEvents.Add(1) - return client.PublishEvent(ctx, event) + return client.PublishEvent(ctx, *publishEvent) } func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { + + // optimization: shares the backing array and capacity + publishEvents := events[:0] + for _, event := range events { c.annotateEvent(event) + + publishEvent := c.filterEvent(event) + if publishEvent != nil { + publishEvents = append(publishEvents, *publishEvent) + 
}
 	}

 	ctx, client := c.getClient(opts)
-	publishedEvents.Add(int64(len(events)))
+	if len(publishEvents) == 0 {
+		logp.Debug("filter", "No events to publish")
+		return true
+	}
+
+	publishedEvents.Add(int64(len(publishEvents)))
-	return client.PublishEvents(ctx, events)
+	return client.PublishEvents(ctx, publishEvents)
 }
@@ -157,9 +177,22 @@ func (c *client) annotateEvent(event common.MapStr) {
 		delete(event, common.EventMetadataKey)
 	}
+}
+
+func (c *client) filterEvent(event common.MapStr) *common.MapStr {
+
+	if event = common.ConvertToGenericEvent(event); event == nil {
+		logp.Err("failed to convert to a generic event")
+		return nil
+	}
+
+	// filter the event by applying the configured rules
+	publishEvent := c.publisher.Filters.Filter(event)

 	if logp.IsDebug("publish") {
-		PrintPublishEvent(event)
+		logp.Debug("publish", "Publish: %s", publishEvent.StringToPrint())
 	}
+	return &publishEvent
 }

 func (c *client) getClient(opts []ClientOption) (Context, eventPublisher) {
diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go
index e5ed61fb08e..3b9130c9ad9 100644
--- a/libbeat/publisher/publish.go
+++ b/libbeat/publisher/publish.go
@@ -1,7 +1,6 @@
 package publisher

 import (
-	"encoding/json"
 	"errors"
 	"flag"
 	"os"
@@ -11,6 +10,7 @@ import (
 	"github.com/urso/ucfg"

 	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/filter"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/outputs"

@@ -59,6 +59,7 @@ type PublisherType struct {
 	TopologyOutput outputs.TopologyOutputer
 	IgnoreOutgoing bool
 	GeoLite        *libgeo.GeoIP
+	Filters        *filter.FilterList

 	globalEventMetadata common.EventMetadata // Fields and tags to add to each event.
@@ -104,15 +105,6 @@ func init() {
 	publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing")
 }

-func PrintPublishEvent(event common.MapStr) {
-	json, err := json.MarshalIndent(event, "", "  ")
-	if err != nil {
-		logp.Err("json.Marshal: %s", err)
-	} else {
-		debug("Publish: %s", string(json))
-	}
-}
-
 func (publisher *PublisherType) IsPublisherIP(ip string) bool {
 	for _, myip := range publisher.IpAddrs {
 		if myip == ip {
@@ -177,6 +169,12 @@ func (publisher *PublisherType) PublishTopology(params ...string) error {
 	return nil
 }

+func (publisher *PublisherType) RegisterFilter(filters *filter.FilterList) error {
+
+	publisher.Filters = filters
+	return nil
+}
+
 // Create new PublisherType
 func New(
 	beatName string,
diff --git a/libbeat/publisher/publish_test.go b/libbeat/publisher/publish_test.go
index eee5f81b6a4..8ca6208db64 100644
--- a/libbeat/publisher/publish_test.go
+++ b/libbeat/publisher/publish_test.go
@@ -6,7 +6,6 @@ import (
 	"testing"
 	"time"

-	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/outputs"

 	"github.com/stretchr/testify/assert"
 )
@@ -36,14 +35,6 @@ func (topo testTopology) GetNameByIP(ip string) string {
 	return topo.hostname
 }

-// Smoke test PrintPublishEvent. The method has no observable outputs so this
-// is only verifying there are no panics.
-func TestPrintPublishEvent(t *testing.T) {
-	PrintPublishEvent(nil)
-	PrintPublishEvent(common.MapStr{})
-	PrintPublishEvent(testEvent())
-}
-
 // Test GetServerName.
func TestPublisherTypeGetServerName(t *testing.T) { pt := &PublisherType{name: shipperName} diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index a272c0f4770..83e661c67fd 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -11,6 +11,9 @@ import yaml from datetime import datetime, timedelta +BEAT_REQUIRED_FIELDS = ["@timestamp", "type", + "beat.name", "beat.hostname"] + class Proc(object): """ @@ -204,7 +207,9 @@ def render_config_template(self, template=None, f.write(output_str) # Returns output as JSON object with flattened fields (. notation) - def read_output(self, output_file=None): + def read_output(self, + output_file=None, + required_fields=None): # Init defaults if output_file is None: @@ -215,8 +220,7 @@ def read_output(self, output_file=None): for line in f: jsons.append(self.flatten_object(json.loads(line), [])) - self.all_have_fields(jsons, ["@timestamp", "type", - "beat.name", "beat.hostname"]) + self.all_have_fields(jsons, required_fields or BEAT_REQUIRED_FIELDS) return jsons # Returns output as JSON object diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 56b1e454f84..94bc381911b 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -106,6 +106,9 @@ func (pb *Packetbeat) Config(b *beat.Beat) error { // Read beat implementation config as needed for setup err := cfgfile.Read(&pb.PbConfig, "") + if err != nil { + logp.Err("fails to read the beat config: %v, %v", err, pb.PbConfig) + } // CLI flags over-riding config if *pb.CmdLineArgs.TopSpeed { diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 83db20a766f..255a96f5fdc 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -16,7 +16,6 @@ type Config struct { Procs procs.ProcsConfig RunOptions droppriv.RunOptions Logging logp.Logging - Filter map[string]interface{} } type InterfacesConfig struct { diff --git a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index b2b7717ea8f..edcef59a0db 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -51,28 +51,28 @@ flows: protocols: icmp: enabled: true -{% if icmp_send_request %} send_request: true{% endif %} -{% if icmp_send_response %} send_response: true{% endif %} +{% if icmp_send_request %} send_request: true{%- endif %} +{% if icmp_send_response %} send_response: true{%- endif %} dns: ports: [{{ dns_ports|default([53])|join(", ") }}] -{% if dns_include_authorities %} include_authorities: true{% endif %} -{% if dns_include_additionals %} include_additionals: true{% endif %} -{% if dns_send_request %} send_request: true{% endif %} -{% if dns_send_response %} send_response: true{% endif %} +{% if dns_include_authorities %} include_authorities: true{%- endif %} +{% if dns_include_additionals %} include_additionals: true{%- endif %} +{% if dns_send_request %} send_request: true{%- endif %} +{% if dns_send_response %} send_response: true{%- endif %} amqp: ports: [{{ amqp_ports|default([5672])|join(", ") }}] -{% if amqp_send_request %} send_request: true{% endif %} -{% if amqp_send_response %} send_response: true{% endif %} +{% if amqp_send_request %} send_request: true{%- endif %} +{% if amqp_send_response %} send_response: true{%- endif %} http: ports: [{{ http_ports|default([80])|join(", ") }}] -{% if http_send_request %} send_request: true{% endif %} -{% if http_send_response %} 
send_response: true{% endif %} -{% if http_send_all_headers %} send_all_headers: true{% endif %} -{% if http_split_cookie %} split_cookie: true{% endif %} -{%- if http_send_headers %} +{% if http_send_request %} send_request: true{%- endif %} +{% if http_send_response %} send_response: true{%- endif %} +{% if http_send_all_headers %} send_all_headers: true{%- endif %} +{% if http_split_cookie %} split_cookie: true{%- endif %} +{% if http_send_headers %} send_headers: [ {%- for hdr in http_send_headers -%} "{{ hdr }}" @@ -101,25 +101,25 @@ protocols: memcache: ports: [{{ memcache_ports|default([11211])|join(", ") }}] -{% if memcache_send_request %} send_request: true{% endif %} -{% if memcache_send_response %} send_response: true{% endif %} -{% if memcache_parse_unknown %} parseunknown: true{% endif %} -{% if memcache_max_values %} maxvalues: {{ memcache_max_values }}{% endif %} -{% if memcache_udp_transaction_timeout %} udptransactiontimeout: {{ memcache_udp_transaction_timeout}}{% endif %} +{% if memcache_send_request %} send_request: true{%- endif %} +{% if memcache_send_response %} send_response: true{%- endif %} +{% if memcache_parse_unknown %} parseunknown: true{%- endif %} +{% if memcache_max_values %} maxvalues: {{ memcache_max_values }}{%- endif %} +{% if memcache_udp_transaction_timeout %} udptransactiontimeout: {{ memcache_udp_transaction_timeout}}{%- endif %} mysql: ports: [{{ mysql_ports|default([3306])|join(", ") }}] -{% if mysql_max_rows %} max_rows: {{mysql_max_rows}}{% endif %} -{% if mysql_max_row_length %} max_row_length: {{mysql_max_row_length}}{% endif %} -{% if mysql_send_request %} send_request: true{% endif %} -{% if mysql_send_response %} send_response: true{% endif %} +{% if mysql_max_rows %} max_rows: {{mysql_max_rows}}{%- endif %} +{% if mysql_max_row_length %} max_row_length: {{mysql_max_row_length}}{%- endif %} +{% if mysql_send_request %} send_request: true{%- endif %} +{% if mysql_send_response %} send_response: true{%- endif %} pgsql: ports: [{{ pgsql_ports|default([5432])|join(", ") }}] -{% if pgsql_max_rows %} max_rows: {{pgsql_max_rows}}{% endif %} -{% if pgsql_max_row_length %} max_row_length: {{pgsql_max_row_length}}{% endif %} -{% if pgsql_send_request %} send_request: true{% endif %} -{% if pgsql_send_response %} send_response: true{% endif %} +{% if pgsql_max_rows %} max_rows: {{pgsql_max_rows}}{%- endif %} +{% if pgsql_max_row_length %} max_row_length: {{pgsql_max_row_length}}{%- endif %} +{% if pgsql_send_request %} send_request: true{%- endif %} +{% if pgsql_send_response %} send_response: true{%- endif %} redis: ports: [{{ redis_ports|default([6379])|join(", ") }}] @@ -129,7 +129,7 @@ protocols: thrift: ports: [{{ thrift_ports|default([9090])|join(", ") }}] transport_type: "{{ thrift_transport_type|default('socket') }}" -{%- if thrift_idl_files %} +{% if thrift_idl_files %} idl_files: [ {%- for file in thrift_idl_files -%} "{{ beat.working_dir + '/' + file }}" @@ -137,12 +137,12 @@ protocols: {%- endfor -%} ] {%- endif %} -{% if thrift_send_request %} send_request: true{% endif %} -{% if thrift_send_response %} send_response: true{% endif %} +{% if thrift_send_request %} send_request: true{%- endif %} +{% if thrift_send_response %} send_response: true{%- endif %} mongodb: ports: [{{ mongodb_ports|default([27017])|join(", ") }}] -{% if mongodb_send_request %} send_request: true{% endif %} +{% if mongodb_send_request %} send_request: true{%endif %} {% if mongodb_send_response %} send_response: true{% endif %} {% if mongodb_max_docs is not none %} 
max_docs: {{mongodb_max_docs}}{% endif %} {% if mongodb_max_doc_length is not none %} max_doc_length: {{mongodb_max_doc_length}}{% endif %} @@ -205,4 +205,17 @@ procs: cmdline_grep: memcached {% endif %} +{% if filter_enabled %} + +filter: + + {%- if drop_fields %} + - drop_fields: + fields: {{drop_fields}} + {%- endif %} + {%- if include_fields is not none %} + - include_fields: + fields: {{include_fields}} + {%- endif %} +{% endif %} # vim: set ft=jinja: diff --git a/packetbeat/tests/system/test_0007_tags.py b/packetbeat/tests/system/test_0007_tags.py index bd86696181b..fdb450449fc 100644 --- a/packetbeat/tests/system/test_0007_tags.py +++ b/packetbeat/tests/system/test_0007_tags.py @@ -21,6 +21,7 @@ def test_tags(self): assert len(objs) == 1 o = objs[0] + assert "tags" in o assert o["tags"] == ["nginx", "wsgi", "drum"] def test_empty_tags(self): diff --git a/packetbeat/tests/system/test_0060_filtering.py b/packetbeat/tests/system/test_0060_filtering.py new file mode 100644 index 00000000000..e98b40c245d --- /dev/null +++ b/packetbeat/tests/system/test_0060_filtering.py @@ -0,0 +1,124 @@ +from packetbeat import BaseTest + + +class Test(BaseTest): + + def test_drop_map_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.request_headers"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" not in objs[0] + assert "http.response_headers" in objs[0] + + def test_drop_end_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.response_headers.transfer-encoding"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" in objs[0] + assert "http.response_headers" in objs[0] + + # check if filtering deleted the + # htp.response_headers.transfer-encoding + assert "transfer-encoding" not in objs[0]["http.response_headers"] + + def test_drop_unknown_field(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.response_headers.transfer-encoding-test"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" in objs[0] + assert "http.response_headers" in objs[0] + + # check that htp.response_headers.transfer-encoding + # still exists + assert "transfer-encoding" in objs[0]["http.response_headers"] + + def test_include_empty_list(self): + + self.render_config_template( + http_send_all_headers=True, + # export all mandatory fields + include_fields=[], + filter_enabled=True, + ) + + 
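# include_fields=[] keeps only the mandatory "@timestamp" and
+        # "type" fields in the exported events
+        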
self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output( + required_fields=["@timestamp", "type"], + ) + + assert len(objs) == 3 + assert "http.request_headers" not in objs[0] + assert "http.response_headers" not in objs[0] + + def test_drop_no_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=[], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" diff --git a/topbeat/tests/system/config/topbeat.yml.j2 b/topbeat/tests/system/config/topbeat.yml.j2 index 7e63b1b015e..a75c3a16a3a 100644 --- a/topbeat/tests/system/config/topbeat.yml.j2 +++ b/topbeat/tests/system/config/topbeat.yml.j2 @@ -98,3 +98,16 @@ shipper: # Number of rotated log files to keep. Oldest files will be deleted first. #keepfiles: 7 + +filter: + + {% if include_fields %} + + - include_fields: + fields: {{ include_fields }} + + {% endif %} + + - drop_fields: + fields: {{ drop_fields | default([]) }} + diff --git a/topbeat/tests/system/test_filtering.py b/topbeat/tests/system/test_filtering.py new file mode 100644 index 00000000000..4937813ced7 --- /dev/null +++ b/topbeat/tests/system/test_filtering.py @@ -0,0 +1,156 @@ +from topbeat import BaseTest + +""" +Contains tests for filtering. +""" + + +class Test(BaseTest): + def test_dropfields(self): + """ + Check drop_fields filtering action + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + drop_fields=["proc"] + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + + for key in [ + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.name", + "proc.state", + "proc.pid", + ]: + assert key not in output + + def test_include_fields(self): + """ + Check include_fields filtering action + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + include_fields=["proc.cpu", "proc.mem"] + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + print(output) + + for key in [ + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.mem.size", + "proc.mem.rss", + "proc.mem.rss_p" + ]: + assert key in output + + for key in [ + "proc.name", + "proc.pid", + ]: + assert key not in output + + def test_multiple_actions(self): + """ + Check the result when configuring two actions: include_fields + and drop_fields. 
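+        Rules are applied in order: include_fields runs first, and
+        drop_fields then removes keys from the already-reduced event.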
+        """
+        self.render_config_template(
+            system_stats=False,
+            process_stats=True,
+            filesystem_stats=False,
+            include_fields=["proc"],
+            drop_fields=["proc.mem"],
+        )
+        topbeat = self.start_beat()
+        self.wait_until(
+            lambda: self.log_contains(
+                "output worker: publish"))
+
+        topbeat.kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+
+        for key in [
+            "proc.cpu.start_time",
+            "proc.cpu.total",
+            "proc.cpu.total_p",
+            "proc.cpu.user",
+            "proc.cpu.system",
+            "proc.name",
+            "proc.pid",
+        ]:
+            assert key in output
+
+        for key in [
+            "proc.mem.size",
+            "proc.mem.rss",
+            "proc.mem.rss_p"
+        ]:
+            assert key not in output
+
+    def test_contradictory_multiple_actions(self):
+        """
+        Check the behaviour of contradictory actions: the same fields
+        are first selected by include_fields and then removed by
+        drop_fields, so only the mandatory fields survive.
+        """
+        self.render_config_template(
+            system_stats=False,
+            process_stats=True,
+            filesystem_stats=False,
+            include_fields=["proc.mem.size", "proc.mem.rss_p"],
+            drop_fields=["proc.mem.size", "proc.mem.rss_p"],
+        )
+        topbeat = self.start_beat()
+        self.wait_until(
+            lambda: self.log_contains(
+                "output worker: publish"))
+
+        topbeat.kill_and_wait()
+
+        output = self.read_output(
+            required_fields=["@timestamp", "type"],
+        )[0]
+
+        for key in [
+            "proc.mem.size",
+            "proc.mem.rss",
+            "proc.cpu.start_time",
+            "proc.cpu.total",
+            "proc.cpu.total_p",
+            "proc.cpu.user",
+            "proc.cpu.system",
+            "proc.name",
+            "proc.pid",
+            "proc.mem.rss_p"
+        ]:
+            assert key not in output
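
For reference, the topbeat template above renders the settings used in
test_contradictory_multiple_actions into a filter section roughly like
this (list order is what makes include_fields run before drop_fields):

    filter:
      - include_fields:
          fields: ['proc.mem.size', 'proc.mem.rss_p']
      - drop_fields:
          fields: ['proc.mem.size', 'proc.mem.rss_p']

include_fields first reduces the event to these two keys, drop_fields
then deletes exactly those keys, and only the mandatory "@timestamp"
and "type" fields remain in the output.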