diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1d3d2f99f50..c342ab40c3d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits] - Ensure that the dashboard zip files can't contain files outside of the kibana directory. {pull}6921[6921] - Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947] - Fix delays on autodiscovery events handling caused by blocking runner stops. {pull}7170[7170] +- Do not emit Kubernetes autodiscover events for Pods without IP address. {pull}7235[7235] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 6ce757ac0ee..ac337e7181b 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -84,13 +84,16 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider, watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod add: %+v", obj) p.emit(obj.(*kubernetes.Pod), "start") }, UpdateFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod update: %+v", obj) p.emit(obj.(*kubernetes.Pod), "stop") p.emit(obj.(*kubernetes.Pod), "start") }, DeleteFunc: func(obj kubernetes.Resource) { + logp.Debug("kubernetes", "Watcher Pod delete: %+v", obj) time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") }) }, }) @@ -117,6 +120,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub containerstatuses []kubernetes.PodContainerStatus) { host := pod.Status.PodIP + // Do not emit events without host (container is still being configured) + if host == "" { + return + } + // Collect all container IDs and runtimes from status information. containerIDs := map[string]string{} runtimes := map[string]string{} diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go index 810692eff25..b2dcfaebf22 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes_test.go @@ -2,11 +2,14 @@ package kubernetes import ( "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/autodiscover/template" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" + "github.com/elastic/beats/libbeat/common/kubernetes" ) func TestGenerateHints(t *testing.T) { @@ -129,6 +132,136 @@ func TestGenerateHints(t *testing.T) { } } +func TestEmitEvent(t *testing.T) { + tests := []struct { + Message string + Flag string + Pod *kubernetes.Pod + Expected bus.Event + }{ + { + Message: "Test common pod start", + Flag: "start", + Pod: &kubernetes.Pod{ + Metadata: kubernetes.ObjectMeta{ + Name: "filebeat", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: kubernetes.PodStatus{ + PodIP: "127.0.0.1", + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: "filebeat", + ContainerID: "docker://foobar", + }, + }, + }, + Spec: kubernetes.PodSpec{ + NodeName: "node", + Containers: []kubernetes.Container{ + { + Image: "elastic/filebeat:6.3.0", + Name: "filebeat", + }, + }, + }, + }, + Expected: bus.Event{ + "start": true, + "host": "127.0.0.1", + "kubernetes": common.MapStr{ + "container": common.MapStr{ + "id": "foobar", + "name": "filebeat", + "image": "elastic/filebeat:6.3.0", + "runtime": "docker", + }, + "pod": common.MapStr{ + "name": "filebeat", + }, + "node": common.MapStr{ + "name": "node", + }, + "namespace": "default", + "annotations": common.MapStr{}, + }, + "meta": common.MapStr{ + "kubernetes": common.MapStr{ + "namespace": "default", + "container": common.MapStr{ + "name": "filebeat", + }, "pod": common.MapStr{ + "name": "filebeat", + }, "node": common.MapStr{ + "name": "node", + }, + }, + }, + }, + }, + { + Message: "Test pod without host", + Flag: "start", + Pod: &kubernetes.Pod{ + Metadata: kubernetes.ObjectMeta{ + Name: "filebeat", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + Status: kubernetes.PodStatus{ + ContainerStatuses: []kubernetes.PodContainerStatus{ + { + Name: "filebeat", + ContainerID: "docker://foobar", + }, + }, + }, + Spec: kubernetes.PodSpec{ + NodeName: "node", + Containers: []kubernetes.Container{ + { + Image: "elastic/filebeat:6.3.0", + Name: "filebeat", + }, + }, + }, + }, + Expected: nil, + }, + } + + for _, test := range tests { + mapper, err := template.NewConfigMapper(nil) + if err != nil { + t.Fatal(err) + } + + metaGen := kubernetes.NewMetaGenerator(nil, nil, nil) + p := &Provider{ + config: defaultConfig(), + bus: bus.New("test"), + metagen: metaGen, + templates: mapper, + } + + listener := p.bus.Subscribe() + + p.emit(test.Pod, test.Flag) + + select { + case event := <-listener.Events(): + assert.Equal(t, test.Expected, event) + case <-time.After(2 * time.Second): + if test.Expected != nil { + t.Fatal("Timeout while waiting for event") + } + } + } +} + func getNestedAnnotations(in common.MapStr) common.MapStr { out := common.MapStr{}