    def test_include_fields(self):
        """
        Check include_fields filtering action
        """
        self.render_config_template(
            path=os.path.abspath(self.working_dir) + "/test.log",
            filter_enabled=True,
            include_fields=["source", "offset", "message"]
        )
        with open(self.working_dir + "/test.log", "w") as f:
            f.write("test message\n")

        filebeat = self.start_beat()
        self.wait_until(lambda: self.output_has(lines=1))
        filebeat.check_kill_and_wait()

        output = self.read_output(
            required_fields=["@timestamp", "type"],
        )[0]
        assert "beat.name" not in output
        assert "message" in output
00000000000..4d85aa0ab16 --- /dev/null +++ b/libbeat/common/event.go @@ -0,0 +1,86 @@ +package common + +import ( + "encoding/json" + "reflect" + "time" + + "github.com/elastic/beats/libbeat/logp" +) + +func MarshallUnmarshall(v interface{}) (MapStr, error) { + // decode and encode JSON + marshaled, err := json.Marshal(v) + if err != nil { + logp.Warn("marshal err: %v", err) + return nil, err + } + var v1 MapStr + err = json.Unmarshal(marshaled, &v1) + if err != nil { + logp.Warn("unmarshal err: %v") + return nil, err + } + + return v1, nil +} + +func ConvertToGenericEvent(v MapStr) MapStr { + + for key, value := range v { + + switch value.(type) { + case Time, *Time: + continue + case time.Location, *time.Location: + continue + case MapStr: + v[key] = ConvertToGenericEvent(value.(MapStr)) + continue + case *MapStr: + v[key] = ConvertToGenericEvent(*value.(*MapStr)) + continue + default: + + typ := reflect.TypeOf(value) + + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + } + + switch typ.Kind() { + case reflect.Bool: + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + case reflect.Uintptr: + case reflect.Float32, reflect.Float64: + case reflect.Complex64, reflect.Complex128: + case reflect.String: + case reflect.UnsafePointer: + case reflect.Array, reflect.Slice: + //case reflect.Chan: + //case reflect.Func: + //case reflect.Interface: + case reflect.Map: + anothermap, err := MarshallUnmarshall(value) + if err != nil { + logp.Warn("fail to marschall & unmarshall map %v", key) + continue + } + v[key] = anothermap + + case reflect.Struct: + anothermap, err := MarshallUnmarshall(value) + if err != nil { + logp.Warn("fail to marschall & unmarshall struct %v", key) + continue + } + v[key] = anothermap + default: + logp.Warn("unknown type %v", typ) + continue + } + } + } + return v +} diff --git a/libbeat/common/event_test.go b/libbeat/common/event_test.go 
new file mode 100644 index 00000000000..b8bc2dd9d2b --- /dev/null +++ b/libbeat/common/event_test.go @@ -0,0 +1,123 @@ +package common + +import ( + "testing" + + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" +) + +func TestConvertNestedMapStr(t *testing.T) { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + + type io struct { + Input MapStr + Output MapStr + } + + type String string + + tests := []io{ + io{ + Input: MapStr{ + "key": MapStr{ + "key1": "value1", + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": "value1", + }, + }, + }, + io{ + Input: MapStr{ + "key": MapStr{ + "key1": String("value1"), + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": String("value1"), + }, + }, + }, + io{ + Input: MapStr{ + "key": MapStr{ + "key1": []string{"value1", "value2"}, + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": []string{"value1", "value2"}, + }, + }, + }, + io{ + Input: MapStr{ + "key": MapStr{ + "key1": []String{"value1", "value2"}, + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": []String{"value1", "value2"}, + }, + }, + }, + io{ + Input: MapStr{ + "@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"), + }, + Output: MapStr{ + "@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"), + }, + }, + } + + for _, test := range tests { + assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input)) + } + +} + +func TestConvertNestedStruct(t *testing.T) { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + + type io struct { + Input MapStr + Output MapStr + } + + type TestStruct struct { + A string + B int + } + + tests := []io{ + io{ + Input: MapStr{ + "key": MapStr{ + "key1": TestStruct{ + A: "hello", + B: 5, + }, + }, + }, + Output: MapStr{ + "key": MapStr{ + "key1": MapStr{ + "A": "hello", + "B": float64(5), + }, + }, + }, + }, + } + + for _, test := range tests { + assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input)) + } + +} diff --git 
a/libbeat/common/mapstr.go b/libbeat/common/mapstr.go index e32a6473d81..d934fb34225 100644 --- a/libbeat/common/mapstr.go +++ b/libbeat/common/mapstr.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "time" ) @@ -57,6 +58,110 @@ func (m MapStr) Update(d MapStr) { } } +func (m MapStr) Delete(key string) error { + keyParts := strings.Split(key, ".") + keysLen := len(keyParts) + + mapp := m + for i := 0; i < keysLen-1; i++ { + keyPart := keyParts[i] + + if _, ok := mapp[keyPart]; ok { + mapp, ok = mapp[keyPart].(MapStr) + if !ok { + return fmt.Errorf("unexpected type of %s key", keyPart) + } + } else { + return fmt.Errorf("unknown key %s", keyPart) + } + } + delete(mapp, keyParts[keysLen-1]) + return nil +} + +func (m MapStr) CopyFieldsTo(to MapStr, key string) error { + + keyParts := strings.Split(key, ".") + keysLen := len(keyParts) + + toPointer := to + fromPointer := m + + for i := 0; i < keysLen-1; i++ { + keyPart := keyParts[i] + var success bool + + if _, ok := fromPointer[keyPart]; ok { + if _, already := toPointer[keyPart]; !already { + toPointer[keyPart] = MapStr{} + } + + fromPointer, success = fromPointer[keyPart].(MapStr) + if !success { + return fmt.Errorf("Unexpected type of %s key", keyPart) + } + + toPointer, success = toPointer[keyPart].(MapStr) + if !success { + return fmt.Errorf("Unexpected type of %s key", keyPart) + } + } else { + return nil + } + } + + if _, ok := fromPointer[keyParts[keysLen-1]]; ok { + toPointer[keyParts[keysLen-1]] = fromPointer[keyParts[keysLen-1]] + } else { + return nil + } + return nil +} + +func (m MapStr) Clone() MapStr { + result := MapStr{} + + for k, v := range m { + mapstr, ok := v.(MapStr) + if ok { + v = mapstr.Clone() + } + result[k] = v + } + + return result +} + +func (m MapStr) HasKey(key string) (bool, error) { + keyParts := strings.Split(key, ".") + keyPartsLen := len(keyParts) + + mapp := m + for i := 0; i < keyPartsLen; i++ { + keyPart := keyParts[i] + + if _, ok := mapp[keyPart]; ok 
{ + if i < keyPartsLen-1 { + mapp, ok = mapp[keyPart].(MapStr) + if !ok { + return false, fmt.Errorf("Unknown type of %s key", keyPart) + } + } + } else { + return false, nil + } + } + return true, nil +} + +func (m MapStr) StringToPrint() string { + json, err := json.MarshalIndent(m, "", " ") + if err != nil { + return fmt.Sprintf("Not valid json: %v", err) + } + return string(json) +} + // Checks if a timestamp field exists and if it doesn't it adds // one by using the injected now() function as a time source. func (m MapStr) EnsureTimestampField(now func() time.Time) error { diff --git a/libbeat/common/mapstr_test.go b/libbeat/common/mapstr_test.go index 57b0f5ae3eb..65f5fc897f7 100644 --- a/libbeat/common/mapstr_test.go +++ b/libbeat/common/mapstr_test.go @@ -43,6 +43,126 @@ func TestMapStrUnion(t *testing.T) { assert.Equal(c, MapStr{"a": 1, "b": 3, "c": 4}) } +func TestMapStrCopyFieldsTo(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "a": MapStr{ + "a1": 2, + "a2": 3, + }, + "b": 2, + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + c := MapStr{} + + err := m.CopyFieldsTo(c, "dd") + assert.Equal(nil, err) + assert.Equal(MapStr{}, c) + + err = m.CopyFieldsTo(c, "a") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}}, c) + + err = m.CopyFieldsTo(c, "c.c1") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1}}, c) + + err = m.CopyFieldsTo(c, "b") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1}, "b": 2}, c) + + err = m.CopyFieldsTo(c, "c.c3.c32") + assert.Equal(nil, err) + assert.Equal(MapStr{"a": MapStr{"a1": 2, "a2": 3}, "c": MapStr{"c1": 1, "c3": MapStr{"c32": 2}}, "b": 2}, c) +} + +func TestMapStrDelete(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + + err := m.Delete("c.c2") + 
assert.Equal(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c31": 1, "c32": 2}}}, m) + + err = m.Delete("c.c2.c21") + assert.NotEqual(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c31": 1, "c32": 2}}}, m) + + err = m.Delete("c.c3.c31") + assert.Equal(nil, err) + assert.Equal(MapStr{"c": MapStr{"c1": 1, "c3": MapStr{"c32": 2}}}, m) + + err = m.Delete("c") + assert.Equal(nil, err) + assert.Equal(MapStr{}, m) +} + +func TestHasKey(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c": MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + }, + } + + hasKey, err := m.HasKey("c.c2") + assert.Equal(nil, err) + assert.Equal(true, hasKey) + + hasKey, err = m.HasKey("c.c4") + assert.Equal(nil, err) + assert.Equal(false, hasKey) + + hasKey, err = m.HasKey("c.c3.c32") + assert.Equal(nil, err) + assert.Equal(true, hasKey) + + hasKey, err = m.HasKey("dd") + assert.Equal(nil, err) + assert.Equal(false, hasKey) + +} + +func TestClone(t *testing.T) { + assert := assert.New(t) + + m := MapStr{ + "c1": 1, + "c2": 2, + "c3": MapStr{ + "c31": 1, + "c32": 2, + }, + } + + c := m.Clone() + assert.Equal(MapStr{"c31": 1, "c32": 2}, c["c3"]) +} + func TestEnsureTimestampField(t *testing.T) { type io struct { @@ -188,6 +308,15 @@ func TestString(t *testing.T) { } } +// Smoke test. The method has no observable outputs so this +// is only verifying there are no panics. 
+func TestStringToPrint(t *testing.T) { + m := MapStr{} + + assert.Equal(t, "{}", m.StringToPrint()) + assert.Equal(t, true, len(m.StringToPrint()) > 0) +} + func TestMergeFields(t *testing.T) { type io struct { UnderRoot bool diff --git a/libbeat/filter/config.go b/libbeat/filter/config.go new file mode 100644 index 00000000000..db3937175f4 --- /dev/null +++ b/libbeat/filter/config.go @@ -0,0 +1,17 @@ +package filter + +type DropFieldsConfig struct { + Fields []string `config:"fields"` +} + +type IncludeFieldsConfig struct { + Fields []string `config:"fields"` +} + +type FilterConfig struct { + DropFields *DropFieldsConfig `config:"drop_fields"` + IncludeFields *IncludeFieldsConfig `config:"include_fields"` +} + +// fields that should be always exported +var MandatoryExportedFields = []string{"@timestamp", "type"} diff --git a/libbeat/filter/filter.go b/libbeat/filter/filter.go new file mode 100644 index 00000000000..7911526e71c --- /dev/null +++ b/libbeat/filter/filter.go @@ -0,0 +1,165 @@ +package filter + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type FilterCondition struct { +} + +type FilterRule interface { + Filter(event common.MapStr) (common.MapStr, error) + String() string +} + +/* extends FilterRule */ +type IncludeFields struct { + Fields []string + // condition +} + +/* extend FilterRule */ +type DropFields struct { + Fields []string + // condition +} + +type FilterList struct { + filters []FilterRule +} + +/* FilterList methods */ +func New(config []FilterConfig) (*FilterList, error) { + + Filters := &FilterList{} + Filters.filters = []FilterRule{} + + logp.Debug("filter", "configuration %v", config) + for _, filterConfig := range config { + if filterConfig.DropFields != nil { + Filters.Register(NewDropFields(filterConfig.DropFields.Fields)) + } + + if filterConfig.IncludeFields != nil { + Filters.Register(NewIncludeFields(filterConfig.IncludeFields.Fields)) + } + } + + 
logp.Debug("filter", "filters: %v", Filters) + return Filters, nil +} + +func (filters *FilterList) Register(filter FilterRule) { + filters.filters = append(filters.filters, filter) + logp.Debug("filter", "Register filter: %v", filter) +} + +func (filters *FilterList) Get(index int) FilterRule { + return filters.filters[index] +} + +// Applies a sequence of filtering rules and returns the filtered event +func (filters *FilterList) Filter(event common.MapStr) common.MapStr { + + // clone the event at first, before starting filtering + filtered := event.Clone() + var err error + + for _, filter := range filters.filters { + filtered, err = filter.Filter(filtered) + if err != nil { + logp.Err("fail to apply filtering rule %s: %s", filter, err) + } + } + + return filtered +} + +func (filters *FilterList) String() string { + s := []string{} + + for _, filter := range filters.filters { + + s = append(s, filter.String()) + } + return strings.Join(s, ", ") +} + +/* IncludeFields methods */ +func NewIncludeFields(fields []string) *IncludeFields { + + /* add read only fields if they are not yet */ + for _, readOnly := range MandatoryExportedFields { + found := false + for _, field := range fields { + if readOnly == field { + found = true + } + } + if !found { + fields = append(fields, readOnly) + } + } + + return &IncludeFields{Fields: fields} +} + +func (f *IncludeFields) Filter(event common.MapStr) (common.MapStr, error) { + + filtered := common.MapStr{} + + for _, field := range f.Fields { + hasKey, err := event.HasKey(field) + if err != nil { + return filtered, fmt.Errorf("Fail to check the key %s: %s", field, err) + } + + if hasKey { + errorOnCopy := event.CopyFieldsTo(filtered, field) + if errorOnCopy != nil { + return filtered, fmt.Errorf("Fail to copy key %s: %s", field, err) + } + } + } + + return filtered, nil +} + +func (f *IncludeFields) String() string { + return "include_fields=" + strings.Join(f.Fields, ", ") +} + +/* DropFields methods */ +func 
NewDropFields(fields []string) *DropFields { + + /* remove read only fields */ + for _, readOnly := range MandatoryExportedFields { + for i, field := range fields { + if readOnly == field { + fields = append(fields[:i], fields[i+1:]...) + } + } + } + return &DropFields{Fields: fields} +} + +func (f *DropFields) Filter(event common.MapStr) (common.MapStr, error) { + + for _, field := range f.Fields { + err := event.Delete(field) + if err != nil { + return event, fmt.Errorf("Fail to delete key %s: %s", field, err) + } + + } + return event, nil +} + +func (f *DropFields) String() string { + + return "drop_fields=" + strings.Join(f.Fields, ", ") +} diff --git a/libbeat/filter/filter_test.go b/libbeat/filter/filter_test.go new file mode 100644 index 00000000000..40c35521daf --- /dev/null +++ b/libbeat/filter/filter_test.go @@ -0,0 +1,249 @@ +package filter + +import ( + "testing" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/stretchr/testify/assert" +) + +func TestIncludeFields(t *testing.T) { + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + Filters := FilterList{} + + Filters.Register(NewIncludeFields([]string{"proc.cpu.total_p", "proc.mem", "dd"})) + + event := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + }, + "type": "process", + } + + filteredEvent := Filters.Filter(event) + + expectedEvent := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "proc": common.MapStr{ + "cpu": common.MapStr{ + "total_p": 0, + }, + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 
0, + "size": 2555572224, + }, + }, + "type": "process", + } + + assert.Equal(t, expectedEvent, filteredEvent) +} + +func TestIncludeFields1(t *testing.T) { + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + + Filters := FilterList{} + + Filters.Register(NewIncludeFields([]string{"proc.cpu.total_ddd"})) + + event := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + }, + "type": "process", + } + + filteredEvent := Filters.Filter(event) + + expectedEvent := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "type": "process", + } + + assert.Equal(t, expectedEvent, filteredEvent) +} + +func TestDropFields(t *testing.T) { + + Filters := FilterList{} + + Filters.Register(NewDropFields([]string{"proc.cpu.start_time", "mem", "proc.cmdline", "beat", "dd"})) + + event := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + }, + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + "type": "process", + } + + filteredEvent := Filters.Filter(event) + + expectedEvent := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "count": 1, + "proc": common.MapStr{ + "cpu": common.MapStr{ + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + }, + "type": "process", + } + + assert.Equal(t, expectedEvent, 
filteredEvent) +} + +func TestMultipleIncludeFields(t *testing.T) { + + if testing.Verbose() { + logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"}) + } + Filters := FilterList{} + + Filters.Register(NewIncludeFields([]string{"proc"})) + Filters.Register(NewIncludeFields([]string{"proc.cpu.start_time", "proc.cpu.total_p", + "proc.mem.rss_p", "proc.cmdline"})) + + event1 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + }, + "mem": common.MapStr{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": 2555572224, + }, + "type": "process", + } + + event2 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "beat": common.MapStr{ + "hostname": "mar", + "name": "my-shipper-1", + }, + "count": 1, + "fs": common.MapStr{ + "device_name": "devfs", + "total": 198656, + "used": 198656, + "used_p": 1, + "free": 0, + "avail": 0, + "files": 677, + "free_files": 0, + "mount_point": "/dev", + }, + "type": "process", + } + + expected1 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "proc": common.MapStr{ + "cpu": common.MapStr{ + "start_time": "Jan14", + "total_p": 0, + }, + "cmdline": "/sbin/launchd", + }, + + "type": "process", + } + + expected2 := common.MapStr{ + "@timestamp": "2016-01-24T18:35:19.308Z", + "type": "process", + } + + actual1 := Filters.Filter(event1) + actual2 := Filters.Filter(event2) + + assert.Equal(t, expected1, actual1) + assert.Equal(t, expected2, actual2) +} diff --git a/libbeat/publisher/client.go b/libbeat/publisher/client.go index 21e08784c8f..fe6cdb03214 100644 --- a/libbeat/publisher/client.go +++ b/libbeat/publisher/client.go @@ -110,20 +110,40 @@ func newClient(pub *PublisherType) *client { } func (c *client) PublishEvent(event 
common.MapStr, opts ...ClientOption) bool { + c.annotateEvent(event) + publishEvent := c.filterEvent(event) + if publishEvent == nil { + return false + } + ctx, client := c.getClient(opts) publishedEvents.Add(1) - return client.PublishEvent(ctx, event) + return client.PublishEvent(ctx, *publishEvent) } func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { + + // optimization: shares the backing array and capacity + publishEvents := events[:0] + for _, event := range events { c.annotateEvent(event) + + publishEvent := c.filterEvent(event) + if publishEvent != nil { + publishEvents = append(publishEvents, *publishEvent) + } } ctx, client := c.getClient(opts) - publishedEvents.Add(int64(len(events))) + if len(publishEvents) == 0 { + logp.Debug("filter", "No events to publish") + return true + } + + publishedEvents.Add(int64(len(publishEvents))) return client.PublishEvents(ctx, events) } @@ -157,9 +177,22 @@ func (c *client) annotateEvent(event common.MapStr) { delete(event, common.EventMetadataKey) } +} + +func (c *client) filterEvent(event common.MapStr) *common.MapStr { + + if event = common.ConvertToGenericEvent(event); event == nil { + logp.Err("fail to convert to a generic event") + return nil + + } + + // filter the event by applying the configured rules + publishEvent := c.publisher.Filters.Filter(event) if logp.IsDebug("publish") { - PrintPublishEvent(event) + logp.Debug("publish", "Publish: %s", publishEvent.StringToPrint()) } + return &publishEvent } func (c *client) getClient(opts []ClientOption) (Context, eventPublisher) { diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go index e5ed61fb08e..3b9130c9ad9 100644 --- a/libbeat/publisher/publish.go +++ b/libbeat/publisher/publish.go @@ -1,7 +1,6 @@ package publisher import ( - "encoding/json" "errors" "flag" "os" @@ -11,6 +10,7 @@ import ( "github.com/urso/ucfg" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/filter" 
"github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" @@ -59,6 +59,7 @@ type PublisherType struct { TopologyOutput outputs.TopologyOutputer IgnoreOutgoing bool GeoLite *libgeo.GeoIP + Filters *filter.FilterList globalEventMetadata common.EventMetadata // Fields and tags to add to each event. @@ -104,15 +105,6 @@ func init() { publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing") } -func PrintPublishEvent(event common.MapStr) { - json, err := json.MarshalIndent(event, "", " ") - if err != nil { - logp.Err("json.Marshal: %s", err) - } else { - debug("Publish: %s", string(json)) - } -} - func (publisher *PublisherType) IsPublisherIP(ip string) bool { for _, myip := range publisher.IpAddrs { if myip == ip { @@ -177,6 +169,12 @@ func (publisher *PublisherType) PublishTopology(params ...string) error { return nil } +func (publisher *PublisherType) RegisterFilter(filters *filter.FilterList) error { + + publisher.Filters = filters + return nil +} + // Create new PublisherType func New( beatName string, diff --git a/libbeat/publisher/publish_test.go b/libbeat/publisher/publish_test.go index eee5f81b6a4..8ca6208db64 100644 --- a/libbeat/publisher/publish_test.go +++ b/libbeat/publisher/publish_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs" "github.com/stretchr/testify/assert" ) @@ -36,14 +35,6 @@ func (topo testTopology) GetNameByIP(ip string) string { return topo.hostname } -// Smoke test PrintPublishEvent. The method has no observable outputs so this -// is only verifying there are no panics. -func TestPrintPublishEvent(t *testing.T) { - PrintPublishEvent(nil) - PrintPublishEvent(common.MapStr{}) - PrintPublishEvent(testEvent()) -} - // Test GetServerName. 
func TestPublisherTypeGetServerName(t *testing.T) { pt := &PublisherType{name: shipperName} diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index a272c0f4770..83e661c67fd 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -11,6 +11,9 @@ import yaml from datetime import datetime, timedelta +BEAT_REQUIRED_FIELDS = ["@timestamp", "type", + "beat.name", "beat.hostname"] + class Proc(object): """ @@ -204,7 +207,9 @@ def render_config_template(self, template=None, f.write(output_str) # Returns output as JSON object with flattened fields (. notation) - def read_output(self, output_file=None): + def read_output(self, + output_file=None, + required_fields=None): # Init defaults if output_file is None: @@ -215,8 +220,7 @@ def read_output(self, output_file=None): for line in f: jsons.append(self.flatten_object(json.loads(line), [])) - self.all_have_fields(jsons, ["@timestamp", "type", - "beat.name", "beat.hostname"]) + self.all_have_fields(jsons, required_fields or BEAT_REQUIRED_FIELDS) return jsons # Returns output as JSON object diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 56b1e454f84..94bc381911b 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -106,6 +106,9 @@ func (pb *Packetbeat) Config(b *beat.Beat) error { // Read beat implementation config as needed for setup err := cfgfile.Read(&pb.PbConfig, "") + if err != nil { + logp.Err("fails to read the beat config: %v, %v", err, pb.PbConfig) + } // CLI flags over-riding config if *pb.CmdLineArgs.TopSpeed { diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 83db20a766f..255a96f5fdc 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -16,7 +16,6 @@ type Config struct { Procs procs.ProcsConfig RunOptions droppriv.RunOptions Logging logp.Logging - Filter map[string]interface{} } type InterfacesConfig struct { diff --git 
a/packetbeat/tests/system/config/packetbeat.yml.j2 b/packetbeat/tests/system/config/packetbeat.yml.j2 index b2b7717ea8f..edcef59a0db 100644 --- a/packetbeat/tests/system/config/packetbeat.yml.j2 +++ b/packetbeat/tests/system/config/packetbeat.yml.j2 @@ -51,28 +51,28 @@ flows: protocols: icmp: enabled: true -{% if icmp_send_request %} send_request: true{% endif %} -{% if icmp_send_response %} send_response: true{% endif %} +{% if icmp_send_request %} send_request: true{%- endif %} +{% if icmp_send_response %} send_response: true{%- endif %} dns: ports: [{{ dns_ports|default([53])|join(", ") }}] -{% if dns_include_authorities %} include_authorities: true{% endif %} -{% if dns_include_additionals %} include_additionals: true{% endif %} -{% if dns_send_request %} send_request: true{% endif %} -{% if dns_send_response %} send_response: true{% endif %} +{% if dns_include_authorities %} include_authorities: true{%- endif %} +{% if dns_include_additionals %} include_additionals: true{%- endif %} +{% if dns_send_request %} send_request: true{%- endif %} +{% if dns_send_response %} send_response: true{%- endif %} amqp: ports: [{{ amqp_ports|default([5672])|join(", ") }}] -{% if amqp_send_request %} send_request: true{% endif %} -{% if amqp_send_response %} send_response: true{% endif %} +{% if amqp_send_request %} send_request: true{%- endif %} +{% if amqp_send_response %} send_response: true{%- endif %} http: ports: [{{ http_ports|default([80])|join(", ") }}] -{% if http_send_request %} send_request: true{% endif %} -{% if http_send_response %} send_response: true{% endif %} -{% if http_send_all_headers %} send_all_headers: true{% endif %} -{% if http_split_cookie %} split_cookie: true{% endif %} -{%- if http_send_headers %} +{% if http_send_request %} send_request: true{%- endif %} +{% if http_send_response %} send_response: true{%- endif %} +{% if http_send_all_headers %} send_all_headers: true{%- endif %} +{% if http_split_cookie %} split_cookie: true{%- endif %} +{% if 
http_send_headers %} send_headers: [ {%- for hdr in http_send_headers -%} "{{ hdr }}" @@ -101,25 +101,25 @@ protocols: memcache: ports: [{{ memcache_ports|default([11211])|join(", ") }}] -{% if memcache_send_request %} send_request: true{% endif %} -{% if memcache_send_response %} send_response: true{% endif %} -{% if memcache_parse_unknown %} parseunknown: true{% endif %} -{% if memcache_max_values %} maxvalues: {{ memcache_max_values }}{% endif %} -{% if memcache_udp_transaction_timeout %} udptransactiontimeout: {{ memcache_udp_transaction_timeout}}{% endif %} +{% if memcache_send_request %} send_request: true{%- endif %} +{% if memcache_send_response %} send_response: true{%- endif %} +{% if memcache_parse_unknown %} parseunknown: true{%- endif %} +{% if memcache_max_values %} maxvalues: {{ memcache_max_values }}{%- endif %} +{% if memcache_udp_transaction_timeout %} udptransactiontimeout: {{ memcache_udp_transaction_timeout}}{%- endif %} mysql: ports: [{{ mysql_ports|default([3306])|join(", ") }}] -{% if mysql_max_rows %} max_rows: {{mysql_max_rows}}{% endif %} -{% if mysql_max_row_length %} max_row_length: {{mysql_max_row_length}}{% endif %} -{% if mysql_send_request %} send_request: true{% endif %} -{% if mysql_send_response %} send_response: true{% endif %} +{% if mysql_max_rows %} max_rows: {{mysql_max_rows}}{%- endif %} +{% if mysql_max_row_length %} max_row_length: {{mysql_max_row_length}}{%- endif %} +{% if mysql_send_request %} send_request: true{%- endif %} +{% if mysql_send_response %} send_response: true{%- endif %} pgsql: ports: [{{ pgsql_ports|default([5432])|join(", ") }}] -{% if pgsql_max_rows %} max_rows: {{pgsql_max_rows}}{% endif %} -{% if pgsql_max_row_length %} max_row_length: {{pgsql_max_row_length}}{% endif %} -{% if pgsql_send_request %} send_request: true{% endif %} -{% if pgsql_send_response %} send_response: true{% endif %} +{% if pgsql_max_rows %} max_rows: {{pgsql_max_rows}}{%- endif %} +{% if pgsql_max_row_length %} max_row_length: 
{{pgsql_max_row_length}}{%- endif %} +{% if pgsql_send_request %} send_request: true{%- endif %} +{% if pgsql_send_response %} send_response: true{%- endif %} redis: ports: [{{ redis_ports|default([6379])|join(", ") }}] @@ -129,7 +129,7 @@ protocols: thrift: ports: [{{ thrift_ports|default([9090])|join(", ") }}] transport_type: "{{ thrift_transport_type|default('socket') }}" -{%- if thrift_idl_files %} +{% if thrift_idl_files %} idl_files: [ {%- for file in thrift_idl_files -%} "{{ beat.working_dir + '/' + file }}" @@ -137,12 +137,12 @@ protocols: {%- endfor -%} ] {%- endif %} -{% if thrift_send_request %} send_request: true{% endif %} -{% if thrift_send_response %} send_response: true{% endif %} +{% if thrift_send_request %} send_request: true{%- endif %} +{% if thrift_send_response %} send_response: true{%- endif %} mongodb: ports: [{{ mongodb_ports|default([27017])|join(", ") }}] -{% if mongodb_send_request %} send_request: true{% endif %} +{% if mongodb_send_request %} send_request: true{%- endif %} {% if mongodb_send_response %} send_response: true{% endif %} {% if mongodb_max_docs is not none %} max_docs: {{mongodb_max_docs}}{% endif %} {% if mongodb_max_doc_length is not none %} max_doc_length: {{mongodb_max_doc_length}}{% endif %} @@ -205,4 +205,17 @@ procs: cmdline_grep: memcached {% endif %} +{% if filter_enabled %} + +filter: + + {%- if drop_fields %} + - drop_fields: + fields: {{drop_fields}} + {%- endif %} + {%- if include_fields is not none %} + - include_fields: + fields: {{include_fields}} + {%- endif %} +{% endif %} # vim: set ft=jinja: diff --git a/packetbeat/tests/system/test_0007_tags.py b/packetbeat/tests/system/test_0007_tags.py index bd86696181b..fdb450449fc 100644 --- a/packetbeat/tests/system/test_0007_tags.py +++ b/packetbeat/tests/system/test_0007_tags.py @@ -21,6 +21,7 @@ def test_tags(self): assert len(objs) == 1 o = objs[0] + assert "tags" in o assert o["tags"] == ["nginx", "wsgi", "drum"] def test_empty_tags(self): diff --git 
a/packetbeat/tests/system/test_0060_filtering.py b/packetbeat/tests/system/test_0060_filtering.py new file mode 100644 index 00000000000..e98b40c245d --- /dev/null +++ b/packetbeat/tests/system/test_0060_filtering.py @@ -0,0 +1,124 @@ +from packetbeat import BaseTest + + +class Test(BaseTest): + + def test_drop_map_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.request_headers"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" not in objs[0] + assert "http.response_headers" in objs[0] + + def test_drop_end_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.response_headers.transfer-encoding"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" in objs[0] + assert "http.response_headers" in objs[0] + + # check if filtering deleted the + # htp.response_headers.transfer-encoding + assert "transfer-encoding" not in objs[0]["http.response_headers"] + + def test_drop_unknown_field(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=["http.response_headers.transfer-encoding-test"], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + 
debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" + + assert "http.request_headers" in objs[0] + assert "http.response_headers" in objs[0] + + # check that htp.response_headers.transfer-encoding + # still exists + assert "transfer-encoding" in objs[0]["http.response_headers"] + + def test_include_empty_list(self): + + self.render_config_template( + http_send_all_headers=True, + # export all mandatory fields + include_fields=[], + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output( + required_fields=["@timestamp", "type"], + ) + + assert len(objs) == 3 + assert "http.request_headers" not in objs[0] + assert "http.response_headers" not in objs[0] + + def test_drop_no_fields(self): + + self.render_config_template( + http_send_all_headers=True, + drop_fields=[], + # export all fields + include_fields=None, + filter_enabled=True, + ) + + self.run_packetbeat(pcap="http_minitwit.pcap", + debug_selectors=["http", "httpdetailed"]) + objs = self.read_output() + + assert len(objs) == 3 + assert all([o["type"] == "http" for o in objs]) + + assert objs[0]["status"] == "OK" + assert objs[1]["status"] == "OK" + assert objs[2]["status"] == "Error" diff --git a/topbeat/tests/system/config/topbeat.yml.j2 b/topbeat/tests/system/config/topbeat.yml.j2 index 7e63b1b015e..a75c3a16a3a 100644 --- a/topbeat/tests/system/config/topbeat.yml.j2 +++ b/topbeat/tests/system/config/topbeat.yml.j2 @@ -98,3 +98,16 @@ shipper: # Number of rotated log files to keep. Oldest files will be deleted first. 
#keepfiles: 7 + +filter: + + {% if include_fields %} + + - include_fields: + fields: {{ include_fields }} + + {% endif %} + + - drop_fields: + fields: {{ drop_fields | default([]) }} + diff --git a/topbeat/tests/system/test_filtering.py b/topbeat/tests/system/test_filtering.py new file mode 100644 index 00000000000..4937813ced7 --- /dev/null +++ b/topbeat/tests/system/test_filtering.py @@ -0,0 +1,156 @@ +from topbeat import BaseTest + +""" +Contains tests for filtering. +""" + + +class Test(BaseTest): + def test_dropfields(self): + """ + Check drop_fields filtering action + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + drop_fields=["proc"] + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + + for key in [ + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.name", + "proc.state", + "proc.pid", + ]: + assert key not in output + + def test_include_fields(self): + """ + Check include_fields filtering action + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + include_fields=["proc.cpu", "proc.mem"] + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + print(output) + + for key in [ + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.mem.size", + "proc.mem.rss", + "proc.mem.rss_p" + ]: + assert key in output + + for key in [ + "proc.name", + "proc.pid", + ]: + assert key not in output + + def test_multiple_actions(self): + """ + Check the result when configuring two actions: include_fields + and 
drop_fields. + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + include_fields=["proc"], + drop_fields=["proc.mem"], + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + + for key in [ + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.name", + "proc.pid", + ]: + assert key in output + + for key in [ + "proc.mem.size", + "proc.mem.rss", + "proc.mem.rss_p" + ]: + assert key not in output + + def test_contradictory_multiple_actions(self): + """ + Check the behaviour of a contradictory multiple actions + """ + self.render_config_template( + system_stats=False, + process_stats=True, + filesystem_stats=False, + include_fields=["proc.mem.size", "proc.mem.rss_p"], + drop_fields=["proc.mem.size", "proc.mem.rss_p"], + ) + topbeat = self.start_beat() + self.wait_until( + lambda: self.log_contains( + "output worker: publish")) + + topbeat.kill_and_wait() + + output = self.read_output( + required_fields=["@timestamp", "type"], + )[0] + + for key in [ + "proc.mem.size", + "proc.mem.rss", + "proc.cpu.start_time", + "proc.cpu.total", + "proc.cpu.total_p", + "proc.cpu.user", + "proc.cpu.system", + "proc.name", + "proc.pid", + "proc.mem.rss_p" + ]: + assert key not in output