diff --git a/CHANGELOG.md b/CHANGELOG.md index 98d19382f..e9c63f559 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased +### Added +- Support for accessing the resource with fields +- Support for using fields to select keys that contain dots like `$record['field.with.dots']` +- `google_cloud_output` will use resource create a monitored resource for supported resource types (currently only k8s resources) +### Changed +- The operators `host_metadata`, `k8s_event_input`, and `k8s_metadata_decorator` will now use the top-level resource field + ## [0.9.12] - 2020-08-25 ### Changed - Agent is now embeddable with a default output diff --git a/docs/types/field.md b/docs/types/field.md index 57756fe76..d04f2b600 100644 --- a/docs/types/field.md +++ b/docs/types/field.md @@ -3,7 +3,9 @@ _Fields_ are the primary way to tell stanza which values of an entry to use in its operators. Most often, these will be things like fields to parse for a parser operator, or the field to write a new value to. -Fields are `.`-delimited strings which allow you to select labels or records on the entry. Fields can currently be used to select labels or values on a record. To select a label, prefix your field with `$label.` such as with `$label.my_label`. For values on the record, use the prefix `$record.` such as `$record.my_value`. +Fields are `.`-delimited strings which allow you to select labels or records on the entry. Fields can currently be used to select labels, values on a record, or resource values. To select a label, prefix your field with `$label` such as with `$label.my_label`. For values on the record, use the prefix `$record` such as `$record.my_value`. For resource values, use the prefix `$resource`. + +If a key contains a dot in it, a field can alternatively use bracket syntax for traversing through a map. For example, to select the key `k8s.cluster.name` on the entry's record, you can use the field `$record["k8s.cluster.name"]`. Record fields can be nested arbitrarily deeply, such as `$record.my_value.my_nested_value`. diff --git a/entry/field.go b/entry/field.go index 43e412072..efa4373b6 100644 --- a/entry/field.go +++ b/entry/field.go @@ -3,7 +3,6 @@ package entry import ( "encoding/json" "fmt" - "strings" ) // Field represents a potential field on an entry. @@ -44,7 +43,10 @@ func (f *Field) UnmarshalYAML(unmarshal func(interface{}) error) error { } func fieldFromString(s string) (Field, error) { - split := strings.Split(s, ".") + split, err := splitField(s) + if err != nil { + return Field{}, fmt.Errorf("splitting field: %s", err) + } switch split[0] { case "$labels": @@ -52,6 +54,11 @@ func fieldFromString(s string) (Field, error) { return Field{}, fmt.Errorf("labels cannot be nested") } return Field{LabelField{split[1]}}, nil + case "$resource": + if len(split) != 2 { + return Field{}, fmt.Errorf("resource fields cannot be nested") + } + return Field{ResourceField{split[1]}}, nil case "$record", "$": return Field{RecordField{split[1:]}}, nil default: @@ -61,10 +68,90 @@ func fieldFromString(s string) (Field, error) { // MarshalJSON will marshal a field into JSON func (f Field) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf("\"%s\"", f.String())), nil + return []byte(fmt.Sprintf(`"%s"`, f.String())), nil } // MarshalYAML will marshal a field into YAML func (f Field) MarshalYAML() (interface{}, error) { return f.String(), nil } + +type splitState uint + +const ( + BEGIN splitState = iota + IN_BRACKET + IN_QUOTE + OUT_QUOTE + OUT_BRACKET + IN_UNBRACKETED_TOKEN +) + +func splitField(s string) ([]string, error) { + fields := make([]string, 0, 1) + + state := BEGIN + var quoteChar rune + var tokenStart int + + for i, c := range s { + switch state { + case BEGIN: + if c == '[' { + state = IN_BRACKET + continue + } + tokenStart = i + state = IN_UNBRACKETED_TOKEN + case IN_BRACKET: + if !(c == '\'' || c == '"') { + return nil, fmt.Errorf("strings in brackets must be surrounded by quotes") + } + state = IN_QUOTE + quoteChar = c + tokenStart = i + 1 + case IN_QUOTE: + if c == quoteChar { + fields = append(fields, s[tokenStart:i]) + state = OUT_QUOTE + } + case OUT_QUOTE: + if c != ']' { + return nil, fmt.Errorf("found characters between closed quote and closing bracket") + } + state = OUT_BRACKET + case OUT_BRACKET: + if c == '.' { + state = IN_UNBRACKETED_TOKEN + tokenStart = i + 1 + } else if c == '[' { + state = IN_BRACKET + } else { + return nil, fmt.Errorf("bracketed access must be followed by a dot or another bracketed access") + } + case IN_UNBRACKETED_TOKEN: + if c == '.' { + fields = append(fields, s[tokenStart:i]) + tokenStart = i + 1 + } else if c == '[' { + fields = append(fields, s[tokenStart:i]) + state = IN_BRACKET + } + } + } + + switch state { + case IN_BRACKET, OUT_QUOTE: + return nil, fmt.Errorf("found unclosed left bracket") + case IN_QUOTE: + if quoteChar == '"' { + return nil, fmt.Errorf("found unclosed double quote") + } else { + return nil, fmt.Errorf("found unclosed single quote") + } + case IN_UNBRACKETED_TOKEN: + fields = append(fields, s[tokenStart:]) + } + + return fields, nil +} diff --git a/entry/field_test.go b/entry/field_test.go index 9b2bcc63c..44acf1b55 100644 --- a/entry/field_test.go +++ b/entry/field_test.go @@ -118,22 +118,57 @@ func TestFieldMarshalYAML(t *testing.T) { cases := []struct { name string input interface{} - expected []byte + expected string }{ { "SimpleField", NewRecordField("test1"), - []byte("test1\n"), + "test1\n", }, { "ComplexField", NewRecordField("test1", "test2"), - []byte("test1.test2\n"), + "test1.test2\n", }, { "EmptyField", NewRecordField(), - []byte("$record\n"), + "$record\n", + }, + { + "FieldWithDots", + NewRecordField("test.1"), + "$record['test.1']\n", + }, + { + "FieldWithDotsThenNone", + NewRecordField("test.1", "test2"), + "$record['test.1']['test2']\n", + }, + { + "FieldWithNoDotsThenDots", + NewRecordField("test1", "test.2"), + "$record['test1']['test.2']\n", + }, + { + "LabelField", + NewLabelField("test1"), + "$labels.test1\n", + }, + { + "LabelFieldWithDots", + NewLabelField("test.1"), + "$labels['test.1']\n", + }, + { + "ResourceField", + NewResourceField("test1"), + "$resource.test1\n", + }, + { + "ResourceFieldWithDots", + NewResourceField("test.1"), + "$resource['test.1']\n", }, } @@ -142,7 +177,48 @@ func TestFieldMarshalYAML(t *testing.T) { res, err := yaml.Marshal(tc.input) require.NoError(t, err) - require.Equal(t, tc.expected, res) + require.Equal(t, tc.expected, string(res)) + }) + } +} + +func TestSplitField(t *testing.T) { + cases := []struct { + name string + input string + output []string + expectErr bool + }{ + {"Simple", "test", []string{"test"}, false}, + {"Sub", "test.case", []string{"test", "case"}, false}, + {"Root", "$", []string{"$"}, false}, + {"RootWithSub", "$record.field", []string{"$record", "field"}, false}, + {"RootWithTwoSub", "$record.field1.field2", []string{"$record", "field1", "field2"}, false}, + {"BracketSyntaxSingleQuote", "['test']", []string{"test"}, false}, + {"BracketSyntaxDoubleQuote", `["test"]`, []string{"test"}, false}, + {"RootSubBracketSyntax", `$record["test"]`, []string{"$record", "test"}, false}, + {"BracketThenDot", `$record["test1"].test2`, []string{"$record", "test1", "test2"}, false}, + {"BracketThenBracket", `$record["test1"]["test2"]`, []string{"$record", "test1", "test2"}, false}, + {"DotThenBracket", `$record.test1["test2"]`, []string{"$record", "test1", "test2"}, false}, + {"DotsInBrackets", `$record["test1.test2"]`, []string{"$record", "test1.test2"}, false}, + {"UnclosedBrackets", `$record["test1.test2"`, nil, true}, + {"UnclosedQuotes", `$record["test1.test2]`, nil, true}, + {"UnmatchedQuotes", `$record["test1.test2']`, nil, true}, + {"BracketAtEnd", `$record[`, nil, true}, + {"SingleQuoteAtEnd", `$record['`, nil, true}, + {"DoubleQuoteAtEnd", `$record["`, nil, true}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + s, err := splitField(tc.input) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.Equal(t, tc.output, s) }) } } diff --git a/entry/label_field.go b/entry/label_field.go index 2a623eace..e2f7e31be 100644 --- a/entry/label_field.go +++ b/entry/label_field.go @@ -1,6 +1,9 @@ package entry -import "fmt" +import ( + "fmt" + "strings" +) // LabelField is the path to an entry label type LabelField struct { @@ -42,6 +45,9 @@ func (l LabelField) Delete(entry *Entry) (interface{}, bool) { } func (l LabelField) String() string { + if strings.Contains(l.key, ".") { + return fmt.Sprintf(`$labels['%s']`, l.key) + } return "$labels." + l.key } diff --git a/entry/record_field.go b/entry/record_field.go index 7979b6bdc..9834e1b8b 100644 --- a/entry/record_field.go +++ b/entry/record_field.go @@ -210,7 +210,32 @@ func toJSONDot(field RecordField) string { return "$record" } - return strings.Join(field.Keys, ".") + containsDots := false + for _, key := range field.Keys { + if strings.Contains(key, ".") { + containsDots = true + } + } + + var b strings.Builder + if containsDots { + b.WriteString("$record") + for _, key := range field.Keys { + b.WriteString(`['`) + b.WriteString(key) + b.WriteString(`']`) + } + } else { + for i, key := range field.Keys { + if i != 0 { + b.WriteString(".") + } + b.WriteString(key) + } + + } + + return b.String() } // NewRecordField creates a new field from an ordered array of keys. diff --git a/entry/resource_field.go b/entry/resource_field.go new file mode 100644 index 000000000..9399b6c00 --- /dev/null +++ b/entry/resource_field.go @@ -0,0 +1,57 @@ +package entry + +import ( + "fmt" + "strings" +) + +// ResourceField is the path to an entry's resource key +type ResourceField struct { + key string +} + +// Get will return the resource value and a boolean indicating if it exists +func (r ResourceField) Get(entry *Entry) (interface{}, bool) { + if entry.Resource == nil { + return "", false + } + val, ok := entry.Resource[r.key] + return val, ok +} + +// Set will set the resource value on an entry +func (r ResourceField) Set(entry *Entry, val interface{}) error { + if entry.Resource == nil { + entry.Resource = make(map[string]string, 1) + } + + str, ok := val.(string) + if !ok { + return fmt.Errorf("cannot set a resource to a non-string value") + } + entry.Resource[r.key] = str + return nil +} + +// Delete will delete a resource key from an entry +func (r ResourceField) Delete(entry *Entry) (interface{}, bool) { + if entry.Resource == nil { + return "", false + } + + val, ok := entry.Resource[r.key] + delete(entry.Resource, r.key) + return val, ok +} + +func (r ResourceField) String() string { + if strings.Contains(r.key, ".") { + return fmt.Sprintf(`$resource['%s']`, r.key) + } + return "$resource." + r.key +} + +// NewResourceField will creat a new resource field from a key +func NewResourceField(key string) Field { + return Field{ResourceField{key}} +} diff --git a/entry/resource_field_test.go b/entry/resource_field_test.go new file mode 100644 index 000000000..0605cf946 --- /dev/null +++ b/entry/resource_field_test.go @@ -0,0 +1,202 @@ +package entry + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestResourceFieldGet(t *testing.T) { + cases := []struct { + name string + resources map[string]string + field Field + expected interface{} + expectedOK bool + }{ + { + "Simple", + map[string]string{ + "test": "val", + }, + NewResourceField("test"), + "val", + true, + }, + { + "NonexistentKey", + map[string]string{ + "test": "val", + }, + NewResourceField("nonexistent"), + "", + false, + }, + { + "NilMap", + nil, + NewResourceField("nonexistent"), + "", + false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Resource = tc.resources + val, ok := entry.Get(tc.field) + require.Equal(t, tc.expectedOK, ok) + require.Equal(t, tc.expected, val) + }) + } + +} + +func TestResourceFieldDelete(t *testing.T) { + cases := []struct { + name string + resources map[string]string + field Field + expected interface{} + expectedOK bool + expectedResources map[string]string + }{ + { + "Simple", + map[string]string{ + "test": "val", + }, + NewResourceField("test"), + "val", + true, + map[string]string{}, + }, + { + "NonexistentKey", + map[string]string{ + "test": "val", + }, + NewResourceField("nonexistent"), + "", + false, + map[string]string{ + "test": "val", + }, + }, + { + "NilMap", + nil, + NewResourceField("nonexistent"), + "", + false, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Resource = tc.resources + val, ok := entry.Delete(tc.field) + require.Equal(t, tc.expectedOK, ok) + require.Equal(t, tc.expected, val) + }) + } + +} + +func TestResourceFieldSet(t *testing.T) { + cases := []struct { + name string + resources map[string]string + field Field + val interface{} + expected map[string]string + expectedErr bool + }{ + { + "Simple", + map[string]string{}, + NewResourceField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "Overwrite", + map[string]string{ + "test": "original", + }, + NewResourceField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "NilMap", + nil, + NewResourceField("test"), + "val", + map[string]string{ + "test": "val", + }, + false, + }, + { + "NonString", + map[string]string{}, + NewResourceField("test"), + 123, + map[string]string{ + "test": "val", + }, + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + entry := New() + entry.Resource = tc.resources + err := entry.Set(tc.field, tc.val) + if tc.expectedErr { + require.Error(t, err) + return + } + + require.Equal(t, tc.expected, entry.Resource) + }) + } + +} + +func TestResourceFieldString(t *testing.T) { + cases := []struct { + name string + field ResourceField + expected string + }{ + { + "Simple", + ResourceField{"foo"}, + "$resource.foo", + }, + { + "Empty", + ResourceField{""}, + "$resource.", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.expected, tc.field.String()) + }) + } + +} diff --git a/go.sum b/go.sum index a31d2fde4..85a1c5662 100644 --- a/go.sum +++ b/go.sum @@ -300,6 +300,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kyoh86/exportloopref v0.1.7 h1:u+iHuTbkbTS2D/JP7fCuZDo/t3rBVGo3Hf58Rc+lQVY= github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8= @@ -358,6 +359,7 @@ github.com/nakabonne/nestif v0.3.0 h1:+yOViDGhg8ygGrmII72nV9B/zGxY188TYpfolntsaP github.com/nakabonne/nestif v0.3.0/go.mod h1:dI314BppzXjJ4HsCnbo7XzrJHPszZsjnk5wEBSYHI2c= github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d h1:AREM5mwr4u1ORQBMvzfzBgpsctsbQikCVpvC+tX285E= github.com/nbutton23/zxcvbn-go v0.0.0-20180912185939-ae427f1e4c1d/go.mod h1:o96djdrsSGy3AWPyBgZMAGfxZNfgntdJG+11KU4QvbU= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/exhaustive v0.0.0-20200708172631-8866003e3856 h1:W3KBC2LFyfgd+wNudlfgCCsTo4q97MeNWrfz8/wSdSc= github.com/nishanths/exhaustive v0.0.0-20200708172631-8866003e3856/go.mod h1:wBEpHwM2OdmeNpdCvRPUlkEbBuaFmcK4Wv8Q7FuGW3c= @@ -538,6 +540,7 @@ golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522 h1:OeRHuibLsmZkFj773W4LcfAGsSxJgfPONhr8cmO+eLA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek= +golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136 h1:A1gGSx58LAGVHUUsOf7IiR0u8Xb6W51gRwfDBhkdcaw= golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -751,6 +754,7 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= @@ -783,6 +787,7 @@ k8s.io/api v0.18.4 h1:8x49nBRxuXGUlDlwlWd3RMY1SayZrzFfxea3UZSkFw4= k8s.io/api v0.18.4/go.mod h1:lOIQAKYgai1+vz9J7YcDZwC26Z0zQewYOGWdyIPUUQ4= k8s.io/api v0.18.6 h1:osqrAXbOQjkKIWDTjrqxWQ3w0GkKb1KA1XkUGHHYpeE= k8s.io/api v0.18.6/go.mod h1:eeyxr+cwCjMdLAmr2W3RyDI0VvTawSg/3RFFBEnmZGI= +k8s.io/api v0.19.0 h1:XyrFIJqTYZJ2DU7FBE/bSPz7b1HvbVBuBf07oeo6eTc= k8s.io/apimachinery v0.18.4 h1:ST2beySjhqwJoIFk6p7Hp5v5O0hYY6Gngq/gUYXTPIA= k8s.io/apimachinery v0.18.4/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko= k8s.io/apimachinery v0.18.6 h1:RtFHnfGNfd1N0LeSrKCUznz5xtUP1elRGvHJbL3Ntag= diff --git a/operator/builtin/input/k8s_events.go b/operator/builtin/input/k8s_events.go index 52d0fa135..32b76699f 100644 --- a/operator/builtin/input/k8s_events.go +++ b/operator/builtin/input/k8s_events.go @@ -7,6 +7,7 @@ import ( "time" backoff "github.com/cenkalti/backoff/v4" + "github.com/observiq/stanza/entry" "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" @@ -175,11 +176,55 @@ func (k *K8sEvents) consumeWatchEvents(ctx context.Context, events <-chan watch. continue } - entry.Timestamp = typedEvent.LastTimestamp.Time + // Prioritize EventTime > LastTimestamp > FirstTimestamp + switch { + case typedEvent.EventTime.Time != time.Time{}: + entry.Timestamp = typedEvent.EventTime.Time + case typedEvent.LastTimestamp.Time != time.Time{}: + entry.Timestamp = typedEvent.LastTimestamp.Time + case typedEvent.FirstTimestamp.Time != time.Time{}: + entry.Timestamp = typedEvent.FirstTimestamp.Time + } + entry.AddLabel("event_type", string(event.Type)) + k.populateResource(typedEvent, entry) k.Write(ctx, entry) case <-ctx.Done(): return } } } + +// populateResource uses the keys from Event.ObjectMeta to populate the resource of the entry +func (k *K8sEvents) populateResource(event *apiv1.Event, entry *entry.Entry) { + io := event.InvolvedObject + + entry.AddResourceKey("k8s.cluster.name", event.ClusterName) + entry.AddResourceKey("k8s.namespace.name", io.Namespace) + + switch io.Kind { + case "Pod": + entry.AddResourceKey("k8s.pod.uid", string(io.UID)) + entry.AddResourceKey("k8s.pod.name", io.Name) + case "Container": + entry.AddResourceKey("k8s.container.name", io.Name) + case "ReplicaSet": + entry.AddResourceKey("k8s.replicaset.uid", string(io.UID)) + entry.AddResourceKey("k8s.replicaset.name", io.Name) + case "Deployment": + entry.AddResourceKey("k8s.deployment.uid", string(io.UID)) + entry.AddResourceKey("k8s.deployment.name", io.Name) + case "StatefulSet": + entry.AddResourceKey("k8s.statefulset.uid", string(io.UID)) + entry.AddResourceKey("k8s.statefulset.name", io.Name) + case "DaemonSet": + entry.AddResourceKey("k8s.daemonset.uid", string(io.UID)) + entry.AddResourceKey("k8s.daemonset.name", io.Name) + case "Job": + entry.AddResourceKey("k8s.job.uid", string(io.UID)) + entry.AddResourceKey("k8s.job.name", io.Name) + case "CronJob": + entry.AddResourceKey("k8s.cronjob.uid", string(io.UID)) + entry.AddResourceKey("k8s.cronjob.name", io.Name) + } +} diff --git a/operator/builtin/input/k8s_events_test.go b/operator/builtin/input/k8s_events_test.go index b592615a0..0b53c9802 100644 --- a/operator/builtin/input/k8s_events_test.go +++ b/operator/builtin/input/k8s_events_test.go @@ -12,6 +12,7 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" watch "k8s.io/apimachinery/pkg/watch" fakev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake" fakeTest "k8s.io/client-go/testing" @@ -27,6 +28,15 @@ func (f *fakeWatch) ResultChan() <-chan watch.Event { ch <- watch.Event{ Type: "ADDED", Object: (&apiv1.Event{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + }, + InvolvedObject: apiv1.ObjectReference{ + Kind: "Pod", + Name: "testpodname", + UID: types.UID("testuid"), + Namespace: "testnamespace", + }, LastTimestamp: metav1.Time{ Time: fakeTime, }, @@ -63,6 +73,9 @@ func TestWatchNamespace(t *testing.T) { select { case entry := <-fake.Received: require.Equal(t, entry.Timestamp, fakeTime) + require.Equal(t, entry.Resource["k8s.namespace.name"], "testnamespace") + require.Equal(t, entry.Resource["k8s.pod.uid"], "testuid") + require.Equal(t, entry.Resource["k8s.pod.name"], "testpodname") case <-time.After(time.Second): require.FailNow(t, "Timed out waiting for entry") } diff --git a/operator/builtin/output/google_cloud.go b/operator/builtin/output/googlecloud/google_cloud.go similarity index 51% rename from operator/builtin/output/google_cloud.go rename to operator/builtin/output/googlecloud/google_cloud.go index a7efe45a2..d6495cfa6 100644 --- a/operator/builtin/output/google_cloud.go +++ b/operator/builtin/output/googlecloud/google_cloud.go @@ -1,16 +1,14 @@ -package output +package googlecloud import ( "context" "fmt" "io/ioutil" "net/url" - "reflect" "time" vkit "cloud.google.com/go/logging/apiv2" "github.com/golang/protobuf/ptypes" - structpb "github.com/golang/protobuf/ptypes/struct" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/errors" "github.com/observiq/stanza/internal/version" @@ -21,7 +19,6 @@ import ( "golang.org/x/oauth2/google" "google.golang.org/api/option" mrpb "google.golang.org/genproto/googleapis/api/monitoredres" - sev "google.golang.org/genproto/googleapis/logging/type" logpb "google.golang.org/genproto/googleapis/logging/v2" "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" @@ -194,7 +191,7 @@ func (p *GoogleCloudOutput) ProcessMulti(ctx context.Context, entries []*entry.E req := logpb.WriteLogEntriesRequest{ LogName: p.toLogNamePath("default"), Entries: pbEntries, - Resource: globalResource(p.projectID), + Resource: p.defaultResource(), } ctx, cancel := context.WithTimeout(ctx, p.timeout) @@ -207,6 +204,15 @@ func (p *GoogleCloudOutput) ProcessMulti(ctx context.Context, entries []*entry.E return nil } +func (p *GoogleCloudOutput) defaultResource() *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "global", + Labels: map[string]string{ + "project_id": p.projectID, + }, + } +} + func (p *GoogleCloudOutput) toLogNamePath(logName string) string { return fmt.Sprintf("projects/%s/logs/%s", p.projectID, url.PathEscape(logName)) } @@ -251,225 +257,13 @@ func (p *GoogleCloudOutput) createProtobufEntry(e *entry.Entry) (newEntry *logpb } } - newEntry.Severity = interpretSeverity(e.Severity) + newEntry.Severity = convertSeverity(e.Severity) err = setPayload(newEntry, e.Record) if err != nil { return nil, errors.Wrap(err, "set entry payload") } - return newEntry, nil -} - -func setPayload(entry *logpb.LogEntry, record interface{}) (err error) { - // Protect against the panic condition inside `jsonValueToStructValue` - defer func() { - if r := recover(); r != nil { - err = fmt.Errorf(r.(string)) - } - }() - switch p := record.(type) { - case string: - entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p} - case []byte: - entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)} - case map[string]interface{}: - s := jsonMapToProtoStruct(p) - entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} - case map[string]string: - fields := map[string]*structpb.Value{} - for k, v := range p { - fields[k] = jsonValueToStructValue(v) - } - entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}} - default: - return fmt.Errorf("cannot convert record of type %T to a protobuf representation", record) - } - - return nil -} - -func jsonMapToProtoStruct(m map[string]interface{}) *structpb.Struct { - fields := map[string]*structpb.Value{} - for k, v := range m { - fields[k] = jsonValueToStructValue(v) - } - return &structpb.Struct{Fields: fields} -} - -func jsonValueToStructValue(v interface{}) *structpb.Value { - switch x := v.(type) { - case bool: - return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: x}} - case float32: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case float64: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: x}} - case int: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case int8: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case int16: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case int32: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case int64: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case uint: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case uint8: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case uint16: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case uint32: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case uint64: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} - case string: - return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: x}} - case nil: - return &structpb.Value{Kind: &structpb.Value_NullValue{}} - case map[string]interface{}: - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: jsonMapToProtoStruct(x)}} - case map[string]map[string]string: - fields := map[string]*structpb.Value{} - for k, v := range x { - fields[k] = jsonValueToStructValue(v) - } - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} - case map[string]string: - fields := map[string]*structpb.Value{} - for k, v := range x { - fields[k] = &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: v}} - } - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} - case []interface{}: - var vals []*structpb.Value - for _, e := range x { - vals = append(vals, jsonValueToStructValue(e)) - } - return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} - case []string: - var vals []*structpb.Value - for _, e := range x { - vals = append(vals, jsonValueToStructValue(e)) - } - return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} - default: - // Fallback to reflection for other types - return reflectToValue(reflect.ValueOf(v)) - } -} - -func reflectToValue(v reflect.Value) *structpb.Value { - switch v.Kind() { - case reflect.Bool: - return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: v.Bool()}} - case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Int())}} - case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Uint())}} - case reflect.Float32, reflect.Float64: - return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: v.Float()}} - case reflect.Ptr: - if v.IsNil() { - return nil - } - return reflectToValue(reflect.Indirect(v)) - case reflect.Array, reflect.Slice: - size := v.Len() - if size == 0 { - return nil - } - values := make([]*structpb.Value, size) - for i := 0; i < size; i++ { - values[i] = reflectToValue(v.Index(i)) - } - return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: values}}} - case reflect.Struct: - t := v.Type() - size := v.NumField() - if size == 0 { - return nil - } - fields := make(map[string]*structpb.Value, size) - for i := 0; i < size; i++ { - name := t.Field(i).Name - // Better way? - if len(name) > 0 && 'A' <= name[0] && name[0] <= 'Z' { - fields[name] = reflectToValue(v.Field(i)) - } - } - if len(fields) == 0 { - return nil - } - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} - case reflect.Map: - keys := v.MapKeys() - if len(keys) == 0 { - return nil - } - fields := make(map[string]*structpb.Value, len(keys)) - for _, k := range keys { - if k.Kind() == reflect.String { - fields[k.String()] = reflectToValue(v.MapIndex(k)) - } - } - if len(fields) == 0 { - return nil - } - return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} - default: - // Last resort - return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: fmt.Sprint(v)}} - } -} - -func globalResource(projectID string) *mrpb.MonitoredResource { - return &mrpb.MonitoredResource{ - Type: "global", - Labels: map[string]string{ - "project_id": projectID, - }, - } -} - -var fastSev = map[entry.Severity]sev.LogSeverity{ - entry.Catastrophe: sev.LogSeverity_EMERGENCY, - entry.Emergency: sev.LogSeverity_EMERGENCY, - entry.Alert: sev.LogSeverity_ALERT, - entry.Critical: sev.LogSeverity_CRITICAL, - entry.Error: sev.LogSeverity_ERROR, - entry.Warning: sev.LogSeverity_WARNING, - entry.Notice: sev.LogSeverity_NOTICE, - entry.Info: sev.LogSeverity_INFO, - entry.Debug: sev.LogSeverity_DEBUG, - entry.Trace: sev.LogSeverity_DEBUG, - entry.Default: sev.LogSeverity_DEFAULT, -} - -func interpretSeverity(s entry.Severity) sev.LogSeverity { - if logSev, ok := fastSev[s]; ok { - return logSev - } + newEntry.Resource = getResource(e) - switch { - case s >= entry.Emergency: - return sev.LogSeverity_EMERGENCY - case s >= entry.Alert: - return sev.LogSeverity_ALERT - case s >= entry.Critical: - return sev.LogSeverity_CRITICAL - case s >= entry.Error: - return sev.LogSeverity_ERROR - case s >= entry.Warning: - return sev.LogSeverity_WARNING - case s >= entry.Notice: - return sev.LogSeverity_NOTICE - case s >= entry.Info: - return sev.LogSeverity_INFO - case s > entry.Default: - return sev.LogSeverity_DEBUG - default: - return sev.LogSeverity_DEFAULT - } + return newEntry, nil } diff --git a/operator/builtin/output/google_cloud_test.go b/operator/builtin/output/googlecloud/google_cloud_test.go similarity index 99% rename from operator/builtin/output/google_cloud_test.go rename to operator/builtin/output/googlecloud/google_cloud_test.go index 8b4b75d56..64abd4149 100644 --- a/operator/builtin/output/google_cloud_test.go +++ b/operator/builtin/output/googlecloud/google_cloud_test.go @@ -1,4 +1,4 @@ -package output +package googlecloud import ( "context" diff --git a/operator/builtin/output/googlecloud/payload.go b/operator/builtin/output/googlecloud/payload.go new file mode 100644 index 000000000..aeecf63d3 --- /dev/null +++ b/operator/builtin/output/googlecloud/payload.go @@ -0,0 +1,173 @@ +package googlecloud + +import ( + "fmt" + "reflect" + + structpb "github.com/golang/protobuf/ptypes/struct" + logpb "google.golang.org/genproto/googleapis/logging/v2" +) + +func setPayload(entry *logpb.LogEntry, record interface{}) (err error) { + // Protect against the panic condition inside `jsonValueToStructValue` + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf(r.(string)) + } + }() + switch p := record.(type) { + case string: + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p} + case []byte: + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)} + case map[string]interface{}: + s := jsonMapToProtoStruct(p) + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} + case map[string]string: + fields := map[string]*structpb.Value{} + for k, v := range p { + fields[k] = jsonValueToStructValue(v) + } + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}} + default: + return fmt.Errorf("cannot convert record of type %T to a protobuf representation", record) + } + + return nil +} + +func jsonMapToProtoStruct(m map[string]interface{}) *structpb.Struct { + fields := map[string]*structpb.Value{} + for k, v := range m { + fields[k] = jsonValueToStructValue(v) + } + return &structpb.Struct{Fields: fields} +} + +func jsonValueToStructValue(v interface{}) *structpb.Value { + switch x := v.(type) { + case bool: + return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: x}} + case float32: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case float64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: x}} + case int: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case int8: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case int16: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case int32: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case int64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case uint: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case uint8: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case uint16: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case uint32: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case uint64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(x)}} + case string: + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: x}} + case nil: + return &structpb.Value{Kind: &structpb.Value_NullValue{}} + case map[string]interface{}: + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: jsonMapToProtoStruct(x)}} + case map[string]map[string]string: + fields := map[string]*structpb.Value{} + for k, v := range x { + fields[k] = jsonValueToStructValue(v) + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} + case map[string]string: + fields := map[string]*structpb.Value{} + for k, v := range x { + fields[k] = &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: v}} + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} + case []interface{}: + var vals []*structpb.Value + for _, e := range x { + vals = append(vals, jsonValueToStructValue(e)) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} + case []string: + var vals []*structpb.Value + for _, e := range x { + vals = append(vals, jsonValueToStructValue(e)) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} + default: + // Fallback to reflection for other types + return reflectToValue(reflect.ValueOf(v)) + } +} + +func reflectToValue(v reflect.Value) *structpb.Value { + switch v.Kind() { + case reflect.Bool: + return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: v.Bool()}} + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Int())}} + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Uint())}} + case reflect.Float32, reflect.Float64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: v.Float()}} + case reflect.Ptr: + if v.IsNil() { + return nil + } + return reflectToValue(reflect.Indirect(v)) + case reflect.Array, reflect.Slice: + size := v.Len() + if size == 0 { + return nil + } + values := make([]*structpb.Value, size) + for i := 0; i < size; i++ { + values[i] = reflectToValue(v.Index(i)) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: values}}} + case reflect.Struct: + t := v.Type() + size := v.NumField() + if size == 0 { + return nil + } + fields := make(map[string]*structpb.Value, size) + for i := 0; i < size; i++ { + name := t.Field(i).Name + // Better way? + if len(name) > 0 && 'A' <= name[0] && name[0] <= 'Z' { + fields[name] = reflectToValue(v.Field(i)) + } + } + if len(fields) == 0 { + return nil + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} + case reflect.Map: + keys := v.MapKeys() + if len(keys) == 0 { + return nil + } + fields := make(map[string]*structpb.Value, len(keys)) + for _, k := range keys { + if k.Kind() == reflect.String { + fields[k.String()] = reflectToValue(v.MapIndex(k)) + } + } + if len(fields) == 0 { + return nil + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} + default: + // Last resort + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: fmt.Sprint(v)}} + } +} diff --git a/operator/builtin/output/googlecloud/resource.go b/operator/builtin/output/googlecloud/resource.go new file mode 100644 index 000000000..f16dd913e --- /dev/null +++ b/operator/builtin/output/googlecloud/resource.go @@ -0,0 +1,114 @@ +package googlecloud + +import ( + "github.com/observiq/stanza/entry" + mrpb "google.golang.org/genproto/googleapis/api/monitoredres" +) + +// For more about monitored resources, see: +// https://cloud.google.com/logging/docs/api/v2/resource-list#resource-types + +func getResource(e *entry.Entry) *mrpb.MonitoredResource { + rt := detectResourceType(e) + if rt == "" { + return nil + } + + switch rt { + case "k8s_pod": + return k8sPodResource(e) + case "k8s_container": + return k8sContainerResource(e) + case "k8s_node": + return k8sNodeResource(e) + case "k8s_cluster": + return k8sClusterResource(e) + case "generic_node": + return genericNodeResource(e) + } + + return nil +} + +func detectResourceType(e *entry.Entry) string { + if hasResource("k8s.pod.name", e) { + if hasResource("container.name", e) { + return "k8s_container" + } + return "k8s_pod" + } + + if hasResource("k8s.cluster.name", e) { + if hasResource("host.name", e) { + return "k8s_node" + } + return "k8s_cluster" + } + + if hasResource("host.name", e) { + return "generic_node" + } + + return "" +} + +func hasResource(key string, e *entry.Entry) bool { + _, ok := e.Resource[key] + return ok +} + +func k8sPodResource(e *entry.Entry) *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "k8s_pod", + Labels: map[string]string{ + "pod_name": e.Resource["k8s.pod.name"], + "namespace_name": e.Resource["k8s.namespace.name"], + "cluster_name": e.Resource["k8s.cluster.name"], + // TODO project id + }, + } +} + +func k8sContainerResource(e *entry.Entry) *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "k8s_container", + Labels: map[string]string{ + "container_name": e.Resource["container.name"], + "pod_name": e.Resource["k8s.pod.name"], + "namespace_name": e.Resource["k8s.namespace.name"], + "cluster_name": e.Resource["k8s.cluster.name"], + // TODO project id + }, + } +} + +func k8sNodeResource(e *entry.Entry) *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "k8s_node", + Labels: map[string]string{ + "cluster_name": e.Resource["k8s.cluster.name"], + "node_name": e.Resource["host.name"], + // TODO project id + }, + } +} + +func k8sClusterResource(e *entry.Entry) *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "k8s_cluster", + Labels: map[string]string{ + "cluster_name": e.Resource["k8s.cluster.name"], + // TODO project id + }, + } +} + +func genericNodeResource(e *entry.Entry) *mrpb.MonitoredResource { + return &mrpb.MonitoredResource{ + Type: "generic_node", + Labels: map[string]string{ + "node_id": e.Resource["host.name"], + // TODO project id + }, + } +} diff --git a/operator/builtin/output/googlecloud/severity.go b/operator/builtin/output/googlecloud/severity.go new file mode 100644 index 000000000..df7984ff8 --- /dev/null +++ b/operator/builtin/output/googlecloud/severity.go @@ -0,0 +1,47 @@ +package googlecloud + +import ( + "github.com/observiq/stanza/entry" + sev "google.golang.org/genproto/googleapis/logging/type" +) + +var fastSev = map[entry.Severity]sev.LogSeverity{ + entry.Catastrophe: sev.LogSeverity_EMERGENCY, + entry.Emergency: sev.LogSeverity_EMERGENCY, + entry.Alert: sev.LogSeverity_ALERT, + entry.Critical: sev.LogSeverity_CRITICAL, + entry.Error: sev.LogSeverity_ERROR, + entry.Warning: sev.LogSeverity_WARNING, + entry.Notice: sev.LogSeverity_NOTICE, + entry.Info: sev.LogSeverity_INFO, + entry.Debug: sev.LogSeverity_DEBUG, + entry.Trace: sev.LogSeverity_DEBUG, + entry.Default: sev.LogSeverity_DEFAULT, +} + +func convertSeverity(s entry.Severity) sev.LogSeverity { + if logSev, ok := fastSev[s]; ok { + return logSev + } + + switch { + case s >= entry.Emergency: + return sev.LogSeverity_EMERGENCY + case s >= entry.Alert: + return sev.LogSeverity_ALERT + case s >= entry.Critical: + return sev.LogSeverity_CRITICAL + case s >= entry.Error: + return sev.LogSeverity_ERROR + case s >= entry.Warning: + return sev.LogSeverity_WARNING + case s >= entry.Notice: + return sev.LogSeverity_NOTICE + case s >= entry.Info: + return sev.LogSeverity_INFO + case s > entry.Default: + return sev.LogSeverity_DEBUG + default: + return sev.LogSeverity_DEFAULT + } +} diff --git a/operator/builtin/output/output.go b/operator/builtin/output/output.go new file mode 100644 index 000000000..5c4a29cbd --- /dev/null +++ b/operator/builtin/output/output.go @@ -0,0 +1,5 @@ +package output + +import ( + _ "github.com/observiq/stanza/operator/builtin/output/googlecloud" +) diff --git a/operator/builtin/transformer/k8s_metadata_decorator.go b/operator/builtin/transformer/k8s_metadata_decorator.go index f1bb6c1c8..9983a5d3e 100644 --- a/operator/builtin/transformer/k8s_metadata_decorator.go +++ b/operator/builtin/transformer/k8s_metadata_decorator.go @@ -21,8 +21,8 @@ func init() { func NewK8sMetadataDecoratorConfig(operatorID string) *K8sMetadataDecoratorConfig { return &K8sMetadataDecoratorConfig{ TransformerConfig: helper.NewTransformerConfig(operatorID, "k8s_metadata_decorator"), - PodNameField: entry.NewRecordField("pod_name"), - NamespaceField: entry.NewRecordField("namespace"), + PodNameField: entry.NewResourceField("k8s.pod.name"), + NamespaceField: entry.NewResourceField("k8s.namespace.name"), CacheTTL: operator.Duration{Duration: 10 * time.Minute}, Timeout: operator.Duration{Duration: 10 * time.Second}, } @@ -69,6 +69,8 @@ type K8sMetadataDecorator struct { // MetadataCacheEntry is an entry in the metadata cache type MetadataCacheEntry struct { + ClusterName string + UID string ExpirationTime time.Time Labels map[string]string Annotations map[string]string @@ -198,7 +200,9 @@ func (k *K8sMetadataDecorator) refreshNamespaceMetadata(ctx context.Context, nam // Cache the results cacheEntry := MetadataCacheEntry{ + ClusterName: namespaceResponse.ClusterName, ExpirationTime: time.Now().Add(k.cacheTTL), + UID: string(namespaceResponse.UID), Labels: namespaceResponse.Labels, Annotations: namespaceResponse.Annotations, } @@ -229,6 +233,8 @@ func (k *K8sMetadataDecorator) refreshPodMetadata(ctx context.Context, namespace // Cache the results cacheEntry := MetadataCacheEntry{ + ClusterName: podResponse.ClusterName, + UID: string(podResponse.UID), ExpirationTime: time.Now().Add(k.cacheTTL), Labels: podResponse.Labels, Annotations: podResponse.Annotations, @@ -244,24 +250,30 @@ func (k *K8sMetadataDecorator) decorateEntryWithNamespaceMetadata(nsMeta Metadat } for k, v := range nsMeta.Annotations { - entry.Labels["k8s_ns_annotation/"+k] = v + entry.Labels["k8s-ns-annotation/"+k] = v } for k, v := range nsMeta.Labels { - entry.Labels["k8s_ns_label/"+k] = v + entry.Labels["k8s-ns/"+k] = v } + + entry.Resource["k8s.namespace.uid"] = nsMeta.UID + entry.Resource["k8s.cluster.name"] = nsMeta.ClusterName } -func (k *K8sMetadataDecorator) decorateEntryWithPodMetadata(nsMeta MetadataCacheEntry, entry *entry.Entry) { +func (k *K8sMetadataDecorator) decorateEntryWithPodMetadata(podMeta MetadataCacheEntry, entry *entry.Entry) { if entry.Labels == nil { entry.Labels = make(map[string]string) } - for k, v := range nsMeta.Annotations { - entry.Labels["k8s_pod_annotation/"+k] = v + for k, v := range podMeta.Annotations { + entry.Labels["k8s-pod-annotation/"+k] = v } - for k, v := range nsMeta.Labels { - entry.Labels["k8s_pod_label/"+k] = v + for k, v := range podMeta.Labels { + entry.Labels["k8s-pod/"+k] = v } + + entry.Resource["k8s.pod.uid"] = podMeta.UID + entry.Resource["k8s.cluster.name"] = podMeta.ClusterName } diff --git a/operator/builtin/transformer/k8s_metadata_decorator_test.go b/operator/builtin/transformer/k8s_metadata_decorator_test.go index 83a8f43f8..fb5b4cece 100644 --- a/operator/builtin/transformer/k8s_metadata_decorator_test.go +++ b/operator/builtin/transformer/k8s_metadata_decorator_test.go @@ -50,8 +50,8 @@ func TestK8sMetadataDecoratorBuildDefault(t *testing.T) { }, OnError: "send", }, - podNameField: entry.NewRecordField("pod_name"), - namespaceField: entry.NewRecordField("namespace"), + podNameField: entry.NewResourceField("k8s.pod.name"), + namespaceField: entry.NewResourceField("k8s.namespace.name"), cacheTTL: 10 * time.Minute, timeout: 10 * time.Second, } @@ -96,10 +96,10 @@ func TestK8sMetadataDecoratorCachedMetadata(t *testing.T) { expected := entry.Entry{ Labels: map[string]string{ - "k8s_pod_label/podlabel1": "podlab1", - "k8s_ns_label/label1": "lab1", - "k8s_pod_annotation/podannotation1": "podann1", - "k8s_ns_annotation/annotation1": "ann1", + "k8s-pod/podlabel1": "podlab1", + "k8s-ns/label1": "lab1", + "k8s-pod-annotation/podannotation1": "podann1", + "k8s-ns-annotation/annotation1": "ann1", }, } @@ -109,9 +109,9 @@ func TestK8sMetadataDecoratorCachedMetadata(t *testing.T) { }).Return(nil) e := &entry.Entry{ - Record: map[string]interface{}{ - "pod_name": "testpodname", - "namespace": "testnamespace", + Resource: map[string]string{ + "k8s.pod.name": "testpodname", + "k8s.namespace.name": "testnamespace", }, } err = pg.Process(context.Background(), e) diff --git a/operator/helper/host_identifier.go b/operator/helper/host_identifier.go index ec45209bb..0b7f12ec6 100644 --- a/operator/helper/host_identifier.go +++ b/operator/helper/host_identifier.go @@ -116,10 +116,10 @@ type HostIdentifier struct { // Identify will add host related metadata to an entry's resource func (h *HostIdentifier) Identify(entry *entry.Entry) { if h.includeHostname { - entry.AddResourceKey("hostname", h.hostname) + entry.AddResourceKey("host.name", h.hostname) } if h.includeIP { - entry.AddResourceKey("ip", h.ip) + entry.AddResourceKey("host.ip", h.ip) } } diff --git a/operator/helper/host_identifier_test.go b/operator/helper/host_identifier_test.go index d369653dc..07d795dda 100644 --- a/operator/helper/host_identifier_test.go +++ b/operator/helper/host_identifier_test.go @@ -26,22 +26,22 @@ func TestHostLabeler(t *testing.T) { "HostnameAndIP", MockHostIdentifierConfig(true, true, "ip", "hostname"), map[string]string{ - "hostname": "hostname", - "ip": "ip", + "host.name": "hostname", + "host.ip": "ip", }, }, { "HostnameNoIP", MockHostIdentifierConfig(false, true, "ip", "hostname"), map[string]string{ - "hostname": "hostname", + "host.name": "hostname", }, }, { "IPNoHostname", MockHostIdentifierConfig(true, false, "ip", "hostname"), map[string]string{ - "ip": "ip", + "host.ip": "ip", }, }, { diff --git a/pipeline/config.go b/pipeline/config.go index 26c1287b0..f8b5faa49 100644 --- a/pipeline/config.go +++ b/pipeline/config.go @@ -240,6 +240,7 @@ func (p Params) buildPlugin(pluginRegistry operator.PluginRegistry, namespace st templateParams["input"] = p.TemplateInput(namespace) templateParams["output"] = p.TemplateOutput(namespace, defaultOutput) + templateParams["id"] = p.ID() config, err := pluginRegistry.Render(p.Type(), templateParams) if err != nil {