Skip to content

Commit

Permalink
Automatically enrich Kubernetes module events (#7470)
Browse files Browse the repository at this point in the history
This PR adds automatically enriching of `kubernetes` module metricsets. It will behave as `add_kubernetes_metadata`, but enrich all events coming out of this module by default. This will not only add labels annotations to Pods but any other Resource, like nodes, containers, deployments...

It will be on by default, some configurations are allowed:

```
- module: kubernetes
  metricsets:
    - pod
    - node
    ...
  in_cluster: false
  kube_config: /home/exekias/.kube/config
  host: minikube
  include_labels:
    - tier
  include_annotations:
    - ...
```

Closes #7148
  • Loading branch information
exekias authored and ruflin committed Jul 13, 2018
1 parent d0d9561 commit 98e488a
Show file tree
Hide file tree
Showing 36 changed files with 783 additions and 68 deletions.
2 changes: 1 addition & 1 deletion auditbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3312,7 +3312,7 @@ Kubernetes pod name
--
type: keyword
Kubernetes pod uid
Kubernetes Pod UID
--
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/include/fields.go

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,22 @@ metadata:
labels:
k8s-app: metricbeat
rules:
- apiGroups: [""] # "" indicates the core API group
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
verbs:
- get
- watch
- list
verbs: ["get", "list", "watch"]
- apiGroups: ["extensions"]
resources:
- deployments
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
Expand Down
17 changes: 12 additions & 5 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ metadata:
labels:
k8s-app: metricbeat
rules:
- apiGroups: [""] # "" indicates the core API group
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
verbs:
- get
- watch
- list
verbs: ["get", "list", "watch"]
- apiGroups: ["extensions"]
resources:
- deployments
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
verbs: ["get", "list", "watch"]
2 changes: 1 addition & 1 deletion filebeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2343,7 +2343,7 @@ Kubernetes pod name
--
type: keyword
Kubernetes pod uid
Kubernetes Pod UID
--
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion heartbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ Kubernetes pod name
--
type: keyword
Kubernetes pod uid
Kubernetes Pod UID
--
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/include/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 44 additions & 25 deletions libbeat/common/kubernetes/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,29 @@ import (

// MetaGenerator builds metadata objects for pods and containers
type MetaGenerator interface {
// ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters
ResourceMetadata(obj Resource) common.MapStr

// PodMetadata generates metadata for the given pod taking to account certain filters
PodMetadata(pod *Pod) common.MapStr

// Containermetadata generates metadata for the given container of a pod
ContainerMetadata(pod *Pod, container string) common.MapStr
}

type metaGenerator struct {
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`
IncludePodUID bool `config:"include_pod_uid"`
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
// MetaGeneratorConfig settings
type MetaGeneratorConfig struct {
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludePodUID bool `config:"include_pod_uid"`
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
}

type metaGenerator = MetaGeneratorConfig

// NewMetaGenerator initializes and returns a new kubernetes metadata generator
func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) {
// default settings:
Expand All @@ -52,41 +60,37 @@ func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) {
return &generator, err
}

// PodMetadata generates metadata for the given pod taking to account certain filters
func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
// NewMetaGeneratorFromConfig initializes and returns a new kubernetes metadata generator
func NewMetaGeneratorFromConfig(cfg *MetaGeneratorConfig) MetaGenerator {
return cfg
}

// ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters
func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr {
objMeta := obj.GetMetadata()
labelMap := common.MapStr{}
if len(g.IncludeLabels) == 0 {
for k, v := range pod.Metadata.Labels {
for k, v := range obj.GetMetadata().Labels {
safemapstr.Put(labelMap, k, v)
}
} else {
labelMap = generateMapSubset(pod.Metadata.Labels, g.IncludeLabels)
labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels)
}

// Exclude any labels that are present in the exclude_labels config
for _, label := range g.ExcludeLabels {
delete(labelMap, label)
}

annotationsMap := generateMapSubset(pod.Metadata.Annotations, g.IncludeAnnotations)
meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.GetName(),
},
"node": common.MapStr{
"name": pod.Spec.GetNodeName(),
},
"namespace": pod.Metadata.GetNamespace(),
}

// Add Pod UID metadata if enabled
if g.IncludePodUID {
safemapstr.Put(meta, "pod.uid", pod.Metadata.GetUid())
annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations)
meta := common.MapStr{}
if objMeta.GetNamespace() != "" {
meta["namespace"] = objMeta.GetNamespace()
}

// Add controller metadata if present
if g.IncludeCreatorMetadata {
for _, ref := range pod.Metadata.OwnerReferences {
for _, ref := range objMeta.OwnerReferences {
if ref.GetController() {
switch ref.GetKind() {
// TODO grow this list as we keep adding more `state_*` metricsets
Expand All @@ -110,6 +114,21 @@ func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
return meta
}

// PodMetadata generates metadata for the given pod taking to account certain filters
func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
podMeta := g.ResourceMetadata(pod)

// Add UID metadata if enabled
if g.IncludePodUID {
safemapstr.Put(podMeta, "pod.uid", pod.GetMetadata().GetUid())
}

safemapstr.Put(podMeta, "pod.name", pod.GetMetadata().GetName())
safemapstr.Put(podMeta, "node.name", pod.Spec.GetNodeName())

return podMeta
}

// Containermetadata generates metadata for the given container of a pod
func (g *metaGenerator) ContainerMetadata(pod *Pod, container string) common.MapStr {
podMeta := g.PodMetadata(pod)
Expand Down
22 changes: 12 additions & 10 deletions libbeat/common/kubernetes/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

func TestPodMetadataDeDot(t *testing.T) {
withPodUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true})
withUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true})

UID := "005f3b90-4b9d-12f8-acf0-31020a840133"
Deployment := "Deployment"
Expand All @@ -44,17 +44,18 @@ func TestPodMetadataDeDot(t *testing.T) {
{
pod: &Pod{
Metadata: &metav1.ObjectMeta{
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
Namespace: &test,
},
Spec: &v1.PodSpec{
NodeName: &test,
},
},
meta: common.MapStr{
"pod": common.MapStr{"name": ""},
"namespace": "",
"node": common.MapStr{"name": "test"},
"namespace": "test",
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
},
config: common.NewConfig(),
Expand All @@ -70,12 +71,14 @@ func TestPodMetadataDeDot(t *testing.T) {
},
},
meta: common.MapStr{
"pod": common.MapStr{"name": "", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
"namespace": "",
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
"pod": common.MapStr{
"name": "",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
},
config: withPodUID,
config: withUID,
},
{
pod: &Pod{
Expand All @@ -101,7 +104,6 @@ func TestPodMetadataDeDot(t *testing.T) {
},
meta: common.MapStr{
"pod": common.MapStr{"name": ""},
"namespace": "",
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
"deployment": common.MapStr{"name": "test"},
Expand Down
14 changes: 14 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
"time"

"github.com/ericchiang/k8s"
appsv1 "github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/core/v1"
extv1 "github.com/ericchiang/k8s/apis/extensions/v1beta1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

Expand All @@ -46,6 +48,9 @@ type PodSpec = v1.PodSpec
// PodStatus data
type PodStatus = v1.PodStatus

// Node data
type Node = v1.Node

// Container data
type Container = v1.Container

Expand All @@ -58,6 +63,15 @@ type Event = v1.Event
// PodContainerStatus data
type PodContainerStatus = v1.ContainerStatus

// Deployment data
type Deployment = appsv1.Deployment

// ReplicaSet data
type ReplicaSet = extv1.ReplicaSet

// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// Time extracts time from k8s.Time type
func Time(t *metav1.Time) time.Time {
return time.Unix(t.GetSeconds(), int64(t.GetNanos()))
Expand Down
47 changes: 46 additions & 1 deletion libbeat/common/kubernetes/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"time"

"github.com/ericchiang/k8s"
appsv1 "github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/core/v1"
extv1 "github.com/ericchiang/k8s/apis/extensions/v1beta1"

"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -109,6 +111,50 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa
}
return rs
}
case *Node:
list := &v1.NodeList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &v1.Node{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *Deployment:
list := &appsv1.DeploymentList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &appsv1.Deployment{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *ReplicaSet:
list := &extv1.ReplicaSetList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &extv1.ReplicaSet{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *StatefulSet:
list := &appsv1.StatefulSetList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &appsv1.StatefulSet{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
default:
return nil, fmt.Errorf("unsupported resource type for watching %T", resource)
}
Expand Down Expand Up @@ -163,7 +209,6 @@ func (w *watcher) onDelete(obj Resource) {

// Start watching pods
func (w *watcher) Start() error {

// Make sure that events don't flow into the annotator before informer is fully set up
// Sync initial state:
err := w.sync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
- name: pod.uid
type: keyword
description: >
Kubernetes pod uid
Kubernetes Pod UID
- name: namespace
type: keyword
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6945,7 +6945,7 @@ Kubernetes pod name
--
type: keyword
Kubernetes pod uid
Kubernetes Pod UID
--
Expand Down
Loading

0 comments on commit 98e488a

Please sign in to comment.