Skip to content

Commit

Permalink
Merge pull request #1120 from monicasarbu/add_generic_filtering
Browse files Browse the repository at this point in the history
Add support for include_fields and drop_fields
  • Loading branch information
tsg committed Mar 21, 2016
2 parents 07d5d72 + 09ca0f4 commit 06c3132
Show file tree
Hide file tree
Showing 21 changed files with 1,345 additions and 60 deletions.
14 changes: 14 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,18 @@ output:
rotate_every_kb: 1000
#number_of_files: 7

{% if filter_enabled %}

filter:

{%- if drop_fields %}
- drop_fields:
fields: {{drop_fields}}
{%- endif %}
{%- if include_fields is not none %}
- include_fields:
fields: {{include_fields}}
{%- endif %}
{% endif %}

# vim: set ft=jinja:
53 changes: 53 additions & 0 deletions filebeat/tests/system/test_filtering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from filebeat import BaseTest
import os

"""
Contains tests for filtering.
"""


class Test(BaseTest):
def test_dropfields(self):
"""
Check drop_fields filtering action
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
drop_fields=["beat"],
include_fields=None,
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "beat.name" not in output
assert "message" in output

def test_include_fields(self):
"""
Check drop_fields filtering action
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
filter_enabled=True,
include_fields=["source", "offset", "message"]
)
with open(self.working_dir + "/test.log", "w") as f:
f.write("test message\n")

filebeat = self.start_beat()
self.wait_until(lambda: self.output_has(lines=1))
filebeat.check_kill_and_wait()

output = self.read_output(
required_fields=["@timestamp", "type"],
)[0]
assert "beat.name" not in output
assert "message" in output
17 changes: 13 additions & 4 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/urso/ucfg"

"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/filter"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/service"
Expand Down Expand Up @@ -84,6 +85,7 @@ type BeatConfig struct {
Output map[string]*ucfg.Config
Logging logp.Logging
Shipper publisher.ShipperConfig
Filter []filter.FilterConfig
}

var printVersion *bool
Expand Down Expand Up @@ -149,7 +151,7 @@ func (b *Beat) Start() error {
// Additional command line args are used to overwrite config options
err, exit := b.CommandLineSetup()
if err != nil {
return err
return fmt.Errorf("fails to load command line setup: %v\n", err)
}

if exit {
Expand All @@ -159,13 +161,13 @@ func (b *Beat) Start() error {
// Loads base config
err = b.LoadConfig()
if err != nil {
return err
return fmt.Errorf("fails to load the config: %v\n", err)
}

// Configures beat
err = b.BT.Config(b)
if err != nil {
return err
return fmt.Errorf("fails to load the beat config: %v\n", err)
}
b.setState(ConfigState)

Expand Down Expand Up @@ -229,13 +231,20 @@ func (b *Beat) LoadConfig() error {

pub, err := publisher.New(b.Name, b.Config.Output, b.Config.Shipper)
if err != nil {
return fmt.Errorf("error Initialising publisher: %v\n", err)
return fmt.Errorf("error initializing publisher: %v\n", err)
}

filters, err := filter.New(b.Config.Filter)
if err != nil {
return fmt.Errorf("error initializing filters: %v\n", err)
}

b.Publisher = pub
pub.RegisterFilter(filters)
b.Events = pub.Client()

logp.Info("Init Beat: %s; Version: %s", b.Name, b.Version)
logp.Info("Filter %v", filters)

return nil
}
Expand Down
86 changes: 86 additions & 0 deletions libbeat/common/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package common

import (
"encoding/json"
"reflect"
"time"

"github.com/elastic/beats/libbeat/logp"
)

func MarshallUnmarshall(v interface{}) (MapStr, error) {
// decode and encode JSON
marshaled, err := json.Marshal(v)
if err != nil {
logp.Warn("marshal err: %v", err)
return nil, err
}
var v1 MapStr
err = json.Unmarshal(marshaled, &v1)
if err != nil {
logp.Warn("unmarshal err: %v")
return nil, err
}

return v1, nil
}

func ConvertToGenericEvent(v MapStr) MapStr {

for key, value := range v {

switch value.(type) {
case Time, *Time:
continue
case time.Location, *time.Location:
continue
case MapStr:
v[key] = ConvertToGenericEvent(value.(MapStr))
continue
case *MapStr:
v[key] = ConvertToGenericEvent(*value.(*MapStr))
continue
default:

typ := reflect.TypeOf(value)

if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}

switch typ.Kind() {
case reflect.Bool:
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
case reflect.Uintptr:
case reflect.Float32, reflect.Float64:
case reflect.Complex64, reflect.Complex128:
case reflect.String:
case reflect.UnsafePointer:
case reflect.Array, reflect.Slice:
//case reflect.Chan:
//case reflect.Func:
//case reflect.Interface:
case reflect.Map:
anothermap, err := MarshallUnmarshall(value)
if err != nil {
logp.Warn("fail to marschall & unmarshall map %v", key)
continue
}
v[key] = anothermap

case reflect.Struct:
anothermap, err := MarshallUnmarshall(value)
if err != nil {
logp.Warn("fail to marschall & unmarshall struct %v", key)
continue
}
v[key] = anothermap
default:
logp.Warn("unknown type %v", typ)
continue
}
}
}
return v
}
123 changes: 123 additions & 0 deletions libbeat/common/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package common

import (
"testing"

"github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"
)

func TestConvertNestedMapStr(t *testing.T) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})

type io struct {
Input MapStr
Output MapStr
}

type String string

tests := []io{
io{
Input: MapStr{
"key": MapStr{
"key1": "value1",
},
},
Output: MapStr{
"key": MapStr{
"key1": "value1",
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": String("value1"),
},
},
Output: MapStr{
"key": MapStr{
"key1": String("value1"),
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": []string{"value1", "value2"},
},
},
Output: MapStr{
"key": MapStr{
"key1": []string{"value1", "value2"},
},
},
},
io{
Input: MapStr{
"key": MapStr{
"key1": []String{"value1", "value2"},
},
},
Output: MapStr{
"key": MapStr{
"key1": []String{"value1", "value2"},
},
},
},
io{
Input: MapStr{
"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
},
Output: MapStr{
"@timestamp": MustParseTime("2015-03-01T12:34:56.123Z"),
},
},
}

for _, test := range tests {
assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input))
}

}

func TestConvertNestedStruct(t *testing.T) {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"*"})

type io struct {
Input MapStr
Output MapStr
}

type TestStruct struct {
A string
B int
}

tests := []io{
io{
Input: MapStr{
"key": MapStr{
"key1": TestStruct{
A: "hello",
B: 5,
},
},
},
Output: MapStr{
"key": MapStr{
"key1": MapStr{
"A": "hello",
"B": float64(5),
},
},
},
},
}

for _, test := range tests {
assert.EqualValues(t, test.Output, ConvertToGenericEvent(test.Input))
}

}
Loading

0 comments on commit 06c3132

Please sign in to comment.