From 388c90a5b6ca07578b5ecddf655208592a385a3b Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Tue, 7 Jul 2020 12:51:32 -0400 Subject: [PATCH 1/2] Expand fields to select labels Previously, fields could only select values located on the entry's record. However, it's often necessary to filter or parse values that would more naturally exist on fields like labels. This commit refactors fields into an interface which is fulfilled by both the `RecordField` and the `LabelField` objects. `RecordField` is essentially what was previously called `Field`. Notably, this change makes the zero-value of `Field` invalid, since it will contain a `nil` pointer. --- docs/types/field.md | 48 +-- entry/entry.go | 10 +- entry/entry_test.go | 78 ++++- entry/field.go | 228 ++------------ entry/field_test.go | 297 +----------------- entry/label_field.go | 46 +++ entry/label_field_test.go | 202 ++++++++++++ entry/record_field.go | 220 +++++++++++++ entry/record_field_test.go | 274 ++++++++++++++++ plugin/builtin/input/file/file_test.go | 4 +- plugin/builtin/input/generate_test.go | 4 +- plugin/builtin/input/tcp_test.go | 4 +- plugin/builtin/input/udp_test.go | 4 +- plugin/builtin/output/elastic_test.go | 4 +- plugin/builtin/output/google_cloud_test.go | 6 +- plugin/builtin/parser/json_test.go | 6 +- plugin/builtin/parser/regex_test.go | 2 + plugin/builtin/parser/severity_test.go | 4 +- plugin/builtin/parser/syslog.go | 2 +- plugin/builtin/parser/syslog_test.go | 2 + plugin/builtin/parser/time_test.go | 12 +- .../transformer/k8s_metadata_decorator.go | 4 +- .../k8s_metadata_decorator_test.go | 4 +- plugin/builtin/transformer/metadata.go | 12 +- plugin/builtin/transformer/restructure.go | 46 ++- .../builtin/transformer/restructure_test.go | 48 +-- plugin/builtin/transformer/router.go | 6 +- plugin/helper/input.go | 4 + plugin/helper/input_test.go | 2 +- plugin/helper/parser.go | 8 + plugin/helper/parser_test.go | 37 ++- 31 files changed, 1029 insertions(+), 599 deletions(-) create mode 
100644 entry/label_field.go create mode 100644 entry/label_field_test.go create mode 100644 entry/record_field.go create mode 100644 entry/record_field_test.go diff --git a/docs/types/field.md b/docs/types/field.md index 7e792c234..f7f882d4d 100644 --- a/docs/types/field.md +++ b/docs/types/field.md @@ -1,18 +1,13 @@ ## Fields -_Fields_ are the primary way to tell carbon which fields of a log's record to use for the operations of its plugins. +_Fields_ are the primary way to tell carbon which fields of an entry to use for the operations of its plugins. Most often, these will be things like fields to parse for a parser plugin, or the field to write a new value to. -Fields are `.`-delimited strings which allow you to selected into nested records in the field. The root level is specified by `$` such as in `$.key1`, but since all fields are expected to be relative to root, the `$` is implied and be omitted. For example, in the record below, `nested_key` can be equivalently selected with `$.key2.nested_key` or `key2.nested_key`. +Fields are `.`-delimited strings which allow you to select labels or records on the entry. Fields can currently be used to select labels or values on a record. To select a label, prefix your field with `$labels.` such as with `$labels.my_label`. For values on the record, use the prefix `$record.` such as `$record.my_value`. -```json -{ - "key1": "value1", - "key2": { - "nested_key": "nested_value" - } -} -``` +Record fields can be nested arbitrarily deeply, such as `$record.my_value.my_nested_value`. + +If a field does not start with either `$labels` or `$record`, `$record` is assumed. For example, `my_value` is equivalent to `$record.my_value`. ## Examples @@ -27,20 +22,27 @@ Config: - add: field: "key3" value: "value3" - - remove: "key2.nested_key1" + - remove: "$record.key2.nested_key1" + - add: + field: "$labels.my_label" + value: "my_label_value" ``` - +
Input record Output record
Input entry Output entry
```json { - "key1": "value1", - "key2": { - "nested_key1": "nested_value1", - "nested_key2": "nested_value2" + "timestamp": "", + "labels": {}, + "record": { + "key1": "value1", + "key2": { + "nested_key1": "nested_value1", + "nested_key2": "nested_value2" + } } } ``` @@ -50,11 +52,17 @@ Config: ```json { - "key1": "value1", - "key2": { - "nested_key2": "nested_value2" + "timestamp": "", + "labels": { + "my_label": "my_label_value" }, - "key3": "value3" + "record": { + "key1": "value1", + "key2": { + "nested_key2": "nested_value2" + }, + "key3": "value3" + } } ``` diff --git a/entry/entry.go b/entry/entry.go index d0aa57ece..db7827137 100644 --- a/entry/entry.go +++ b/entry/entry.go @@ -28,19 +28,19 @@ func (entry *Entry) AddLabel(key, value string) { entry.Labels[key] = value } -func (entry *Entry) Get(field Field) (interface{}, bool) { +func (entry *Entry) Get(field FieldInterface) (interface{}, bool) { return field.Get(entry) } -func (entry *Entry) Set(field Field, val interface{}) { - field.Set(entry, val, true) +func (entry *Entry) Set(field FieldInterface, val interface{}) error { + return field.Set(entry, val) } -func (entry *Entry) Delete(field Field) (interface{}, bool) { +func (entry *Entry) Delete(field FieldInterface) (interface{}, bool) { return field.Delete(entry) } -func (entry *Entry) Read(field Field, dest interface{}) error { +func (entry *Entry) Read(field FieldInterface, dest interface{}) error { val, ok := entry.Get(field) if !ok { return fmt.Errorf("field does not exist") diff --git a/entry/entry_test.go b/entry/entry_test.go index 5663dffb9..232d746bf 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -35,84 +35,84 @@ func TestRead(t *testing.T) { t.Run("field not exist error", func(t *testing.T) { var s string - err := testEntry.Read(Field{[]string{"nonexistant_field"}}, &s) + err := testEntry.Read(NewRecordField("nonexistant_field"), &s) require.Error(t, err) }) t.Run("unsupported type error", func(t *testing.T) { var s **string 
- err := testEntry.Read(Field{[]string{"string_field"}}, &s) + err := testEntry.Read(NewRecordField("string_field"), &s) require.Error(t, err) }) t.Run("string", func(t *testing.T) { var s string - err := testEntry.Read(Field{[]string{"string_field"}}, &s) + err := testEntry.Read(NewRecordField("string_field"), &s) require.NoError(t, err) require.Equal(t, "string_val", s) }) t.Run("string error", func(t *testing.T) { var s string - err := testEntry.Read(Field{[]string{"map_string_interface_field"}}, &s) + err := testEntry.Read(NewRecordField("map_string_interface_field"), &s) require.Error(t, err) }) t.Run("map[string]interface{}", func(t *testing.T) { var m map[string]interface{} - err := testEntry.Read(Field{[]string{"map_string_interface_field"}}, &m) + err := testEntry.Read(NewRecordField("map_string_interface_field"), &m) require.NoError(t, err) require.Equal(t, map[string]interface{}{"nested": "interface_val"}, m) }) t.Run("map[string]interface{} error", func(t *testing.T) { var m map[string]interface{} - err := testEntry.Read(Field{[]string{"string_field"}}, &m) + err := testEntry.Read(NewRecordField("string_field"), &m) require.Error(t, err) }) t.Run("map[string]string from map[string]interface{}", func(t *testing.T) { var m map[string]string - err := testEntry.Read(Field{[]string{"map_string_interface_field"}}, &m) + err := testEntry.Read(NewRecordField("map_string_interface_field"), &m) require.NoError(t, err) require.Equal(t, map[string]string{"nested": "interface_val"}, m) }) t.Run("map[string]string from map[string]interface{} err", func(t *testing.T) { var m map[string]string - err := testEntry.Read(Field{[]string{"map_string_interface_nonstring_field"}}, &m) + err := testEntry.Read(NewRecordField("map_string_interface_nonstring_field"), &m) require.Error(t, err) }) t.Run("map[string]string from map[interface{}]interface{}", func(t *testing.T) { var m map[string]string - err := testEntry.Read(Field{[]string{"map_interface_interface_field"}}, &m) + err 
:= testEntry.Read(NewRecordField("map_interface_interface_field"), &m) require.NoError(t, err) require.Equal(t, map[string]string{"nested": "interface_val"}, m) }) t.Run("map[string]string from map[interface{}]interface{} nonstring key error", func(t *testing.T) { var m map[string]string - err := testEntry.Read(Field{[]string{"map_interface_interface_nonstring_key_field"}}, &m) + err := testEntry.Read(NewRecordField("map_interface_interface_nonstring_key_field"), &m) require.Error(t, err) }) t.Run("map[string]string from map[interface{}]interface{} nonstring value error", func(t *testing.T) { var m map[string]string - err := testEntry.Read(Field{[]string{"map_interface_interface_nonstring_value_field"}}, &m) + err := testEntry.Read(NewRecordField("map_interface_interface_nonstring_value_field"), &m) require.Error(t, err) }) t.Run("interface{} from any", func(t *testing.T) { var i interface{} - err := testEntry.Read(Field{[]string{"map_interface_interface_field"}}, &i) + err := testEntry.Read(NewRecordField("map_interface_interface_field"), &i) require.NoError(t, err) require.Equal(t, map[interface{}]interface{}{"nested": "interface_val"}, i) }) t.Run("string from []byte", func(t *testing.T) { var i string - err := testEntry.Read(NewField("byte_field"), &i) + err := testEntry.Read(NewRecordField("byte_field"), &i) require.NoError(t, err) require.Equal(t, "test", i) }) @@ -139,3 +139,55 @@ func TestCopy(t *testing.T) { require.Equal(t, map[string]string{"label": "value"}, copy.Labels) require.Equal(t, "test", copy.Record) } + +func TestFieldFromString(t *testing.T) { + cases := []struct { + name string + input string + output Field + expectedError bool + }{ + { + "SimpleRecord", + "test", + Field{RecordField{[]string{"test"}}}, + false, + }, + { + "PrefixedRecord", + "$.test", + Field{RecordField{[]string{"test"}}}, + false, + }, + { + "FullPrefixedRecord", + "$record.test", + Field{RecordField{[]string{"test"}}}, + false, + }, + { + "SimpleLabel", + "$labels.test", 
+ Field{LabelField{"test"}}, + false, + }, + { + "LabelsTooManyFields", + "$labels.test.bar", + Field{}, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + f, err := fieldFromString(tc.input) + if tc.expectedError { + require.Error(t, err) + return + } + + require.Equal(t, tc.output, f) + }) + } +} diff --git a/entry/field.go b/entry/field.go index 6412cd7bf..88cd18f0f 100644 --- a/entry/field.go +++ b/entry/field.go @@ -10,218 +10,56 @@ import ( // It is used to get, set, and delete values at this field. // It is deserialized from JSON dot notation. type Field struct { - Keys []string + FieldInterface } -// Parent returns the parent of the current field. -// In the case that the record field points to the root node, it is a no-op. -func (f Field) Parent() Field { - if f.IsRoot() { - return f - } - - keys := f.Keys[0 : f.Length()-1] - return NewField(keys...) +type FieldInterface interface { + Get(*Entry) (interface{}, bool) + Set(entry *Entry, value interface{}) error + Delete(entry *Entry) (interface{}, bool) + String() string } -// Child returns a child of the current field using the given key. -func (f Field) Child(key string) Field { - child := make([]string, f.Length(), f.Length()+1) - copy(child, f.Keys) - keys := append(child, key) - return NewField(keys...) -} - -// IsRoot returns a boolean indicating if this is a root level field. -func (f Field) IsRoot() bool { - return f.Length() == 0 -} - -// String returns the string representation of this field. -func (f Field) String() string { - return toJSONDot(f) -} - -// Length returns the number of keys in the field. -func (f Field) Length() int { - return len(f.Keys) -} - -// Get will retrieve a value from an entry's record using the field. -// It will return the value and whether the field existed. 
-func (f Field) Get(entry *Entry) (interface{}, bool) { - var currentValue interface{} = entry.Record - - for _, key := range f.Keys { - currentRecord, ok := currentValue.(map[string]interface{}) - if !ok { - return nil, false - } - - currentValue, ok = currentRecord[key] - if !ok { - return nil, false - } - } - - return currentValue, true -} - -// Set will set a value on an entry's record using the field. -// If a key already exists, it will be overwritten. -// If mergeMaps is set to true, map values will be merged together. -func (f Field) Set(entry *Entry, value interface{}, mergeMaps bool) { - mapValue, isMapValue := value.(map[string]interface{}) - if isMapValue && mergeMaps { - f.Merge(entry, mapValue) - return - } - - if f.IsRoot() { - entry.Record = value - return - } - - currentMap, ok := entry.Record.(map[string]interface{}) - if !ok { - currentMap = map[string]interface{}{} - entry.Record = currentMap - } - - for i, key := range f.Keys { - if i == f.Length()-1 { - currentMap[key] = value - return - } - currentMap = f.getNestedMap(currentMap, key) +func (f *Field) UnmarshalJSON(raw []byte) error { + var s string + err := json.Unmarshal(raw, &s) + if err != nil { + return err } + *f, err = fieldFromString(s) + return err } -// Merge will attempt to merge the contents of a map into an entry's record. -// It will overwrite any intermediate values as necessary. 
-func (f Field) Merge(entry *Entry, mapValues map[string]interface{}) { - currentMap, ok := entry.Record.(map[string]interface{}) - if !ok { - currentMap = map[string]interface{}{} - entry.Record = currentMap - } - - for _, key := range f.Keys { - currentMap = f.getNestedMap(currentMap, key) - } - - for key, value := range mapValues { - currentMap[key] = value +func (f *Field) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + err := unmarshal(&s) + if err != nil { + return err } + *f, err = fieldFromString(s) + return err } -// Delete removes a value from an entry's record using the field. -// It will return the deleted value and whether the field existed. -func (f Field) Delete(entry *Entry) (interface{}, bool) { - if f.IsRoot() { - oldRecord := entry.Record - entry.Record = nil - return oldRecord, true - } - - currentValue := entry.Record - for i, key := range f.Keys { - currentMap, ok := currentValue.(map[string]interface{}) - if !ok { - return nil, false - } - - currentValue, ok = currentMap[key] - if !ok { - return nil, false - } +func fieldFromString(s string) (Field, error) { + split := strings.Split(s, ".") - if i == f.Length()-1 { - delete(currentMap, key) - return currentValue, true + switch split[0] { + case "$labels": + if len(split) != 2 { + return Field{}, fmt.Errorf("labels cannot be nested") } + return Field{LabelField{split[1]}}, nil + case "$record", "$": + return Field{RecordField{split[1:]}}, nil + default: + return Field{RecordField{split}}, nil } - - return nil, false } -// getNestedMap will get a nested map assigned to a key. -// If the map does not exist, it will create and return it. 
-func (f Field) getNestedMap(currentMap map[string]interface{}, key string) map[string]interface{} { - currentValue, ok := currentMap[key] - if !ok { - currentMap[key] = map[string]interface{}{} - } - - nextMap, ok := currentValue.(map[string]interface{}) - if !ok { - nextMap = map[string]interface{}{} - currentMap[key] = nextMap - } - - return nextMap -} - -/**************** - Serialization -****************/ - -// UnmarshalJSON will attempt to unmarshal the field from JSON. -func (f *Field) UnmarshalJSON(raw []byte) error { - var value string - if err := json.Unmarshal(raw, &value); err != nil { - return fmt.Errorf("the field is not a string: %s", err) - } - - *f = fromJSONDot(value) - return nil -} - -// MarshalJSON will marshal the field for JSON. func (f Field) MarshalJSON() ([]byte, error) { - json := fmt.Sprintf(`"%s"`, toJSONDot(f)) - return []byte(json), nil + return []byte(fmt.Sprintf("\"%s\"", f.String())), nil } -// UnmarshalYAML will attempt to unmarshal a field from YAML. -func (f *Field) UnmarshalYAML(unmarshal func(interface{}) error) error { - var value string - if err := unmarshal(&value); err != nil { - return fmt.Errorf("the field is not a string: %s", err) - } - - *f = fromJSONDot(value) - return nil -} - -// MarshalYAML will marshal the field for YAML. func (f Field) MarshalYAML() (interface{}, error) { - return toJSONDot(f), nil -} - -// fromJSONDot creates a field from JSON dot notation. -func fromJSONDot(value string) Field { - keys := strings.Split(value, ".") - - if keys[0] == "$" { - keys = keys[1:] - } - - return NewField(keys...) -} - -// toJSONDot returns the JSON dot notation for a field. -func toJSONDot(field Field) string { - if field.IsRoot() { - return "$" - } - - return strings.Join(field.Keys, ".") -} - -// NewField creates a new field from an ordered array of keys. 
-func NewField(keys ...string) Field { - return Field{ - Keys: keys, - } + return f.String(), nil } diff --git a/entry/field_test.go b/entry/field_test.go index 6d478bfc8..9b2bcc63c 100644 --- a/entry/field_test.go +++ b/entry/field_test.go @@ -4,262 +4,10 @@ import ( "encoding/json" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" ) -func testRecord() map[string]interface{} { - return map[string]interface{}{ - "simple_key": "simple_value", - "map_key": nestedMap(), - } -} - -func nestedMap() map[string]interface{} { - return map[string]interface{}{ - "nested_key": "nested_value", - } -} - -func TestFieldGet(t *testing.T) { - cases := []struct { - name string - field Field - record interface{} - expectedVal interface{} - expectedOk bool - }{ - { - "EmptyField", - NewField(), - testRecord(), - testRecord(), - true, - }, - { - "SimpleField", - NewField("simple_key"), - testRecord(), - "simple_value", - true, - }, - { - "MapField", - NewField("map_key"), - testRecord(), - nestedMap(), - true, - }, - { - "NestedField", - NewField("map_key", "nested_key"), - testRecord(), - "nested_value", - true, - }, - { - "MissingField", - NewField("invalid"), - testRecord(), - nil, - false, - }, - { - "InvalidField", - NewField("simple_key", "nested_key"), - testRecord(), - nil, - false, - }, - { - "RawField", - NewField(), - "raw string", - "raw string", - true, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - entry := New() - entry.Record = tc.record - - val, ok := entry.Get(tc.field) - if !assert.Equal(t, tc.expectedOk, ok) { - return - } - if !assert.Equal(t, tc.expectedVal, val) { - return - } - }) - } -} - -func TestFieldDelete(t *testing.T) { - cases := []struct { - name string - field Field - record interface{} - expectedRecord interface{} - expectedReturned interface{} - expectedOk bool - }{ - { - "SimpleKey", - NewField("simple_key"), - testRecord(), - map[string]interface{}{ - 
"map_key": nestedMap(), - }, - "simple_value", - true, - }, - { - "EmptyRecordAndField", - NewField(), - map[string]interface{}{}, - nil, - map[string]interface{}{}, - true, - }, - { - "EmptyField", - NewField(), - testRecord(), - nil, - testRecord(), - true, - }, - { - "MissingKey", - NewField("missing_key"), - testRecord(), - testRecord(), - nil, - false, - }, - { - "NestedKey", - NewField("map_key", "nested_key"), - testRecord(), - map[string]interface{}{ - "simple_key": "simple_value", - "map_key": map[string]interface{}{}, - }, - "nested_value", - true, - }, - { - "MapKey", - NewField("map_key"), - testRecord(), - map[string]interface{}{ - "simple_key": "simple_value", - }, - nestedMap(), - true, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - entry := New() - entry.Record = tc.record - - deleted, ok := entry.Delete(tc.field) - assert.Equal(t, tc.expectedOk, ok) - assert.Equal(t, tc.expectedReturned, deleted) - assert.Equal(t, tc.expectedRecord, entry.Record) - }) - } -} - -func TestFieldSet(t *testing.T) { - cases := []struct { - name string - field Field - record interface{} - setTo interface{} - expectedVal interface{} - }{ - { - "OverwriteMap", - NewField(), - testRecord(), - "new_value", - "new_value", - }, - { - "OverwriteRaw", - NewField(), - "raw_value", - "new_value", - "new_value", - }, - { - "NewMapValue", - NewField(), - map[string]interface{}{}, - testRecord(), - testRecord(), - }, - { - "NewRootField", - NewField("new_key"), - map[string]interface{}{}, - "new_value", - map[string]interface{}{"new_key": "new_value"}, - }, - { - "NewNestedField", - NewField("new_key", "nested_key"), - map[string]interface{}{}, - "nested_value", - map[string]interface{}{ - "new_key": map[string]interface{}{ - "nested_key": "nested_value", - }, - }, - }, - { - "OverwriteNestedMap", - NewField("map_key"), - testRecord(), - "new_value", - map[string]interface{}{ - "simple_key": "simple_value", - "map_key": "new_value", - }, - }, - { - 
"MergedNestedValue", - NewField("map_key"), - testRecord(), - map[string]interface{}{ - "merged_key": "merged_value", - }, - map[string]interface{}{ - "simple_key": "simple_value", - "map_key": map[string]interface{}{ - "nested_key": "nested_value", - "merged_key": "merged_value", - }, - }, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - entry := New() - entry.Record = tc.record - entry.Set(tc.field, tc.setTo) - assert.Equal(t, tc.expectedVal, entry.Record) - }) - } -} - func TestFieldUnmarshalJSON(t *testing.T) { cases := []struct { name string @@ -269,17 +17,17 @@ func TestFieldUnmarshalJSON(t *testing.T) { { "SimpleField", []byte(`"test1"`), - NewField("test1"), + NewRecordField("test1"), }, { "ComplexField", []byte(`"test1.test2"`), - NewField("test1", "test2"), + NewRecordField("test1", "test2"), }, { "RootField", []byte(`"$"`), - NewField([]string{}...), + NewRecordField([]string{}...), }, } @@ -302,12 +50,12 @@ func TestFieldMarshalJSON(t *testing.T) { }{ { "SimpleField", - NewField("test1"), + NewRecordField("test1"), []byte(`"test1"`), }, { "ComplexField", - NewField("test1", "test2"), + NewRecordField("test1", "test2"), []byte(`"test1.test2"`), }, } @@ -331,27 +79,27 @@ func TestFieldUnmarshalYAML(t *testing.T) { { "SimpleField", []byte(`"test1"`), - NewField("test1"), + NewRecordField("test1"), }, { "UnquotedField", []byte(`test1`), - NewField("test1"), + NewRecordField("test1"), }, { "RootField", []byte(`"$"`), - NewField([]string{}...), + NewRecordField([]string{}...), }, { "ComplexField", []byte(`"test1.test2"`), - NewField("test1", "test2"), + NewRecordField("test1", "test2"), }, { "ComplexFieldWithRoot", []byte(`"$.test1.test2"`), - NewField("test1", "test2"), + NewRecordField("test1", "test2"), }, } @@ -374,18 +122,18 @@ func TestFieldMarshalYAML(t *testing.T) { }{ { "SimpleField", - NewField("test1"), + NewRecordField("test1"), []byte("test1\n"), }, { "ComplexField", - NewField("test1", "test2"), + 
NewRecordField("test1", "test2"), []byte("test1.test2\n"), }, { "EmptyField", - NewField(), - []byte("$\n"), + NewRecordField(), + []byte("$record\n"), }, } @@ -398,20 +146,3 @@ func TestFieldMarshalYAML(t *testing.T) { }) } } - -func TestFieldParent(t *testing.T) { - t.Run("Simple", func(t *testing.T) { - field := Field{[]string{"child"}} - require.Equal(t, Field{[]string{}}, field.Parent()) - }) - - t.Run("Root", func(t *testing.T) { - field := Field{[]string{}} - require.Equal(t, Field{[]string{}}, field.Parent()) - }) -} - -func TestFieldChild(t *testing.T) { - field := Field{[]string{"parent"}} - require.Equal(t, Field{[]string{"parent", "child"}}, field.Child("child")) -} diff --git a/entry/label_field.go b/entry/label_field.go new file mode 100644 index 000000000..f44698f83 --- /dev/null +++ b/entry/label_field.go @@ -0,0 +1,46 @@ +package entry + +import "fmt" + +type LabelField struct { + key string +} + +func (l LabelField) Get(entry *Entry) (interface{}, bool) { + if entry.Labels == nil { + return "", false + } + val, ok := entry.Labels[l.key] + return val, ok +} + +func (l LabelField) Set(entry *Entry, val interface{}) error { + if entry.Labels == nil { + entry.Labels = make(map[string]string, 1) + } + + str, ok := val.(string) + if !ok { + return fmt.Errorf("cannot set a label to a non-string value") + } + entry.Labels[l.key] = str + return nil +} + +func (l LabelField) Delete(entry *Entry) (interface{}, bool) { + if entry.Labels == nil { + return "", false + } + + val, ok := entry.Labels[l.key] + delete(entry.Labels, l.key) + return val, ok +} + +func (l LabelField) String() string { + return "$labels." 
+ l.key +} + +func NewLabelField(key string) Field { + return Field{LabelField{key}} +} diff --git a/entry/label_field_test.go b/entry/label_field_test.go new file mode 100644 index 000000000..3bd5d2dc0 --- /dev/null +++ b/entry/label_field_test.go @@ -0,0 +1,202 @@ +package entry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestLabelFieldGet(t *testing.T) { + cases := []struct { + name string + labels map[string]string + field Field + expected interface{} + expectedOK bool + }{ + { + "Simple", + map[string]string{ + "test": "val", + }, + NewLabelField("test"), + "val", + true, + }, + { + "NonexistentKey", + map[string]string{ + "test": "val", + }, + NewLabelField("nonexistent"), + "", + false, + }, + { + "NilMap", + nil, + NewLabelField("nonexistent"), + "", + false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Labels = tc.labels + val, ok := entry.Get(tc.field) + require.Equal(t, tc.expectedOK, ok) + require.Equal(t, tc.expected, val) + }) + } + +} + +func TestLabelFieldDelete(t *testing.T) { + cases := []struct { + name string + labels map[string]string + field Field + expected interface{} + expectedOK bool + expectedLabels map[string]string + }{ + { + "Simple", + map[string]string{ + "test": "val", + }, + NewLabelField("test"), + "val", + true, + map[string]string{}, + }, + { + "NonexistentKey", + map[string]string{ + "test": "val", + }, + NewLabelField("nonexistent"), + "", + false, + map[string]string{ + "test": "val", + }, + }, + { + "NilMap", + nil, + NewLabelField("nonexistent"), + "", + false, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Labels = tc.labels + val, ok := entry.Delete(tc.field) + require.Equal(t, tc.expectedOK, ok) + require.Equal(t, tc.expected, val) + }) + } + +} + +func TestLabelFieldSet(t *testing.T) { + cases := []struct { + name string + labels map[string]string + field Field + val 
interface{} + expected map[string]string + expectedErr bool + }{ + { + "Simple", + map[string]string{}, + NewLabelField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "Overwrite", + map[string]string{ + "test": "original", + }, + NewLabelField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "NilMap", + nil, + NewLabelField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "NonString", + map[string]string{}, + NewLabelField("test"), + 123, + map[string]string{ + "test": "val", + }, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Labels = tc.labels + err := entry.Set(tc.field, tc.val) + if tc.expectedErr { + require.Error(t, err) + return + } + + require.Equal(t, tc.expected, entry.Labels) + }) + } + +} + +func TestLabelFieldString(t *testing.T) { + cases := []struct { + name string + field LabelField + expected string + }{ + { + "Simple", + LabelField{"foo"}, + "$labels.foo", + }, + { + "Empty", + LabelField{""}, + "$labels.", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, tc.field.String()) + }) + } + +} diff --git a/entry/record_field.go b/entry/record_field.go new file mode 100644 index 000000000..78a3a4bb1 --- /dev/null +++ b/entry/record_field.go @@ -0,0 +1,220 @@ +package entry + +import ( + "encoding/json" + "fmt" + "strings" +) + +type RecordField struct { + Keys []string +} + +// Parent returns the parent of the current field. +// In the case that the record field points to the root node, it is a no-op. +func (f RecordField) Parent() RecordField { + if f.isRoot() { + return f + } + + keys := f.Keys[:len(f.Keys)-1] + return RecordField{keys} +} + +// Child returns a child of the current field using the given key. 
+func (f RecordField) Child(key string) RecordField { + child := make([]string, len(f.Keys), len(f.Keys)+1) + copy(child, f.Keys) + keys := append(child, key) + return RecordField{keys} +} + +// isRoot returns a boolean indicating if this is a root level field. +func (f RecordField) isRoot() bool { + return len(f.Keys) == 0 +} + +// String returns the string representation of this field. +func (f RecordField) String() string { + return toJSONDot(f) +} + +// Get will retrieve a value from an entry's record using the field. +// It will return the value and whether the field existed. +func (f RecordField) Get(entry *Entry) (interface{}, bool) { + var currentValue interface{} = entry.Record + + for _, key := range f.Keys { + currentRecord, ok := currentValue.(map[string]interface{}) + if !ok { + return nil, false + } + + currentValue, ok = currentRecord[key] + if !ok { + return nil, false + } + } + + return currentValue, true +} + +// Set will set a value on an entry's record using the field. +// If a key already exists, it will be overwritten. +// If the value is a map[string]interface{}, it is merged into the map at the field instead of replacing it. +func (f RecordField) Set(entry *Entry, value interface{}) error { + mapValue, isMapValue := value.(map[string]interface{}) + if isMapValue { + f.Merge(entry, mapValue) + return nil + } + + if f.isRoot() { + entry.Record = value + return nil + } + + currentMap, ok := entry.Record.(map[string]interface{}) + if !ok { + currentMap = map[string]interface{}{} + entry.Record = currentMap + } + + for i, key := range f.Keys { + if i == len(f.Keys)-1 { + currentMap[key] = value + return nil + } + currentMap = f.getNestedMap(currentMap, key) + } + return nil +} + +// Merge will attempt to merge the contents of a map into an entry's record. +// It will overwrite any intermediate values as necessary. 
+func (f RecordField) Merge(entry *Entry, mapValues map[string]interface{}) { + currentMap, ok := entry.Record.(map[string]interface{}) + if !ok { + currentMap = map[string]interface{}{} + entry.Record = currentMap + } + + for _, key := range f.Keys { + currentMap = f.getNestedMap(currentMap, key) + } + + for key, value := range mapValues { + currentMap[key] = value + } +} + +// Delete removes a value from an entry's record using the field. +// It will return the deleted value and whether the field existed. +func (f RecordField) Delete(entry *Entry) (interface{}, bool) { + if f.isRoot() { + oldRecord := entry.Record + entry.Record = nil + return oldRecord, true + } + + currentValue := entry.Record + for i, key := range f.Keys { + currentMap, ok := currentValue.(map[string]interface{}) + if !ok { + return nil, false + } + + currentValue, ok = currentMap[key] + if !ok { + return nil, false + } + + if i == len(f.Keys)-1 { + delete(currentMap, key) + return currentValue, true + } + } + + return nil, false +} + +// getNestedMap will get a nested map assigned to a key. +// If the map does not exist, it will create and return it. +func (f RecordField) getNestedMap(currentMap map[string]interface{}, key string) map[string]interface{} { + currentValue, ok := currentMap[key] + if !ok { + currentMap[key] = map[string]interface{}{} + } + + nextMap, ok := currentValue.(map[string]interface{}) + if !ok { + nextMap = map[string]interface{}{} + currentMap[key] = nextMap + } + + return nextMap +} + +/**************** + Serialization +****************/ + +// UnmarshalJSON will attempt to unmarshal the field from JSON. +func (f *RecordField) UnmarshalJSON(raw []byte) error { + var value string + if err := json.Unmarshal(raw, &value); err != nil { + return fmt.Errorf("the field is not a string: %s", err) + } + + *f = fromJSONDot(value) + return nil +} + +// MarshalJSON will marshal the field for JSON. 
+func (f RecordField) MarshalJSON() ([]byte, error) { + json := fmt.Sprintf(`"%s"`, toJSONDot(f)) + return []byte(json), nil +} + +// UnmarshalYAML will attempt to unmarshal a field from YAML. +func (f *RecordField) UnmarshalYAML(unmarshal func(interface{}) error) error { + var value string + if err := unmarshal(&value); err != nil { + return fmt.Errorf("the field is not a string: %s", err) + } + + *f = fromJSONDot(value) + return nil +} + +// MarshalYAML will marshal the field for YAML. +func (f RecordField) MarshalYAML() (interface{}, error) { + return toJSONDot(f), nil +} + +// fromJSONDot creates a field from JSON dot notation. +func fromJSONDot(value string) RecordField { + keys := strings.Split(value, ".") + + if keys[0] == "$" || keys[0] == "$record" { + keys = keys[1:] + } + + return RecordField{keys} +} + +// toJSONDot returns the JSON dot notation for a field. +func toJSONDot(field RecordField) string { + if field.isRoot() { + return "$record" + } + + return strings.Join(field.Keys, ".") +} + +// NewRecordField creates a new record field from an ordered array of keys. 
+func NewRecordField(keys ...string) Field { + return Field{RecordField{ + Keys: keys, + }} +} diff --git a/entry/record_field_test.go b/entry/record_field_test.go new file mode 100644 index 000000000..45b31ef7d --- /dev/null +++ b/entry/record_field_test.go @@ -0,0 +1,274 @@ +package entry + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testRecord() map[string]interface{} { + return map[string]interface{}{ + "simple_key": "simple_value", + "map_key": nestedMap(), + } +} + +func nestedMap() map[string]interface{} { + return map[string]interface{}{ + "nested_key": "nested_value", + } +} + +func TestRecordFieldGet(t *testing.T) { + cases := []struct { + name string + field Field + record interface{} + expectedVal interface{} + expectedOk bool + }{ + { + "EmptyField", + NewRecordField(), + testRecord(), + testRecord(), + true, + }, + { + "SimpleField", + NewRecordField("simple_key"), + testRecord(), + "simple_value", + true, + }, + { + "MapField", + NewRecordField("map_key"), + testRecord(), + nestedMap(), + true, + }, + { + "NestedField", + NewRecordField("map_key", "nested_key"), + testRecord(), + "nested_value", + true, + }, + { + "MissingField", + NewRecordField("invalid"), + testRecord(), + nil, + false, + }, + { + "InvalidField", + NewRecordField("simple_key", "nested_key"), + testRecord(), + nil, + false, + }, + { + "RawField", + NewRecordField(), + "raw string", + "raw string", + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Record = tc.record + + val, ok := entry.Get(tc.field) + if !assert.Equal(t, tc.expectedOk, ok) { + return + } + if !assert.Equal(t, tc.expectedVal, val) { + return + } + }) + } +} + +func TestRecordFieldDelete(t *testing.T) { + cases := []struct { + name string + field Field + record interface{} + expectedRecord interface{} + expectedReturned interface{} + expectedOk bool + }{ + { + "SimpleKey", + 
NewRecordField("simple_key"), + testRecord(), + map[string]interface{}{ + "map_key": nestedMap(), + }, + "simple_value", + true, + }, + { + "EmptyRecordAndField", + NewRecordField(), + map[string]interface{}{}, + nil, + map[string]interface{}{}, + true, + }, + { + "EmptyField", + NewRecordField(), + testRecord(), + nil, + testRecord(), + true, + }, + { + "MissingKey", + NewRecordField("missing_key"), + testRecord(), + testRecord(), + nil, + false, + }, + { + "NestedKey", + NewRecordField("map_key", "nested_key"), + testRecord(), + map[string]interface{}{ + "simple_key": "simple_value", + "map_key": map[string]interface{}{}, + }, + "nested_value", + true, + }, + { + "MapKey", + NewRecordField("map_key"), + testRecord(), + map[string]interface{}{ + "simple_key": "simple_value", + }, + nestedMap(), + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Record = tc.record + + entry.Delete(tc.field) + assert.Equal(t, tc.expectedRecord, entry.Record) + }) + } +} + +func TestRecordFieldSet(t *testing.T) { + cases := []struct { + name string + field Field + record interface{} + setTo interface{} + expectedVal interface{} + }{ + { + "OverwriteMap", + NewRecordField(), + testRecord(), + "new_value", + "new_value", + }, + { + "OverwriteRaw", + NewRecordField(), + "raw_value", + "new_value", + "new_value", + }, + { + "NewMapValue", + NewRecordField(), + map[string]interface{}{}, + testRecord(), + testRecord(), + }, + { + "NewRootField", + NewRecordField("new_key"), + map[string]interface{}{}, + "new_value", + map[string]interface{}{"new_key": "new_value"}, + }, + { + "NewNestedField", + NewRecordField("new_key", "nested_key"), + map[string]interface{}{}, + "nested_value", + map[string]interface{}{ + "new_key": map[string]interface{}{ + "nested_key": "nested_value", + }, + }, + }, + { + "OverwriteNestedMap", + NewRecordField("map_key"), + testRecord(), + "new_value", + map[string]interface{}{ + "simple_key": 
"simple_value", + "map_key": "new_value", + }, + }, + { + "MergedNestedValue", + NewRecordField("map_key"), + testRecord(), + map[string]interface{}{ + "merged_key": "merged_value", + }, + map[string]interface{}{ + "simple_key": "simple_value", + "map_key": map[string]interface{}{ + "nested_key": "nested_value", + "merged_key": "merged_value", + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Record = tc.record + entry.Set(tc.field, tc.setTo) + assert.Equal(t, tc.expectedVal, entry.Record) + }) + } +} + +func TestRecordFieldParent(t *testing.T) { + t.Run("Simple", func(t *testing.T) { + field := RecordField{[]string{"child"}} + require.Equal(t, RecordField{[]string{}}, field.Parent()) + }) + + t.Run("Root", func(t *testing.T) { + field := RecordField{[]string{}} + require.Equal(t, RecordField{[]string{}}, field.Parent()) + }) +} + +func TestFieldChild(t *testing.T) { + field := RecordField{[]string{"parent"}} + require.Equal(t, RecordField{[]string{"parent", "child"}}, field.Child("child")) +} diff --git a/plugin/builtin/input/file/file_test.go b/plugin/builtin/input/file/file_test.go index ed5a20b3b..c10285fc2 100644 --- a/plugin/builtin/input/file/file_test.go +++ b/plugin/builtin/input/file/file_test.go @@ -40,6 +40,7 @@ func newTestFileSource(t *testing.T) (*FileInput, chan string) { WriterPlugin: helper.WriterPlugin{ OutputPlugins: []plugin.Plugin{mockOutput}, }, + WriteTo: entry.NewRecordField(), }, SplitFunc: bufio.ScanLines, PollInterval: 50 * time.Millisecond, @@ -58,7 +59,7 @@ func TestFileSource_Build(t *testing.T) { t.Parallel() mockOutput := testutil.NewMockPlugin("mock") - pathField := entry.NewField("testpath") + pathField := entry.NewRecordField("testpath") basicConfig := func() *FileInputConfig { return &FileInputConfig{ @@ -70,6 +71,7 @@ func TestFileSource_Build(t *testing.T) { WriterConfig: helper.WriterConfig{ OutputIDs: []string{"mock"}, }, + WriteTo: entry.NewRecordField(), }, 
Include: []string{"/var/log/testpath.*"}, Exclude: []string{"/var/log/testpath.ex*"}, diff --git a/plugin/builtin/input/generate_test.go b/plugin/builtin/input/generate_test.go index b81b225b4..0953180ae 100644 --- a/plugin/builtin/input/generate_test.go +++ b/plugin/builtin/input/generate_test.go @@ -22,9 +22,7 @@ func TestInputGenerate(t *testing.T) { PluginID: "test_plugin_id", PluginType: "generate_input", }, - WriteTo: entry.Field{ - Keys: []string{}, - }, + WriteTo: entry.NewRecordField(), WriterConfig: helper.WriterConfig{ OutputIDs: []string{"output1"}, }, diff --git a/plugin/builtin/input/tcp_test.go b/plugin/builtin/input/tcp_test.go index ba529b5b3..3473944db 100644 --- a/plugin/builtin/input/tcp_test.go +++ b/plugin/builtin/input/tcp_test.go @@ -21,9 +21,7 @@ func TestTCPInput(t *testing.T) { PluginID: "test_id", PluginType: "tcp_input", }, - WriteTo: entry.Field{ - Keys: []string{}, - }, + WriteTo: entry.NewRecordField(), WriterConfig: helper.WriterConfig{ OutputIDs: []string{"test_output_id"}, }, diff --git a/plugin/builtin/input/udp_test.go b/plugin/builtin/input/udp_test.go index dbe3af46b..1ab35aa60 100644 --- a/plugin/builtin/input/udp_test.go +++ b/plugin/builtin/input/udp_test.go @@ -21,9 +21,7 @@ func TestUDPInput(t *testing.T) { PluginID: "test_id", PluginType: "udp_input", }, - WriteTo: entry.Field{ - Keys: []string{}, - }, + WriteTo: entry.NewRecordField(), WriterConfig: helper.WriterConfig{ OutputIDs: []string{"test_output_id"}, }, diff --git a/plugin/builtin/output/elastic_test.go b/plugin/builtin/output/elastic_test.go index 810344769..a16a8133f 100644 --- a/plugin/builtin/output/elastic_test.go +++ b/plugin/builtin/output/elastic_test.go @@ -8,7 +8,7 @@ import ( ) func TestFindIndex(t *testing.T) { - indexField := entry.NewField("bar") + indexField := entry.NewRecordField("bar") output := &ElasticOutput{ indexField: &indexField, } @@ -45,7 +45,7 @@ func TestFindIndex(t *testing.T) { } func TestFindID(t *testing.T) { - idField := 
entry.NewField("foo") + idField := entry.NewRecordField("foo") output := &ElasticOutput{ idField: &idField, } diff --git a/plugin/builtin/output/google_cloud_test.go b/plugin/builtin/output/google_cloud_test.go index 9b26e657d..097d9b592 100644 --- a/plugin/builtin/output/google_cloud_test.go +++ b/plugin/builtin/output/google_cloud_test.go @@ -108,7 +108,7 @@ func TestGoogleCloudOutput(t *testing.T) { "LogNameField", func() *GoogleCloudOutputConfig { c := googleCloudBasicConfig() - f := entry.NewField("log_name") + f := entry.NewRecordField("log_name") c.LogNameField = &f return c }(), @@ -188,8 +188,8 @@ func TestGoogleCloudOutput(t *testing.T) { "TraceAndSpanFields", func() *GoogleCloudOutputConfig { c := googleCloudBasicConfig() - traceField := entry.NewField("trace") - spanIDField := entry.NewField("span_id") + traceField := entry.NewRecordField("trace") + spanIDField := entry.NewRecordField("span_id") c.TraceField = &traceField c.SpanIDField = &spanIDField return c diff --git a/plugin/builtin/parser/json_test.go b/plugin/builtin/parser/json_test.go index edb87c6dd..32d3d1cf2 100644 --- a/plugin/builtin/parser/json_test.go +++ b/plugin/builtin/parser/json_test.go @@ -31,8 +31,8 @@ func NewFakeJSONPlugin() (*JSONParser, *testutil.Plugin) { OutputPlugins: []plugin.Plugin{&mock}, }, }, - ParseFrom: entry.NewField("testfield"), - ParseTo: entry.NewField("testparsed"), + ParseFrom: entry.NewRecordField("testfield"), + ParseTo: entry.NewRecordField("testparsed"), }, json: jsoniter.ConfigFastest, }, &mock @@ -153,7 +153,7 @@ func TestJSONParserWithEmbeddedTimeParser(t *testing.T) { parser, mockOutput := NewFakeJSONPlugin() parser.ParserPlugin.TimeParser = &helper.TimeParser{ - ParseFrom: entry.NewField("testparsed", "timestamp"), + ParseFrom: entry.NewRecordField("testparsed", "timestamp"), LayoutType: "epoch", Layout: "s", Preserve: tc.preserve, diff --git a/plugin/builtin/parser/regex_test.go b/plugin/builtin/parser/regex_test.go index eb419e56f..40321dbdf 100644 
--- a/plugin/builtin/parser/regex_test.go +++ b/plugin/builtin/parser/regex_test.go @@ -28,6 +28,8 @@ func newFakeRegexParser() (*RegexParser, *testutil.Plugin) { OutputPlugins: []plugin.Plugin{&mockPlugin}, }, }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), }, }, &mockPlugin } diff --git a/plugin/builtin/parser/severity_test.go b/plugin/builtin/parser/severity_test.go index f21176128..c8e85cb2d 100644 --- a/plugin/builtin/parser/severity_test.go +++ b/plugin/builtin/parser/severity_test.go @@ -283,8 +283,8 @@ func TestSeverityParser(t *testing.T) { }, } - rootField := entry.NewField() - someField := entry.NewField("some_field") + rootField := entry.NewRecordField() + someField := entry.NewRecordField("some_field") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/plugin/builtin/parser/syslog.go b/plugin/builtin/parser/syslog.go index cac59c8ea..57251a99b 100644 --- a/plugin/builtin/parser/syslog.go +++ b/plugin/builtin/parser/syslog.go @@ -28,7 +28,7 @@ type SyslogParserConfig struct { func (c SyslogParserConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) { if c.ParserConfig.TimeParser == nil { c.ParserConfig.TimeParser = &helper.TimeParser{ - ParseFrom: entry.NewField("timestamp"), + ParseFrom: entry.NewRecordField("timestamp"), LayoutType: "native", } } diff --git a/plugin/builtin/parser/syslog_test.go b/plugin/builtin/parser/syslog_test.go index 9c358c411..8900ca366 100644 --- a/plugin/builtin/parser/syslog_test.go +++ b/plugin/builtin/parser/syslog_test.go @@ -26,6 +26,8 @@ func TestSyslogParser(t *testing.T) { OutputIDs: []string{"output1"}, }, }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), }, } } diff --git a/plugin/builtin/parser/time_test.go b/plugin/builtin/parser/time_test.go index 7a190bc42..fbe8f57d8 100644 --- a/plugin/builtin/parser/time_test.go +++ b/plugin/builtin/parser/time_test.go @@ -119,8 +119,8 @@ func TestTimeParser(t *testing.T) { }, } - 
rootField := entry.NewField() - someField := entry.NewField("some_field") + rootField := entry.NewRecordField() + someField := entry.NewRecordField("some_field") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -300,8 +300,8 @@ func TestTimeEpochs(t *testing.T) { }, } - rootField := entry.NewField() - someField := entry.NewField("some_field") + rootField := entry.NewRecordField() + someField := entry.NewRecordField("some_field") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -363,8 +363,8 @@ func TestTimeErrors(t *testing.T) { }, } - rootField := entry.NewField() - someField := entry.NewField("some_field") + rootField := entry.NewRecordField() + someField := entry.NewRecordField("some_field") for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/plugin/builtin/transformer/k8s_metadata_decorator.go b/plugin/builtin/transformer/k8s_metadata_decorator.go index 35eb73d05..575b23dff 100644 --- a/plugin/builtin/transformer/k8s_metadata_decorator.go +++ b/plugin/builtin/transformer/k8s_metadata_decorator.go @@ -32,12 +32,12 @@ func (c K8sMetadataDecoratorConfig) Build(context plugin.BuildContext) (plugin.P } if c.PodNameField == nil { - field := entry.NewField("pod_name") + field := entry.NewRecordField("pod_name") c.PodNameField = &field } if c.NamespaceField == nil { - field := entry.NewField("namespace") + field := entry.NewRecordField("namespace") c.NamespaceField = &field } diff --git a/plugin/builtin/transformer/k8s_metadata_decorator_test.go b/plugin/builtin/transformer/k8s_metadata_decorator_test.go index 6f2afb355..7eb8e0779 100644 --- a/plugin/builtin/transformer/k8s_metadata_decorator_test.go +++ b/plugin/builtin/transformer/k8s_metadata_decorator_test.go @@ -58,8 +58,8 @@ func TestK8sMetadataDecoratorBuildDefault(t *testing.T) { }, OnError: "send", }, - podNameField: entry.NewField("pod_name"), - namespaceField: entry.NewField("namespace"), + podNameField: entry.NewRecordField("pod_name"), + 
namespaceField: entry.NewRecordField("namespace"), cache_ttl: 10 * time.Minute, } diff --git a/plugin/builtin/transformer/metadata.go b/plugin/builtin/transformer/metadata.go index 52e6d7793..c7799dd57 100644 --- a/plugin/builtin/transformer/metadata.go +++ b/plugin/builtin/transformer/metadata.go @@ -77,7 +77,11 @@ type labeler struct { func (l *labeler) Label(e *entry.Entry) error { env := map[string]interface{}{ - "$": e.Record, + "$": e.Record, + "$record": e.Record, + "$labels": e.Labels, + "$timestamp": e.Timestamp, + "$tags": e.Tags, } for k, v := range l.labels { @@ -112,7 +116,11 @@ type tagger struct { func (t *tagger) Tag(e *entry.Entry) error { env := map[string]interface{}{ - "$": e.Record, + "$": e.Record, + "$record": e.Record, + "$labels": e.Labels, + "$timestamp": e.Timestamp, + "$tags": e.Tags, } for _, v := range t.tags { diff --git a/plugin/builtin/transformer/restructure.go b/plugin/builtin/transformer/restructure.go index 47c94fe69..bdaf2f608 100644 --- a/plugin/builtin/transformer/restructure.go +++ b/plugin/builtin/transformer/restructure.go @@ -8,6 +8,7 @@ import ( "github.com/antonmedv/expr" "github.com/antonmedv/expr/vm" "github.com/observiq/carbon/entry" + "github.com/observiq/carbon/errors" "github.com/observiq/carbon/plugin" "github.com/observiq/carbon/plugin/helper" ) @@ -199,16 +200,26 @@ type OpAdd struct { func (op *OpAdd) Apply(e *entry.Entry) error { switch { case op.Value != nil: - e.Set(op.Field, op.Value) + err := e.Set(op.Field, op.Value) + if err != nil { + return err + } case op.program != nil: env := map[string]interface{}{ - "$": e.Record, + "$": e.Record, + "$record": e.Record, + "$labels": e.Labels, + "$tags": e.Tags, + "$timestamp": e.Timestamp, } result, err := vm.Run(op.program, env) if err != nil { return fmt.Errorf("evaluate value_expr: %s", err) } - e.Set(op.Field, result) + err = e.Set(op.Field, result) + if err != nil { + return err + } default: // Should never reach here if we went through the unmarshalling 
code return fmt.Errorf("neither value or value_expr are are set") @@ -322,7 +333,10 @@ func (op *OpRetain) Apply(e *entry.Entry) error { if !ok { continue } - newEntry.Set(field, val) + err := newEntry.Set(field, val) + if err != nil { + return err + } } *e = *newEntry return nil @@ -363,8 +377,7 @@ func (op *OpMove) Apply(e *entry.Entry) error { return fmt.Errorf("apply move: field %s does not exist on record", op.From) } - e.Set(op.To, val) - return nil + return e.Set(op.To, val) } func (op *OpMove) Type() string { @@ -376,27 +389,32 @@ func (op *OpMove) Type() string { **********/ type OpFlatten struct { - Field entry.Field + Field entry.RecordField } func (op *OpFlatten) Apply(e *entry.Entry) error { - fs := entry.Field(op.Field) - parent := fs.Parent() - val, ok := e.Delete(fs) + parent := op.Field.Parent() + val, ok := e.Delete(op.Field) if !ok { // The field doesn't exist, so ignore it - return fmt.Errorf("apply flatten: field %s does not exist on record", fs) + return fmt.Errorf("apply flatten: field %s does not exist on record", op.Field) } valMap, ok := val.(map[string]interface{}) if !ok { // The field we were asked to flatten was not a map, so put it back - e.Set(fs, val) - return fmt.Errorf("apply flatten: field %s is not a map", fs) + err := e.Set(op.Field, val) + if err != nil { + return errors.Wrap(err, "reset non-map field") + } + return fmt.Errorf("apply flatten: field %s is not a map", op.Field) } for k, v := range valMap { - e.Set(parent.Child(k), v) + err := e.Set(parent.Child(k), v) + if err != nil { + return err + } } return nil } diff --git a/plugin/builtin/transformer/restructure_test.go b/plugin/builtin/transformer/restructure_test.go index 1781c1e3c..5606ba9ad 100644 --- a/plugin/builtin/transformer/restructure_test.go +++ b/plugin/builtin/transformer/restructure_test.go @@ -64,7 +64,7 @@ func TestRestructurePlugin(t *testing.T) { ops: []Op{ { &OpAdd{ - Field: entry.NewField("new"), + Field: entry.NewRecordField("new"), Value: "message", 
}, }, @@ -81,7 +81,7 @@ func TestRestructurePlugin(t *testing.T) { ops: []Op{ { &OpAdd{ - Field: entry.NewField("new"), + Field: entry.NewRecordField("new"), program: func() *vm.Program { vm, err := expr.Compile(`$.key + "_suffix"`) require.NoError(t, err) @@ -101,7 +101,7 @@ func TestRestructurePlugin(t *testing.T) { name: "Remove", ops: []Op{ { - &OpRemove{entry.NewField("nested")}, + &OpRemove{entry.NewRecordField("nested")}, }, }, input: newTestEntry(), @@ -117,7 +117,7 @@ func TestRestructurePlugin(t *testing.T) { name: "Retain", ops: []Op{ { - &OpRetain{[]entry.Field{entry.NewField("key")}}, + &OpRetain{[]entry.Field{entry.NewRecordField("key")}}, }, }, input: newTestEntry(), @@ -134,8 +134,8 @@ func TestRestructurePlugin(t *testing.T) { ops: []Op{ { &OpMove{ - From: entry.NewField("key"), - To: entry.NewField("newkey"), + From: entry.NewRecordField("key"), + To: entry.NewRecordField("newkey"), }, }, }, @@ -156,7 +156,9 @@ func TestRestructurePlugin(t *testing.T) { ops: []Op{ { &OpFlatten{ - Field: entry.NewField("nested"), + Field: entry.RecordField{ + Keys: []string{"nested"}, + }, }, }, }, @@ -196,14 +198,14 @@ func TestRestructureSerializeRoundtrip(t *testing.T) { { name: "AddValue", op: Op{&OpAdd{ - Field: entry.NewField("new"), + Field: entry.NewRecordField("new"), Value: "message", }}, }, { name: "AddValueExpr", op: Op{&OpAdd{ - Field: entry.NewField("new"), + Field: entry.NewRecordField("new"), ValueExpr: func() *string { s := `$.key + "_suffix"` return &s @@ -217,23 +219,25 @@ func TestRestructureSerializeRoundtrip(t *testing.T) { }, { name: "Remove", - op: Op{&OpRemove{entry.NewField("nested")}}, + op: Op{&OpRemove{entry.NewRecordField("nested")}}, }, { name: "Retain", - op: Op{&OpRetain{[]entry.Field{entry.NewField("key")}}}, + op: Op{&OpRetain{[]entry.Field{entry.NewRecordField("key")}}}, }, { name: "Move", op: Op{&OpMove{ - From: entry.NewField("key"), - To: entry.NewField("newkey"), + From: entry.NewRecordField("key"), + To: 
entry.NewRecordField("newkey"), }}, }, { name: "Flatten", op: Op{&OpFlatten{ - Field: entry.NewField("nested"), + Field: entry.RecordField{ + Keys: []string{"nested"}, + }, }}, }, } @@ -326,11 +330,11 @@ ops: }, Ops: []Op{ Op{&OpAdd{ - Field: entry.NewField("message"), + Field: entry.NewRecordField("message"), Value: "val", }}, Op{&OpAdd{ - Field: entry.NewField("message_suffix"), + Field: entry.NewRecordField("message_suffix"), ValueExpr: func() *string { s := `$.message + "_suffix"` return &s @@ -342,19 +346,21 @@ ops: }(), }}, Op{&OpRemove{ - Field: entry.NewField("message"), + Field: entry.NewRecordField("message"), }}, Op{&OpRetain{ Fields: []entry.Field{ - entry.NewField("message_retain"), + entry.NewRecordField("message_retain"), }, }}, Op{&OpFlatten{ - Field: entry.NewField("message_flatten"), + Field: entry.RecordField{ + Keys: []string{"message_flatten"}, + }, }}, Op{&OpMove{ - From: entry.NewField("message1"), - To: entry.NewField("message2"), + From: entry.NewRecordField("message1"), + To: entry.NewRecordField("message2"), }}, }, }, diff --git a/plugin/builtin/transformer/router.go b/plugin/builtin/transformer/router.go index ac0e6fd15..bc4ce22f4 100644 --- a/plugin/builtin/transformer/router.go +++ b/plugin/builtin/transformer/router.go @@ -81,7 +81,11 @@ func (p *RouterPlugin) CanProcess() bool { func (p *RouterPlugin) Process(ctx context.Context, entry *entry.Entry) error { env := map[string]interface{}{ - "$": entry.Record, + "$": entry.Record, + "$record": entry.Record, + "$labels": entry.Labels, + "$timestamp": entry.Timestamp, + "$tags": entry.Tags, } for _, route := range p.routes { diff --git a/plugin/helper/input.go b/plugin/helper/input.go index 41f594f1d..650878dc1 100644 --- a/plugin/helper/input.go +++ b/plugin/helper/input.go @@ -28,6 +28,10 @@ func (c InputConfig) Build(context plugin.BuildContext) (InputPlugin, error) { return InputPlugin{}, errors.WithDetails(err, "plugin_id", c.PluginID) } + if c.WriteTo.FieldInterface == nil { + 
c.WriteTo.FieldInterface = entry.NewRecordField() + } + inputPlugin := InputPlugin{ BasicPlugin: basicPlugin, WriterPlugin: writerPlugin, diff --git a/plugin/helper/input_test.go b/plugin/helper/input_test.go index 9c7e3d863..871311b50 100644 --- a/plugin/helper/input_test.go +++ b/plugin/helper/input_test.go @@ -98,7 +98,7 @@ func TestInputPluginProcess(t *testing.T) { func TestInputPluginNewEntry(t *testing.T) { buildContext := testutil.NewBuildContext(t) - writeTo := entry.NewField("test-field") + writeTo := entry.NewRecordField("test-field") input := InputPlugin{ BasicPlugin: BasicPlugin{ PluginID: "test-id", diff --git a/plugin/helper/parser.go b/plugin/helper/parser.go index c6bf6ed33..2f164ea69 100644 --- a/plugin/helper/parser.go +++ b/plugin/helper/parser.go @@ -26,6 +26,14 @@ func (c ParserConfig) Build(context plugin.BuildContext) (ParserPlugin, error) { return ParserPlugin{}, err } + if c.ParseFrom.FieldInterface == nil { + c.ParseFrom.FieldInterface = entry.NewRecordField() + } + + if c.ParseTo.FieldInterface == nil { + c.ParseTo.FieldInterface = entry.NewRecordField() + } + parserPlugin := ParserPlugin{ TransformerPlugin: transformerPlugin, ParseFrom: c.ParseFrom, diff --git a/plugin/helper/parser_test.go b/plugin/helper/parser_test.go index 0787d932d..ca5063c81 100644 --- a/plugin/helper/parser_test.go +++ b/plugin/helper/parser_test.go @@ -75,7 +75,7 @@ func TestParserMissingField(t *testing.T) { }, OnError: DropOnError, }, - ParseFrom: entry.NewField("test"), + ParseFrom: entry.NewRecordField("test"), } parse := func(i interface{}) (interface{}, error) { return i, nil @@ -98,6 +98,7 @@ func TestParserInvalidParse(t *testing.T) { }, OnError: DropOnError, }, + ParseFrom: entry.NewRecordField(), } parse := func(i interface{}) (interface{}, error) { return i, fmt.Errorf("parse failure") @@ -120,8 +121,10 @@ func TestParserInvalidTimeParse(t *testing.T) { }, OnError: DropOnError, }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), 
TimeParser: &TimeParser{ - ParseFrom: entry.NewField("missing-key"), + ParseFrom: entry.NewRecordField("missing-key"), }, } parse := func(i interface{}) (interface{}, error) { @@ -146,8 +149,10 @@ func TestParserInvalidSeverityParse(t *testing.T) { OnError: DropOnError, }, SeverityParser: &SeverityParser{ - ParseFrom: entry.NewField("missing-key"), + ParseFrom: entry.NewRecordField("missing-key"), }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), } parse := func(i interface{}) (interface{}, error) { return i, nil @@ -171,21 +176,23 @@ func TestParserInvalidTimeValidSeverityParse(t *testing.T) { OnError: DropOnError, }, TimeParser: &TimeParser{ - ParseFrom: entry.NewField("missing-key"), + ParseFrom: entry.NewRecordField("missing-key"), }, SeverityParser: &SeverityParser{ - ParseFrom: entry.NewField("severity"), + ParseFrom: entry.NewRecordField("severity"), Mapping: map[string]entry.Severity{ "info": entry.Info, }, }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), } parse := func(i interface{}) (interface{}, error) { return i, nil } ctx := context.Background() testEntry := entry.New() - testEntry.Set(entry.NewField("severity"), "info") + testEntry.Set(entry.NewRecordField("severity"), "info") err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) @@ -207,20 +214,22 @@ func TestParserValidTimeInvalidSeverityParse(t *testing.T) { OnError: DropOnError, }, TimeParser: &TimeParser{ - ParseFrom: entry.NewField("timestamp"), + ParseFrom: entry.NewRecordField("timestamp"), LayoutType: "gotime", Layout: time.Kitchen, }, SeverityParser: &SeverityParser{ - ParseFrom: entry.NewField("missing-key"), + ParseFrom: entry.NewRecordField("missing-key"), }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), } parse := func(i interface{}) (interface{}, error) { return i, nil } ctx := context.Background() testEntry := entry.New() - testEntry.Set(entry.NewField("timestamp"), "12:34PM") + 
testEntry.Set(entry.NewRecordField("timestamp"), "12:34PM") err := parser.ProcessWith(ctx, testEntry, parse) require.Error(t, err) @@ -248,6 +257,8 @@ func TestParserOutput(t *testing.T) { OutputPlugins: []plugin.Plugin{output}, }, }, + ParseFrom: entry.NewRecordField(), + ParseTo: entry.NewRecordField(), } parse := func(i interface{}) (interface{}, error) { return i, nil @@ -276,8 +287,8 @@ func TestParserWithPreserve(t *testing.T) { OutputPlugins: []plugin.Plugin{output}, }, }, - ParseFrom: entry.NewField("parse_from"), - ParseTo: entry.NewField("parse_to"), + ParseFrom: entry.NewRecordField("parse_from"), + ParseTo: entry.NewRecordField("parse_to"), Preserve: true, } parse := func(i interface{}) (interface{}, error) { @@ -315,8 +326,8 @@ func TestParserWithoutPreserve(t *testing.T) { OutputPlugins: []plugin.Plugin{output}, }, }, - ParseFrom: entry.NewField("parse_from"), - ParseTo: entry.NewField("parse_to"), + ParseFrom: entry.NewRecordField("parse_from"), + ParseTo: entry.NewRecordField("parse_to"), Preserve: false, } parse := func(i interface{}) (interface{}, error) { From b723fa505cb6fd4a897a4547e88f9738479fd5fd Mon Sep 17 00:00:00 2001 From: Camden Cheek Date: Wed, 8 Jul 2020 14:32:38 -0400 Subject: [PATCH 2/2] Add env() support to expressions To provide support for using environment variables for labeling, tagging, and routing entries, this commit adds the `env()` function to expressions. An example expression: `env("FOO")`. In addition, this commit fixes an issue with the algorithm for parsing expressions from `ExprString`s where there are paranthesis nested inside the expression. This issue was made clear by trying to use the `env()` function inside an `ExprString`. This commit also adds the `GetExprEnv` and `PutExprEnv` functions which provide a way to re-use allocated environment maps. 
This keeps us from having to allocate a map for every entry that is rendered as an expression, which should help improve performance of plugins that make heavy use of expressions. --- .github/PULL_REQUEST_TEMPLATE.md | 1 + CHANGELOG.md | 8 +- docs/types/expression.md | 19 +++- plugin/builtin/transformer/metadata.go | 18 +--- plugin/builtin/transformer/metadata_test.go | 36 +++++++ plugin/builtin/transformer/restructure.go | 12 +-- .../builtin/transformer/restructure_test.go | 25 +++++ plugin/builtin/transformer/router.go | 9 +- plugin/builtin/transformer/router_test.go | 23 +++++ plugin/helper/expr_string.go | 95 ++++++++++++++----- plugin/helper/expr_string_test.go | 14 ++- 11 files changed, 202 insertions(+), 58 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2e52b0927..b29aa2afe 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -3,4 +3,5 @@ ## **Please check that the PR fulfills these requirements** - [ ] Tests for the changes have been added (for bug fixes / features) - [ ] Docs have been added / updated (for bug fixes / features) +- [ ] Add a changelog entry (for non-trivial bug fixes / features) - [ ] CI passes diff --git a/CHANGELOG.md b/CHANGELOG.md index 933d0b992..ee86a79b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [Unreleased] +### Added +- Support for reading from environment in expressions +- Ability to point to labels with fields + + ## [0.9.0] - 2020-07-07 -Initial Open Source Release \ No newline at end of file +Initial Open Source Release diff --git a/docs/types/expression.md b/docs/types/expression.md index 92fa0926f..36bb5095f 100644 --- a/docs/types/expression.md +++ b/docs/types/expression.md @@ -6,10 +6,25 @@ being processed. For reference documentation of the expression language, see [here](https://github.com/antonmedv/expr/blob/master/docs/Language-Definition.md). -In most cases, the record of the entry being processed can be accessed with the `$` variable in the expression. See the examples below for syntax. +Available to the expressions are a few special variables: +- `$record` contains the entry's record +- `$labels` contains the entry's labels +- `$tags` contains the entry's tags +- `$timestamp` contains the entry's timestamp +- `env()` is a function that allows you to read environment variables ## Examples +### Add a label from an environment variable + +```yaml +- id: add_stack_label + type: metadata + output: my_receiver + labels: + stack: 'EXPR(env("STACK"))' +``` + ### Map severity values to standard values ```yaml @@ -19,5 +34,5 @@ In most cases, the record of the entry being processed can be accessed with the ops: - add: field: severity - value_expr: '$.raw_severity in ["critical", "super_critical"] ? "error" : $.raw_severity' + value_expr: '$record.raw_severity in ["critical", "super_critical"] ? 
"error" : $record.raw_severity' ``` diff --git a/plugin/builtin/transformer/metadata.go b/plugin/builtin/transformer/metadata.go index c7799dd57..4d0a03bcb 100644 --- a/plugin/builtin/transformer/metadata.go +++ b/plugin/builtin/transformer/metadata.go @@ -76,13 +76,8 @@ type labeler struct { } func (l *labeler) Label(e *entry.Entry) error { - env := map[string]interface{}{ - "$": e.Record, - "$record": e.Record, - "$labels": e.Labels, - "$timestamp": e.Timestamp, - "$tags": e.Tags, - } + env := helper.GetExprEnv(e) + defer helper.PutExprEnv(env) for k, v := range l.labels { rendered, err := v.Render(env) @@ -115,13 +110,8 @@ type tagger struct { } func (t *tagger) Tag(e *entry.Entry) error { - env := map[string]interface{}{ - "$": e.Record, - "$record": e.Record, - "$labels": e.Labels, - "$timestamp": e.Timestamp, - "$tags": e.Tags, - } + env := helper.GetExprEnv(e) + defer helper.PutExprEnv(env) for _, v := range t.tags { rendered, err := v.Render(env) diff --git a/plugin/builtin/transformer/metadata_test.go b/plugin/builtin/transformer/metadata_test.go index deced3da1..4a0f09f5f 100644 --- a/plugin/builtin/transformer/metadata_test.go +++ b/plugin/builtin/transformer/metadata_test.go @@ -2,6 +2,7 @@ package transformer import ( "context" + "os" "testing" "time" @@ -14,6 +15,9 @@ import ( ) func TestMetadata(t *testing.T) { + os.Setenv("TEST_METADATA_PLUGIN_ENV", "foo") + defer os.Unsetenv("TEST_METADATA_PLUGIN_ENV") + baseConfig := func() *MetadataPluginConfig { return &MetadataPluginConfig{ TransformerConfig: helper.TransformerConfig{ @@ -98,6 +102,38 @@ func TestMetadata(t *testing.T) { return e }(), }, + { + "AddLabelEnv", + func() *MetadataPluginConfig { + cfg := baseConfig() + cfg.Labels = map[string]helper.ExprStringConfig{ + "label1": `EXPR(env("TEST_METADATA_PLUGIN_ENV"))`, + } + return cfg + }(), + entry.New(), + func() *entry.Entry { + e := entry.New() + e.Labels = map[string]string{ + "label1": "foo", + } + return e + }(), + }, + { + "AddTagEnv", + 
func() *MetadataPluginConfig { + cfg := baseConfig() + cfg.Tags = []helper.ExprStringConfig{`EXPR(env("TEST_METADATA_PLUGIN_ENV"))`} + return cfg + }(), + entry.New(), + func() *entry.Entry { + e := entry.New() + e.Tags = []string{"foo"} + return e + }(), + }, } for _, tc := range cases { diff --git a/plugin/builtin/transformer/restructure.go b/plugin/builtin/transformer/restructure.go index bdaf2f608..196e5a74a 100644 --- a/plugin/builtin/transformer/restructure.go +++ b/plugin/builtin/transformer/restructure.go @@ -188,7 +188,7 @@ func (o Op) MarshalYAML() (interface{}, error) { /****** Add -******/ +*******/ type OpAdd struct { Field entry.Field `json:"field" yaml:"field"` @@ -205,13 +205,9 @@ func (op *OpAdd) Apply(e *entry.Entry) error { return err } case op.program != nil: - env := map[string]interface{}{ - "$": e.Record, - "$record": e.Record, - "$labels": e.Labels, - "$tags": e.Tags, - "$timestamp": e.Timestamp, - } + env := helper.GetExprEnv(e) + defer helper.PutExprEnv(env) + result, err := vm.Run(op.program, env) if err != nil { return fmt.Errorf("evaluate value_expr: %s", err) diff --git a/plugin/builtin/transformer/restructure_test.go b/plugin/builtin/transformer/restructure_test.go index 5606ba9ad..fcf50ea36 100644 --- a/plugin/builtin/transformer/restructure_test.go +++ b/plugin/builtin/transformer/restructure_test.go @@ -3,6 +3,7 @@ package transformer import ( "context" "encoding/json" + "os" "testing" "time" @@ -36,6 +37,9 @@ func NewFakeRestructurePlugin() (*RestructurePlugin, *testutil.Plugin) { } func TestRestructurePlugin(t *testing.T) { + os.Setenv("TEST_RESTRUCTURE_PLUGIN_ENV", "foo") + defer os.Unsetenv("TEST_RESTRUCTURE_PLUGIN_ENV") + newTestEntry := func() *entry.Entry { e := entry.New() e.Timestamp = time.Unix(1586632809, 0) @@ -97,6 +101,27 @@ func TestRestructurePlugin(t *testing.T) { return e }(), }, + { + name: "AddValueExprEnv", + ops: []Op{ + { + &OpAdd{ + Field: entry.NewRecordField("new"), + program: func() *vm.Program { + vm, 
err := expr.Compile(`env("TEST_RESTRUCTURE_PLUGIN_ENV")`) + require.NoError(t, err) + return vm + }(), + }, + }, + }, + input: newTestEntry(), + output: func() *entry.Entry { + e := newTestEntry() + e.Record.(map[string]interface{})["new"] = "foo" + return e + }(), + }, { name: "Remove", ops: []Op{ diff --git a/plugin/builtin/transformer/router.go b/plugin/builtin/transformer/router.go index bc4ce22f4..b0a0c0fa7 100644 --- a/plugin/builtin/transformer/router.go +++ b/plugin/builtin/transformer/router.go @@ -80,13 +80,8 @@ func (p *RouterPlugin) CanProcess() bool { } func (p *RouterPlugin) Process(ctx context.Context, entry *entry.Entry) error { - env := map[string]interface{}{ - "$": entry.Record, - "$record": entry.Record, - "$labels": entry.Labels, - "$timestamp": entry.Timestamp, - "$tags": entry.Tags, - } + env := helper.GetExprEnv(entry) + defer helper.PutExprEnv(env) for _, route := range p.routes { matches, err := vm.Run(route.Expression, env) diff --git a/plugin/builtin/transformer/router_test.go b/plugin/builtin/transformer/router_test.go index 35c171e5f..d8c415499 100644 --- a/plugin/builtin/transformer/router_test.go +++ b/plugin/builtin/transformer/router_test.go @@ -2,6 +2,7 @@ package transformer import ( "context" + "os" "testing" "github.com/observiq/carbon/entry" @@ -13,6 +14,9 @@ import ( ) func TestRouterPlugin(t *testing.T) { + os.Setenv("TEST_ROUTER_PLUGIN_ENV", "foo") + defer os.Unsetenv("TEST_ROUTER_PLUGIN_ENV") + basicConfig := func() *RouterPluginConfig { return &RouterPluginConfig{ BasicConfig: helper.BasicConfig{ @@ -69,6 +73,25 @@ func TestRouterPlugin(t *testing.T) { }, map[string]int{"output2": 1}, }, + { + "MatchEnv", + &entry.Entry{ + Record: map[string]interface{}{ + "message": "test_message", + }, + }, + []*RouterPluginRouteConfig{ + { + `env("TEST_ROUTER_PLUGIN_ENV") == "foo"`, + []string{"output1"}, + }, + { + `true`, + []string{"output2"}, + }, + }, + map[string]int{"output1": 1}, + }, } for _, tc := range cases { diff --git 
a/plugin/helper/expr_string.go b/plugin/helper/expr_string.go index 7edd9e02e..c09ce8acd 100644 --- a/plugin/helper/expr_string.go +++ b/plugin/helper/expr_string.go @@ -2,47 +2,73 @@ package helper import ( "fmt" + "os" "strings" + "sync" "github.com/antonmedv/expr" "github.com/antonmedv/expr/vm" + "github.com/observiq/carbon/entry" "github.com/observiq/carbon/errors" ) type ExprStringConfig string const ( - exprStartTag = "EXPR(" - exprEndTag = ")" + exprStartToken = "EXPR(" + exprEndToken = ")" ) func (e ExprStringConfig) Build() (*ExprString, error) { s := string(e) - begin := 0 + rangeStart := 0 - subStrings := make([]string, 0) - subExprStrings := make([]string, 0) + subStrings := make([]string, 0, 4) + subExprStrings := make([]string, 0, 4) -LOOP: for { - indexStart := strings.Index(s[begin:], exprStartTag) - indexEnd := strings.Index(s[begin:], exprEndTag) - switch { - case indexStart == -1 || indexEnd == -1: - fallthrough - case indexStart > indexEnd: - // if we don't have a "{{" followed by a "}}" in the remainder - // of the string, treat the remainder as a string literal - subStrings = append(subStrings, s[begin:]) - break LOOP - default: - // make indexes relative to whole string again - indexStart += begin - indexEnd += begin + rangeEnd := len(s) + + // Find the first instance of the start token + indexStart := strings.Index(s[rangeStart:rangeEnd], exprStartToken) + if indexStart == -1 { + // Start token does not exist in the remainder of the string, + // so treat the rest as a string literal + subStrings = append(subStrings, s[rangeStart:]) + break + } else { + indexStart = rangeStart + indexStart + } + + // Restrict our end token search range to the next instance of the start token + nextIndexStart := strings.Index(s[indexStart+len(exprStartToken):], exprStartToken) + if nextIndexStart == -1 { + rangeEnd = len(s) + } else { + rangeEnd = indexStart + len(exprStartToken) + nextIndexStart + } + + // Greedily search for the last end token in the search 
range + indexEnd := strings.LastIndex(s[indexStart:rangeEnd], exprEndToken) + if indexEnd == -1 { + // End token does not exist before the next start token + // or end of expression string, so treat the remainder of the string + // as a string literal + subStrings = append(subStrings, s[rangeStart:]) + break + } else { + indexEnd = indexStart + indexEnd + } + + // Unscope the indexes and add the partitioned strings + subStrings = append(subStrings, s[rangeStart:indexStart]) + subExprStrings = append(subExprStrings, s[indexStart+len(exprStartToken):indexEnd]) + + // Reset the starting range and finish if it reaches the end of the string + rangeStart = indexEnd + len(exprEndToken) + if rangeStart > len(s) { + break } - subStrings = append(subStrings, s[begin:indexStart]) - subExprStrings = append(subExprStrings, s[indexStart+len(exprStartTag):indexEnd]) - begin = indexEnd + len(exprEndTag) } subExprs := make([]*vm.Program, 0, len(subExprStrings)) @@ -85,3 +111,26 @@ func (e *ExprString) Render(env map[string]interface{}) (string, error) { return b.String(), nil } + +var envPool = sync.Pool{ + New: func() interface{} { + return map[string]interface{}{ + "env": os.Getenv, + } + }, +} + +func GetExprEnv(e *entry.Entry) map[string]interface{} { + env := envPool.Get().(map[string]interface{}) + env["$"] = e.Record + env["$record"] = e.Record + env["$labels"] = e.Labels + env["$timestamp"] = e.Timestamp + env["$tags"] = e.Tags + + return env +} + +func PutExprEnv(e map[string]interface{}) { + envPool.Put(e) +} diff --git a/plugin/helper/expr_string_test.go b/plugin/helper/expr_string_test.go index 3f2ddafc3..a04755124 100644 --- a/plugin/helper/expr_string_test.go +++ b/plugin/helper/expr_string_test.go @@ -1,6 +1,7 @@ package helper import ( + "os" "strconv" "testing" @@ -9,6 +10,9 @@ import ( ) func TestExprString(t *testing.T) { + os.Setenv("TEST_EXPR_STRING_ENV", "foo") + defer os.Unsetenv("TEST_EXPR_STRING_ENV") + exampleEntry := func() *entry.Entry { e := entry.New() 
e.Record = map[string]interface{}{ @@ -69,6 +73,10 @@ func TestExprString(t *testing.T) { "my EXPR($.test)", "my value", }, + { + "my EXPR(env('TEST_EXPR_STRING_ENV'))", + "my foo", + }, } for i, tc := range cases { @@ -76,9 +84,9 @@ func TestExprString(t *testing.T) { exprString, err := tc.config.Build() require.NoError(t, err) - env := map[string]interface{}{ - "$": exampleEntry().Record, - } + env := GetExprEnv(exampleEntry()) + defer PutExprEnv(env) + result, err := exprString.Render(env) require.NoError(t, err) require.Equal(t, tc.expected, result)