From 988c7460f4f623848301f2fec0c81ba2828fb171 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Mon, 14 Jan 2019 07:36:25 -0700 Subject: [PATCH] Add Dedot for Kubernetes labels and annotations (#9939) (#10039) Fixes: #8384 * Add Dedot for Kubernetes labels and annotations * Add dedot options in libbeat kubernetes metadata * Update changelog * Refactor TestGenerateMapStrFromEvent * Update shared-autodiscover.asciidoc with dedot params * Add names for each unit test case in event_test.go * Fix rebase errors (cherry picked from commit d5bda769e4eed723fe79d1209959ae15e8c691b4) --- CHANGELOG.next.asciidoc | 15 ++ libbeat/common/kubernetes/metadata.go | 24 ++- libbeat/common/kubernetes/metadata_test.go | 87 +++++++++- libbeat/docs/shared-autodiscover.asciidoc | 12 ++ metricbeat/module/kubernetes/_meta/config.yml | 2 + metricbeat/module/kubernetes/event/config.go | 16 +- metricbeat/module/kubernetes/event/event.go | 44 ++++- .../module/kubernetes/event/event_test.go | 154 ++++++++++++++++++ metricbeat/modules.d/kubernetes.yml.disabled | 2 + x-pack/filebeat/filebeat.reference.yml | 6 + 10 files changed, 342 insertions(+), 20 deletions(-) create mode 100644 metricbeat/module/kubernetes/event/event_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 03baf48f9e5..7bf9d1c1657 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -72,6 +72,21 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff] *Metricbeat* +- Collect custom cluster `display_name` in `elasticsearch/cluster_stats` metricset. {pull}8445[8445] +- Test etcd module with etcd 3.3. {pull}9068[9068] +- All `elasticsearch` metricsets now have module-level `cluster.id` and `cluster.name` fields. {pull}8770[8770] {pull}8771[8771] {pull}9164[9164] {pull}9165[9165] {pull}9166[9166] {pull}9168[9168] +- All `elasticsearch` node-level metricsets now have `node.id` and `node.name` fields. 
{pull}9168[9168] {pull}9209[9209] +- Add settings to disable docker and cgroup cpu metrics per core. {issue}9187[9187] {pull}9194[9194] {pull}9589[9589] +- The `elasticsearch/node` metricset now reports the Elasticsearch cluster UUID. {pull}8771[8771] +- Support GET requests in Jolokia module. {issue}8566[8566] {pull}9226[9226] +- Add freebsd support for the uptime metricset. {pull}9413[9413] +- Add `host.os.name` field to add_host_metadata processor. {issue}8948[8948] {pull}9405[9405] +- Add field `event.dataset` which is `{module}.{metricset}`. +- Add more TCP statuses to `socket_summary` metricset. {pull}9430[9430] +- Remove experimental tag from ceph metricsets. {pull}9708[9708] +- Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657] +- Add DeDot for kubernetes labels and annotations. {issue}9860[9860] {pull}9939[9939] + *Packetbeat* *Winlogbeat* diff --git a/libbeat/common/kubernetes/metadata.go b/libbeat/common/kubernetes/metadata.go index cc997d5c6d5..f5e8a4152d1 100644 --- a/libbeat/common/kubernetes/metadata.go +++ b/libbeat/common/kubernetes/metadata.go @@ -45,6 +45,8 @@ type MetaGeneratorConfig struct { // Undocumented settings, to be deprecated in favor of `drop_fields` processor: IncludePodUID bool `config:"include_pod_uid"` IncludeCreatorMetadata bool `config:"include_creator_metadata"` + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` } type metaGenerator = MetaGeneratorConfig @@ -54,6 +56,8 @@ func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) { // default settings: generator := metaGenerator{ IncludeCreatorMetadata: true, + LabelsDedot: false, + AnnotationsDedot: false, } err := cfg.Unpack(&generator) @@ -71,10 +75,15 @@ func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr { labelMap := common.MapStr{} if len(g.IncludeLabels) == 0 { for k, v := range obj.GetMetadata().Labels { - safemapstr.Put(labelMap, k, v) + if g.LabelsDedot { + label := 
common.DeDot(k) + labelMap.Put(label, v) + } else { + safemapstr.Put(labelMap, k, v) + } } } else { - labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels) + labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels, g.LabelsDedot) } // Exclude any labels that are present in the exclude_labels config @@ -82,7 +91,7 @@ func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr { delete(labelMap, label) } - annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations) + annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations, g.AnnotationsDedot) meta := common.MapStr{} if objMeta.GetNamespace() != "" { meta["namespace"] = objMeta.GetNamespace() @@ -141,7 +150,7 @@ func (g *metaGenerator) ContainerMetadata(pod *Pod, container string) common.Map return podMeta } -func generateMapSubset(input map[string]string, keys []string) common.MapStr { +func generateMapSubset(input map[string]string, keys []string, dedot bool) common.MapStr { output := common.MapStr{} if input == nil { return output @@ -150,7 +159,12 @@ func generateMapSubset(input map[string]string, keys []string) common.MapStr { for _, key := range keys { value, ok := input[key] if ok { - safemapstr.Put(output, key, value) + if dedot { + dedotKey := common.DeDot(key) + output.Put(dedotKey, value) + } else { + safemapstr.Put(output, key, value) + } } } diff --git a/libbeat/common/kubernetes/metadata_test.go b/libbeat/common/kubernetes/metadata_test.go index 31e7b3e8c5b..a5c2fe493ef 100644 --- a/libbeat/common/kubernetes/metadata_test.go +++ b/libbeat/common/kubernetes/metadata_test.go @@ -27,7 +27,7 @@ import ( "github.com/elastic/beats/libbeat/common" ) -func TestPodMetadataDeDot(t *testing.T) { +func TestPodMetadata(t *testing.T) { withUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true}) UID := "005f3b90-4b9d-12f8-acf0-31020a840133" @@ -120,3 +120,88 @@ func TestPodMetadataDeDot(t *testing.T) { assert.Equal(t, 
metaGen.PodMetadata(test.pod), test.meta) } } + +func TestPodMetadataDeDot(t *testing.T) { + UID := "005f3b90-4b9d-12f8-acf0-31020a840133" + Deployment := "Deployment" + test := "test" + ReplicaSet := "ReplicaSet" + True := true + False := false + tests := []struct { + pod *Pod + meta common.MapStr + config *common.Config + }{ + { + pod: &Pod{ + Metadata: &metav1.ObjectMeta{ + Labels: map[string]string{"a.key": "foo", "a": "bar"}, + Uid: &UID, + Namespace: &test, + Annotations: map[string]string{"b.key": "foo", "b": "bar"}, + }, + Spec: &v1.PodSpec{ + NodeName: &test, + }, + }, + meta: common.MapStr{ + "pod": common.MapStr{ + "name": "", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{"name": "test"}, + "namespace": "test", + "labels": common.MapStr{"a": "bar", "a_key": "foo"}, + "annotations": common.MapStr{"b": "bar", "b_key": "foo"}, + }, + config: common.NewConfig(), + }, + { + pod: &Pod{ + Metadata: &metav1.ObjectMeta{ + Labels: map[string]string{"a.key": "foo", "a": "bar"}, + Uid: &UID, + OwnerReferences: []*metav1.OwnerReference{ + { + Kind: &Deployment, + Name: &test, + Controller: &True, + }, + { + Kind: &ReplicaSet, + Name: &ReplicaSet, + Controller: &False, + }, + }, + }, + Spec: &v1.PodSpec{ + NodeName: &test, + }, + }, + meta: common.MapStr{ + "pod": common.MapStr{ + "name": "", + "uid": "005f3b90-4b9d-12f8-acf0-31020a840133", + }, + "node": common.MapStr{"name": "test"}, + "labels": common.MapStr{"a": "bar", "a_key": "foo"}, + "deployment": common.MapStr{"name": "test"}, + }, + config: common.NewConfig(), + }, + } + + for _, test := range tests { + config, err := common.NewConfigFrom(map[string]interface{}{ + "labels.dedot": true, + "annotations.dedot": true, + "include_annotations": []string{"b", "b.key"}, + }) + metaGen, err := NewMetaGenerator(config) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, metaGen.PodMetadata(test.pod), test.meta) + } +} diff --git a/libbeat/docs/shared-autodiscover.asciidoc 
b/libbeat/docs/shared-autodiscover.asciidoc index e0e63080a4e..68b27e8ea75 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -129,6 +129,18 @@ event: If the `include_annotations` config is added to the provider config, then the list of annotations present in the config are added to the event. +If the `include_labels` config is added to the provider config, then the list of labels present in the config +will be added to the event. + +If the `exclude_labels` config is added to the provider config, then the list of labels present in the config +will be excluded from the event. + +If the `labels.dedot` config is set to `true` in the provider config, then `.` in labels will be replaced with `_`. + +If the `annotations.dedot` config is set to `true` in the provider config, then `.` in annotations will be replaced +with `_`. + + For example: [source,yaml] diff --git a/metricbeat/module/kubernetes/_meta/config.yml b/metricbeat/module/kubernetes/_meta/config.yml index 82288c62012..ec678aa79b5 100644 --- a/metricbeat/module/kubernetes/_meta/config.yml +++ b/metricbeat/module/kubernetes/_meta/config.yml @@ -17,6 +17,8 @@ # Enriching parameters: #add_metadata: true #in_cluster: true + #labels.dedot: false + #annotations.dedot: false # When used outside the cluster: #in_cluster: false #host: node_name diff --git a/metricbeat/module/kubernetes/event/config.go b/metricbeat/module/kubernetes/event/config.go index d2117634028..daabe2b292e 100644 --- a/metricbeat/module/kubernetes/event/config.go +++ b/metricbeat/module/kubernetes/event/config.go @@ -23,10 +23,12 @@ import ( ) type kubeEventsConfig struct { - InCluster bool `config:"in_cluster"` - KubeConfig string `config:"kube_config"` - Namespace string `config:"namespace"` - SyncPeriod time.Duration 
`config:"sync_period"` + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` } type Enabled struct { @@ -35,8 +37,10 @@ type Enabled struct { func defaultKubernetesEventsConfig() kubeEventsConfig { return kubeEventsConfig{ - InCluster: true, - SyncPeriod: 1 * time.Second, + InCluster: true, + SyncPeriod: 1 * time.Second, + LabelsDedot: false, + AnnotationsDedot: false, } } diff --git a/metricbeat/module/kubernetes/event/event.go b/metricbeat/module/kubernetes/event/event.go index 6aeab8996f2..a6813624eed 100644 --- a/metricbeat/module/kubernetes/event/event.go +++ b/metricbeat/module/kubernetes/event/event.go @@ -41,7 +41,16 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet - watcher kubernetes.Watcher + watcher kubernetes.Watcher + watchOptions kubernetes.WatchOptions + dedotConfig dedotConfig +} + +// dedotConfig defines LabelsDedot and AnnotationsDedot. +// Default to be false. If set to true, replace dots in labels with `_`. 
+type dedotConfig struct { + LabelsDedot bool `config:"labels.dedot"` + AnnotationsDedot bool `config:"annotations.dedot"` } // New create a new instance of the MetricSet @@ -62,17 +71,26 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, fmt.Errorf("fail to get kubernetes client: %s", err.Error()) } - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, kubernetes.WatchOptions{ + watchOptions := kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Namespace: config.Namespace, - }) + } + + watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions) if err != nil { return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error()) } + dedotConfig := dedotConfig{ + LabelsDedot: config.LabelsDedot, + AnnotationsDedot: config.AnnotationsDedot, + } + return &MetricSet{ BaseMetricSet: base, + dedotConfig: dedotConfig, watcher: watcher, + watchOptions: watchOptions, }, nil } @@ -81,10 +99,10 @@ func (m *MetricSet) Run(reporter mb.PushReporter) { now := time.Now() handler := kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj kubernetes.Resource) { - reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event))) + reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig)) }, UpdateFunc: func(obj kubernetes.Resource) { - reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event))) + reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig)) }, // ignore events that are deleted DeleteFunc: nil, @@ -107,7 +125,7 @@ func (m *MetricSet) Run(reporter mb.PushReporter) { return } -func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr { +func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig) common.MapStr { eventMeta := common.MapStr{ "timestamp": common.MapStr{ "created": kubernetes.Time(eve.Metadata.CreationTimestamp).UTC(), @@ -123,7 +141,12 @@ func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr { if 
len(eve.Metadata.Labels) != 0 { labels := make(common.MapStr, len(eve.Metadata.Labels)) for k, v := range eve.Metadata.Labels { - safemapstr.Put(labels, k, v) + if dedotConfig.LabelsDedot { + label := common.DeDot(k) + labels.Put(label, v) + } else { + safemapstr.Put(labels, k, v) + } } eventMeta["labels"] = labels @@ -132,7 +155,12 @@ func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr { if len(eve.Metadata.Annotations) != 0 { annotations := make(common.MapStr, len(eve.Metadata.Annotations)) for k, v := range eve.Metadata.Annotations { - safemapstr.Put(annotations, k, v) + if dedotConfig.AnnotationsDedot { + annotation := common.DeDot(k) + annotations.Put(annotation, v) + } else { + safemapstr.Put(annotations, k, v) + } } eventMeta["annotations"] = annotations diff --git a/metricbeat/module/kubernetes/event/event_test.go b/metricbeat/module/kubernetes/event/event_test.go new file mode 100644 index 00000000000..5fe869e3694 --- /dev/null +++ b/metricbeat/module/kubernetes/event/event_test.go @@ -0,0 +1,154 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package event + +import ( + "testing" + + "github.com/ericchiang/k8s/apis/core/v1" + k8s_io_apimachinery_pkg_apis_meta_v1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestGenerateMapStrFromEvent(t *testing.T) { + labels := map[string]string{ + "app.kubernetes.io/name": "mysql", + "app.kubernetes.io/version": "5.7.21", + "app.kubernetes.io/component": "database", + } + + annotations := map[string]string{ + "prometheus.io/path": "/metrics", + "prometheus.io/port": "9102", + "prometheus.io/scheme": "http", + "prometheus.io/scrape": "false", + } + + expectedLabelsMapStrWithDot := common.MapStr{ + "app": common.MapStr{ + "kubernetes": common.MapStr{ + "io/version": "5.7.21", + "io/component": "database", + "io/name": "mysql", + }, + }, + } + + expectedLabelsMapStrWithDeDot := common.MapStr{ + "app_kubernetes_io/name": "mysql", + "app_kubernetes_io/version": "5.7.21", + "app_kubernetes_io/component": "database", + } + + expectedAnnotationsMapStrWithDot := common.MapStr{ + "prometheus": common.MapStr{ + "io/path": "/metrics", + "io/port": "9102", + "io/scheme": "http", + "io/scrape": "false", + }, + } + + expectedAnnotationsMapStrWithDeDot := common.MapStr{ + "prometheus_io/path": "/metrics", + "prometheus_io/port": "9102", + "prometheus_io/scheme": "http", + "prometheus_io/scrape": "false", + } + + testCases := map[string]struct { + mockEvent v1.Event + expectedMetadata common.MapStr + dedotConfig dedotConfig + }{ + "no dedots": { + mockEvent: v1.Event{ + Metadata: &k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{ + Labels: labels, + Annotations: annotations, + }, + }, + expectedMetadata: common.MapStr{ + "labels": expectedLabelsMapStrWithDot, + "annotations": expectedAnnotationsMapStrWithDot, + }, + dedotConfig: dedotConfig{ + LabelsDedot: false, + AnnotationsDedot: false, + }, + }, + "dedot labels": { + mockEvent: v1.Event{ + Metadata: 
&k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{ + Labels: labels, + Annotations: annotations, + }, + }, + expectedMetadata: common.MapStr{ + "labels": expectedLabelsMapStrWithDeDot, + "annotations": expectedAnnotationsMapStrWithDot, + }, + dedotConfig: dedotConfig{ + LabelsDedot: true, + AnnotationsDedot: false, + }, + }, + "dedot annotatoins": { + mockEvent: v1.Event{ + Metadata: &k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{ + Labels: labels, + Annotations: annotations, + }, + }, + expectedMetadata: common.MapStr{ + "labels": expectedLabelsMapStrWithDot, + "annotations": expectedAnnotationsMapStrWithDeDot, + }, + dedotConfig: dedotConfig{ + LabelsDedot: false, + AnnotationsDedot: true, + }, + }, + "dedot both labels and annotations": { + mockEvent: v1.Event{ + Metadata: &k8s_io_apimachinery_pkg_apis_meta_v1.ObjectMeta{ + Labels: labels, + Annotations: annotations, + }, + }, + expectedMetadata: common.MapStr{ + "labels": expectedLabelsMapStrWithDeDot, + "annotations": expectedAnnotationsMapStrWithDeDot, + }, + dedotConfig: dedotConfig{ + LabelsDedot: true, + AnnotationsDedot: true, + }, + }, + } + + for name, test := range testCases { + t.Run(name, func(t *testing.T) { + mapStrOutput := generateMapStrFromEvent(&test.mockEvent, test.dedotConfig) + assert.Equal(t, test.expectedMetadata["labels"], mapStrOutput["metadata"].(common.MapStr)["labels"]) + assert.Equal(t, test.expectedMetadata["annotations"], mapStrOutput["metadata"].(common.MapStr)["annotations"]) + }) + } +} diff --git a/metricbeat/modules.d/kubernetes.yml.disabled b/metricbeat/modules.d/kubernetes.yml.disabled index c54eb96a35a..fd3b3975971 100644 --- a/metricbeat/modules.d/kubernetes.yml.disabled +++ b/metricbeat/modules.d/kubernetes.yml.disabled @@ -20,6 +20,8 @@ # Enriching parameters: #add_metadata: true #in_cluster: true + #labels.dedot: false + #annotations.dedot: false # When used outside the cluster: #in_cluster: false #host: node_name diff --git a/x-pack/filebeat/filebeat.reference.yml 
b/x-pack/filebeat/filebeat.reference.yml index fb80e4aca50..5eabdf8a35d 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -767,6 +767,12 @@ filebeat.inputs: # This option is not supported on Windows. #filebeat.registry_file_permissions: 0600 +# The timeout value that controls when registry entries are written to disk +# (flushed). When an unwritten update exceeds this value, it triggers a write to +# disk. When registry_flush is set to 0s, the registry is written to disk after +# each batch of events has been published successfully. The default value is 0s. +#filebeat.registry_flush: 0s + # By default Ingest pipelines are not updated if a pipeline with the same ID # already exists. If this option is enabled Filebeat overwrites pipelines # everytime a new Elasticsearch connection is established.