From a323872be01eeaa523060370f8f1ab54563e8f2a Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 21 Jan 2021 11:51:09 +0200 Subject: [PATCH 1/4] Add deployment name ancestor of pod in meta Signed-off-by: chrismark --- .../common/kubernetes/metadata/metadata.go | 2 +- libbeat/common/kubernetes/metadata/pod.go | 41 ++++++++++++++++++- libbeat/common/kubernetes/watcher.go | 8 ++++ .../module/kubernetes/util/kubernetes.go | 4 +- 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/libbeat/common/kubernetes/metadata/metadata.go b/libbeat/common/kubernetes/metadata/metadata.go index 897f6c4bc84e..057a3b0caf57 100644 --- a/libbeat/common/kubernetes/metadata/metadata.go +++ b/libbeat/common/kubernetes/metadata/metadata.go @@ -67,7 +67,7 @@ func GetPodMetaGen( if namespaceWatcher != nil && metaConf.Namespace.Enabled() { namespaceMetaGen = NewNamespaceMetadataGenerator(metaConf.Namespace, namespaceWatcher.Store()) } - metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), nodeMetaGen, namespaceMetaGen) + metaGen := NewPodMetadataGenerator(cfg, podWatcher.Store(), podWatcher.Client(), nodeMetaGen, namespaceMetaGen) return metaGen } diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 9234f54533b7..5e2d2439151f 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -18,6 +18,10 @@ package metadata import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8s "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/common" @@ -26,18 +30,25 @@ import ( type pod struct { store cache.Store + client k8s.Interface node MetaGen namespace MetaGen resource *Resource } // NewPodMetadataGenerator creates a metagen for pod resources -func NewPodMetadataGenerator(cfg *common.Config, pods cache.Store, node MetaGen, namespace MetaGen) MetaGen { +func NewPodMetadataGenerator( + cfg *common.Config, + pods cache.Store, + client k8s.Interface, + node MetaGen, + namespace MetaGen) MetaGen { return &pod{ resource: NewResourceMetadataGenerator(cfg), store: pods, node: node, namespace: namespace, + client: client, } } @@ -50,6 +61,16 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.Map out := p.resource.Generate("pod", obj, opts...) + // check if Pod is handled by a ReplicaSet which is controlled by a Deployment + rsName, _ := out.GetValue("replicaset.name") + if rsName != nil { + rsName := rsName.(string) + dep := p.getRSDeployment(rsName, po.GetNamespace()) + if dep != "" { + out.Put("deployment.name", dep) + } + } + if p.node != nil { meta := p.node.GenerateFromName(po.Spec.NodeName, WithLabels("node")) if meta != nil { @@ -89,3 +110,21 @@ func (p *pod) GenerateFromName(name string, opts ...FieldOptions) common.MapStr return nil } + +// getRSDeployment return the name of the Deployment object that +// owns the ReplicaSet with the given name under the given Namespace +func (p *pod) getRSDeployment(rsName string, ns string) string { + rs, err := p.client.AppsV1().ReplicaSets(ns).Get(context.TODO(), rsName, metav1.GetOptions{}) + if err != nil { + return "" + } + for _, ref := range rs.GetOwnerReferences() { + if ref.Controller != nil && *ref.Controller { + switch ref.Kind { + case "Deployment": + return ref.Name + } + } + } + return "" +} diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index f72a98de0a7e..a7e2a6319774 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -56,6 +56,9 @@ type Watcher interface { // Store returns the store object for the watcher Store() cache.Store + + // Client returns the kubernetes client object used by the watcher + Client() kubernetes.Interface } // WatchOptions controls watch behaviors @@ -165,6 +168,11 @@ func (w *watcher) Store() cache.Store { return w.store } +// Client returns the kubernetes client object used by the watcher +func (w *watcher) Client() kubernetes.Interface { + return w.client +} + // Start watching pods func (w *watcher) Start() error { go w.informer.Run(w.ctx.Done()) diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 7f8317daa351..c0c180ca5af3 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -129,7 +129,7 @@ func NewResourceMetadataEnricher( cfg, _ := common.NewConfigFrom(&metaConfig) metaGen := metadata.NewResourceMetadataGenerator(cfg) - podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil) + podMetaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil) enricher := buildMetadataEnricher(watcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { @@ -213,7 +213,7 @@ func NewContainerMetadataEnricher( cfg, _ := common.NewConfigFrom(&metaConfig) - metaGen := metadata.NewPodMetadataGenerator(cfg, nil, nil, nil) + metaGen := metadata.NewPodMetadataGenerator(cfg, nil, watcher.Client(), nil, nil) enricher := buildMetadataEnricher(watcher, // update func(m map[string]common.MapStr, r kubernetes.Resource) { From 41e9de05a5b224eff4da38a085f28323a7e999b4 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 21 Jan 2021 13:00:52 +0200 Subject: [PATCH 2/4] Fix and add tests Signed-off-by: chrismark --- .../providers/kubernetes/pod_test.go | 2 +- .../common/kubernetes/metadata/pod_test.go | 114 +++++++++++++++++- .../add_kubernetes_metadata/indexers_test.go | 8 +- .../module/kubernetes/util/kubernetes_test.go | 5 + 4 files changed, 121 insertions(+), 8 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index 1673d295ffb1..8217cb4bcda9 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -1497,7 +1497,7 @@ func TestEmitEvent(t *testing.T) { t.Fatal(err) } - metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) + metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil) p := &Provider{ config: defaultConfig(), bus: bus.New(logp.NewLogger("bus"), "test"), diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index afc3db487dec..47041f6896bf 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -18,15 +18,18 @@ package metadata import ( + "context" "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/cache" "github.com/elastic/beats/v7/libbeat/common" @@ -34,10 +37,61 @@ import ( ) func TestPod_Generate(t *testing.T) { + client := k8sfake.NewSimpleClientset() uid := "005f3b90-4b9d-12f8-acf0-31020a840133" namespace := "default" name := "obj" boolean := true + rs := &appsv1.ReplicaSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "nginx-rs", + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "Deployment", + Name: "nginx-deployment", + UID: "005f3b90-4b9d-12f8-acf0-31020a840144", + Controller: &boolean, + }, + }, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "demo", + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "demo", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "nginx:1.12", + Ports: []v1.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + }, + } + + _, err := client.AppsV1().ReplicaSets(namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create k8s deployment: %v", err) + } + tests := []struct { input kubernetes.Resource output common.MapStr @@ -133,6 +187,60 @@ func TestPod_Generate(t *testing.T) { }, }, }, + { + name: "test object with owner reference to replicaset", + input: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + Namespace: namespace, + Labels: map[string]string{ + "foo": "bar", + }, + Annotations: map[string]string{ + "app": "production", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps", + Kind: "ReplicaSet", + Name: "nginx-rs", + UID: "005f3b90-4b9d-12f8-acf0-31020a8409087", + Controller: &boolean, + }, + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: v1.PodSpec{ + NodeName: "testnode", + }, + }, + output: common.MapStr{ + "pod": common.MapStr{ + "name": "obj", + "uid": uid, + }, + "namespace": "default", + "deployment": common.MapStr{ + "name": "nginx-deployment", + }, + "replicaset": common.MapStr{ + "name": "nginx-rs", + }, + "node": common.MapStr{ + "name": "testnode", + }, + "labels": common.MapStr{ + "foo": "bar", + }, + "annotations": common.MapStr{ + "app": "production", + }, + }, + }, } config, err := common.NewConfigFrom(map[string]interface{}{ @@ -140,7 +248,7 @@ func TestPod_Generate(t *testing.T) { }) assert.NoError(t, err) - metagen := NewPodMetadataGenerator(config, nil, nil, nil) + metagen := NewPodMetadataGenerator(config, nil, client, nil, nil) for _, test := range tests { t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) @@ -257,7 +365,7 @@ func TestPod_GenerateFromName(t *testing.T) { assert.NoError(t, err) pods := cache.NewStore(cache.MetaNamespaceKeyFunc) pods.Add(test.input) - metagen := NewPodMetadataGenerator(config, pods, nil, nil) + metagen := NewPodMetadataGenerator(config, pods, nil, nil, nil) accessor, err := meta.Accessor(test.input) require.NoError(t, err) @@ -376,7 +484,7 @@ func TestPod_GenerateWithNodeNamespace(t *testing.T) { namespaces.Add(test.namespace) nsMeta := NewNamespaceMetadataGenerator(config, namespaces) - metagen := NewPodMetadataGenerator(config, pods, nodeMeta, nsMeta) + metagen := NewPodMetadataGenerator(config, pods, nil, nodeMeta, nsMeta) t.Run(test.name, func(t *testing.T) { assert.Equal(t, test.output, metagen.Generate(test.input)) }) diff --git a/libbeat/processors/add_kubernetes_metadata/indexers_test.go b/libbeat/processors/add_kubernetes_metadata/indexers_test.go index e1d33f1a401e..e15105686137 100644 --- a/libbeat/processors/add_kubernetes_metadata/indexers_test.go +++ b/libbeat/processors/add_kubernetes_metadata/indexers_test.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/kubernetes" ) -var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) +var metagen = metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil) func TestPodIndexer(t *testing.T) { var testConfig = common.NewConfig() @@ -86,7 +86,7 @@ func TestPodIndexer(t *testing.T) { func TestPodUIDIndexer(t *testing.T) { var testConfig = common.NewConfig() - metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil) + metaGenWithPodUID := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, nil, nil, nil) podUIDIndexer, err := NewPodUIDIndexer(*testConfig, metaGenWithPodUID) assert.NoError(t, err) @@ -287,7 +287,7 @@ func TestFilteredGenMeta(t *testing.T) { }) assert.NoError(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil) podIndexer, err = NewPodNameIndexer(*testConfig, filteredGen) assert.NoError(t, err) @@ -324,7 +324,7 @@ func TestFilteredGenMetaExclusion(t *testing.T) { }) assert.NoError(t, err) - filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil) + filteredGen := metadata.NewPodMetadataGenerator(config, nil, nil, nil, nil) podIndexer, err := NewPodNameIndexer(*testConfig, filteredGen) assert.NoError(t, err) diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index bab34ef6cd30..86c28fc0798a 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -21,6 +21,7 @@ import ( "testing" "k8s.io/client-go/tools/cache" + k8s "k8s.io/client-go/kubernetes" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" @@ -157,3 +158,7 @@ func (m *mockWatcher) AddEventHandler(r kubernetes.ResourceEventHandler) { func (m *mockWatcher) Store() cache.Store { return nil } + +func (m *mockWatcher) Client() k8s.Interface { + return nil +} From 83138c9b689232da8dd5aa560d01f6c07a601c70 Mon Sep 17 00:00:00 2001 From: chrismark Date: Thu, 21 Jan 2021 14:16:32 +0200 Subject: [PATCH 3/4] lint metricbeat Signed-off-by: chrismark --- metricbeat/module/kubernetes/util/kubernetes_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/kubernetes/util/kubernetes_test.go b/metricbeat/module/kubernetes/util/kubernetes_test.go index 86c28fc0798a..27ea47f9b212 100644 --- a/metricbeat/module/kubernetes/util/kubernetes_test.go +++ b/metricbeat/module/kubernetes/util/kubernetes_test.go @@ -20,8 +20,8 @@ package util import ( "testing" - "k8s.io/client-go/tools/cache" k8s "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" From 10ea526a9e1fe1e75d68cb36ab39ad2834208de1 Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 26 Jan 2021 22:29:27 +0200 Subject: [PATCH 4/4] fixes and changelog Signed-off-by: chrismark --- CHANGELOG.next.asciidoc | 1 + libbeat/common/kubernetes/metadata/pod.go | 6 ++++-- libbeat/common/kubernetes/metadata/pod_test.go | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0480d59e246f..6fc3100acd8e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -598,6 +598,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update the baseline version of Sarama (Kafka support library) to 1.27.2. {pull}23595[23595] - Add kubernetes.volume.fs.used.pct field. {pull}23564[23564] - Add the `enable_krb5_fast` flag to the Kafka output to explicitly opt-in to FAST authentication. {pull}23629[23629] +- Add deployment name in pod's meta. {pull}23610[23610] *Auditbeat* diff --git a/libbeat/common/kubernetes/metadata/pod.go b/libbeat/common/kubernetes/metadata/pod.go index 5e2d2439151f..991d946be750 100644 --- a/libbeat/common/kubernetes/metadata/pod.go +++ b/libbeat/common/kubernetes/metadata/pod.go @@ -63,8 +63,7 @@ func (p *pod) Generate(obj kubernetes.Resource, opts ...FieldOptions) common.Map // check if Pod is handled by a ReplicaSet which is controlled by a Deployment rsName, _ := out.GetValue("replicaset.name") - if rsName != nil { - rsName := rsName.(string) + if rsName, ok := rsName.(string); ok { dep := p.getRSDeployment(rsName, po.GetNamespace()) if dep != "" { out.Put("deployment.name", dep) @@ -114,6 +113,9 @@ func (p *pod) GenerateFromName(name string, opts ...FieldOptions) common.MapStr // getRSDeployment return the name of the Deployment object that // owns the ReplicaSet with the given name under the given Namespace func (p *pod) getRSDeployment(rsName string, ns string) string { + if p.client == nil { + return "" + } rs, err := p.client.AppsV1().ReplicaSets(ns).Get(context.TODO(), rsName, metav1.GetOptions{}) if err != nil { return "" diff --git a/libbeat/common/kubernetes/metadata/pod_test.go b/libbeat/common/kubernetes/metadata/pod_test.go index 47041f6896bf..52186a8ae727 100644 --- a/libbeat/common/kubernetes/metadata/pod_test.go +++ b/libbeat/common/kubernetes/metadata/pod_test.go @@ -87,7 +87,7 @@ func TestPod_Generate(t *testing.T) { }, } - _, err := client.AppsV1().ReplicaSets(namespace).Create(context.TODO(), rs, metav1.CreateOptions{}) + _, err := client.AppsV1().ReplicaSets(namespace).Create(context.Background(), rs, metav1.CreateOptions{}) if err != nil { t.Fatalf("failed to create k8s deployment: %v", err) }