Skip to content

Commit

Permalink
Add Dedot for Kubernetes labels and annotations (elastic#9939) (elast…
Browse files Browse the repository at this point in the history
…ic#10039)

Fixes: elastic#8384

* Add Dedot for Kubernetes labels and annotations

* Add dedot options in libbeat kubernetes metadata

* Update changelog

* Refactor TestGenerateMapStrFromEvent

* Update shared-autodiscover.asciidoc with dedot params

* Add names for each unit test case in event_test.go

* Fix rebase errors

(cherry picked from commit d5bda76)
  • Loading branch information
kaiyan-sheng authored and bashofmann committed Jan 28, 2019
1 parent 380f748 commit 988c746
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 20 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,21 @@ https://github.com/elastic/beats/compare/v6.5.4...6.5[Check the HEAD diff]

*Metricbeat*

- Collect custom cluster `display_name` in `elasticsearch/cluster_stats` metricset. {pull}8445[8445]
- Test etcd module with etcd 3.3. {pull}9068[9068]
- All `elasticsearch` metricsets now have module-level `cluster.id` and `cluster.name` fields. {pull}8770[8770] {pull}8771[8771] {pull}9164[9164] {pull}9165[9165] {pull}9166[9166] {pull}9168[9168]
- All `elasticsearch` node-level metricsets now have `node.id` and `node.name` fields. {pull}9168[9168] {pull}9209[9209]
- Add settings to disable docker and cgroup cpu metrics per core. {issue}9187[9187] {pull}9194[9194] {pull}9589[9589]
- The `elasticsearch/node` metricset now reports the Elasticsearch cluster UUID. {pull}8771[8771]
- Support GET requests in Jolokia module. {issue}8566[8566] {pull}9226[9226]
- Add freebsd support for the uptime metricset. {pull}9413[9413]
- Add `host.os.name` field to add_host_metadata processor. {issue}8948[8948] {pull}9405[9405]
- Add field `event.dataset` which is `{module}.{metricset).
- Add more TCP statuses to `socket_summary` metricset. {pull}9430[9430]
- Remove experimental tag from ceph metricsets. {pull}9708[9708]
- Add `key` metricset to the Redis module. {issue}9582[9582] {pull}9657[9657]
- Add DeDot for kubernetes labels and annotations. {issue}9860[9860] {pull}9939[9939]

*Packetbeat*

*Winlogbeat*
Expand Down
24 changes: 19 additions & 5 deletions libbeat/common/kubernetes/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type MetaGeneratorConfig struct {
// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludePodUID bool `config:"include_pod_uid"`
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
}

type metaGenerator = MetaGeneratorConfig
Expand All @@ -54,6 +56,8 @@ func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) {
// default settings:
generator := metaGenerator{
IncludeCreatorMetadata: true,
LabelsDedot: false,
AnnotationsDedot: false,
}

err := cfg.Unpack(&generator)
Expand All @@ -71,18 +75,23 @@ func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr {
labelMap := common.MapStr{}
if len(g.IncludeLabels) == 0 {
for k, v := range obj.GetMetadata().Labels {
safemapstr.Put(labelMap, k, v)
if g.LabelsDedot {
label := common.DeDot(k)
labelMap.Put(label, v)
} else {
safemapstr.Put(labelMap, k, v)
}
}
} else {
labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels)
labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels, g.LabelsDedot)
}

// Exclude any labels that are present in the exclude_labels config
for _, label := range g.ExcludeLabels {
delete(labelMap, label)
}

annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations)
annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations, g.AnnotationsDedot)
meta := common.MapStr{}
if objMeta.GetNamespace() != "" {
meta["namespace"] = objMeta.GetNamespace()
Expand Down Expand Up @@ -141,7 +150,7 @@ func (g *metaGenerator) ContainerMetadata(pod *Pod, container string) common.Map
return podMeta
}

func generateMapSubset(input map[string]string, keys []string) common.MapStr {
func generateMapSubset(input map[string]string, keys []string, dedot bool) common.MapStr {
output := common.MapStr{}
if input == nil {
return output
Expand All @@ -150,7 +159,12 @@ func generateMapSubset(input map[string]string, keys []string) common.MapStr {
for _, key := range keys {
value, ok := input[key]
if ok {
safemapstr.Put(output, key, value)
if dedot {
dedotKey := common.DeDot(key)
output.Put(dedotKey, value)
} else {
safemapstr.Put(output, key, value)
}
}
}

Expand Down
87 changes: 86 additions & 1 deletion libbeat/common/kubernetes/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/elastic/beats/libbeat/common"
)

func TestPodMetadataDeDot(t *testing.T) {
func TestPodMetadata(t *testing.T) {
withUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true})

UID := "005f3b90-4b9d-12f8-acf0-31020a840133"
Expand Down Expand Up @@ -120,3 +120,88 @@ func TestPodMetadataDeDot(t *testing.T) {
assert.Equal(t, metaGen.PodMetadata(test.pod), test.meta)
}
}

func TestPodMetadataDeDot(t *testing.T) {
UID := "005f3b90-4b9d-12f8-acf0-31020a840133"
Deployment := "Deployment"
test := "test"
ReplicaSet := "ReplicaSet"
True := true
False := false
tests := []struct {
pod *Pod
meta common.MapStr
config *common.Config
}{
{
pod: &Pod{
Metadata: &metav1.ObjectMeta{
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
Namespace: &test,
Annotations: map[string]string{"b.key": "foo", "b": "bar"},
},
Spec: &v1.PodSpec{
NodeName: &test,
},
},
meta: common.MapStr{
"pod": common.MapStr{
"name": "",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{"name": "test"},
"namespace": "test",
"labels": common.MapStr{"a": "bar", "a_key": "foo"},
"annotations": common.MapStr{"b": "bar", "b_key": "foo"},
},
config: common.NewConfig(),
},
{
pod: &Pod{
Metadata: &metav1.ObjectMeta{
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
OwnerReferences: []*metav1.OwnerReference{
{
Kind: &Deployment,
Name: &test,
Controller: &True,
},
{
Kind: &ReplicaSet,
Name: &ReplicaSet,
Controller: &False,
},
},
},
Spec: &v1.PodSpec{
NodeName: &test,
},
},
meta: common.MapStr{
"pod": common.MapStr{
"name": "",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": "bar", "a_key": "foo"},
"deployment": common.MapStr{"name": "test"},
},
config: common.NewConfig(),
},
}

for _, test := range tests {
config, err := common.NewConfigFrom(map[string]interface{}{
"labels.dedot": true,
"annotations.dedot": true,
"include_annotations": []string{"b", "b.key"},
})
metaGen, err := NewMetaGenerator(config)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, metaGen.PodMetadata(test.pod), test.meta)
}
}
12 changes: 12 additions & 0 deletions libbeat/docs/shared-autodiscover.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ event:
If the `include_annotations` config is added to the provider config, then the list of annotations present in the config
are added to the event.

If the `include_labels` config is added to the provider config, then the list of labels present in the config
will be added to the event.

If the `exclude_labels` config is added to the provider config, then the list of labels present in the config
will be excluded from the event.

if the `labels.dedot` config is set to be `true` in the provider config, then `.` in labels will be replaced with `_`.

if the `annotations.dedot` config is set to be `true` in the provider config, then `.` in annotations will be replaced
with `_`.


For example:

[source,yaml]
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/kubernetes/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# Enriching parameters:
#add_metadata: true
#in_cluster: true
#labels.dedot: false
#annotations.dedot: false
# When used outside the cluster:
#in_cluster: false
#host: node_name
Expand Down
16 changes: 10 additions & 6 deletions metricbeat/module/kubernetes/event/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
)

type kubeEventsConfig struct {
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
InCluster bool `config:"in_cluster"`
KubeConfig string `config:"kube_config"`
Namespace string `config:"namespace"`
SyncPeriod time.Duration `config:"sync_period"`
LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
}

type Enabled struct {
Expand All @@ -35,8 +37,10 @@ type Enabled struct {

func defaultKubernetesEventsConfig() kubeEventsConfig {
return kubeEventsConfig{
InCluster: true,
SyncPeriod: 1 * time.Second,
InCluster: true,
SyncPeriod: 1 * time.Second,
LabelsDedot: false,
AnnotationsDedot: false,
}
}

Expand Down
44 changes: 36 additions & 8 deletions metricbeat/module/kubernetes/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,16 @@ func init() {
// MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling.
type MetricSet struct {
mb.BaseMetricSet
watcher kubernetes.Watcher
watcher kubernetes.Watcher
watchOptions kubernetes.WatchOptions
dedotConfig dedotConfig
}

// dedotConfig defines LabelsDedot and AnnotationsDedot.
// Default to be false. If set to true, replace dots in labels with `_`.
type dedotConfig struct {
LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
}

// New create a new instance of the MetricSet
Expand All @@ -62,17 +71,26 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, fmt.Errorf("fail to get kubernetes client: %s", err.Error())
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, kubernetes.WatchOptions{
watchOptions := kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
})
}

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Event{}, watchOptions)
if err != nil {
return nil, fmt.Errorf("fail to init kubernetes watcher: %s", err.Error())
}

dedotConfig := dedotConfig{
LabelsDedot: config.LabelsDedot,
AnnotationsDedot: config.AnnotationsDedot,
}

return &MetricSet{
BaseMetricSet: base,
dedotConfig: dedotConfig,
watcher: watcher,
watchOptions: watchOptions,
}, nil
}

Expand All @@ -81,10 +99,10 @@ func (m *MetricSet) Run(reporter mb.PushReporter) {
now := time.Now()
handler := kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event)))
reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig))
},
UpdateFunc: func(obj kubernetes.Resource) {
reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event)))
reporter.Event(generateMapStrFromEvent(obj.(*kubernetes.Event), m.dedotConfig))
},
// ignore events that are deleted
DeleteFunc: nil,
Expand All @@ -107,7 +125,7 @@ func (m *MetricSet) Run(reporter mb.PushReporter) {
return
}

func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr {
func generateMapStrFromEvent(eve *kubernetes.Event, dedotConfig dedotConfig) common.MapStr {
eventMeta := common.MapStr{
"timestamp": common.MapStr{
"created": kubernetes.Time(eve.Metadata.CreationTimestamp).UTC(),
Expand All @@ -123,7 +141,12 @@ func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr {
if len(eve.Metadata.Labels) != 0 {
labels := make(common.MapStr, len(eve.Metadata.Labels))
for k, v := range eve.Metadata.Labels {
safemapstr.Put(labels, k, v)
if dedotConfig.LabelsDedot {
label := common.DeDot(k)
labels.Put(label, v)
} else {
safemapstr.Put(labels, k, v)
}
}

eventMeta["labels"] = labels
Expand All @@ -132,7 +155,12 @@ func generateMapStrFromEvent(eve *kubernetes.Event) common.MapStr {
if len(eve.Metadata.Annotations) != 0 {
annotations := make(common.MapStr, len(eve.Metadata.Annotations))
for k, v := range eve.Metadata.Annotations {
safemapstr.Put(annotations, k, v)
if dedotConfig.AnnotationsDedot {
annotation := common.DeDot(k)
annotations.Put(annotation, v)
} else {
safemapstr.Put(annotations, k, v)
}
}

eventMeta["annotations"] = annotations
Expand Down
Loading

0 comments on commit 988c746

Please sign in to comment.