Skip to content

Commit

Permalink
Don't emit Kubernetes autodiscover events without host (elastic#7235)
Browse files Browse the repository at this point in the history
Some kubernetes events are generated while the container is being
configured. It doesn't make sense to emit an event yet, as the host will
be update right afterwards.

This PR changes the code to ignore Pod events without an IP address.

(cherry picked from commit 7b5a7e4)
  • Loading branch information
exekias authored and Carlos Pérez-Aradros Herce committed Jun 8, 2018
1 parent 349a426 commit 7a3934c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Ensure that the dashboard zip files can't contain files outside of the kibana directory. {pull}6921[6921]
- Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947]
- Fix delays on autodiscovery events handling caused by blocking runner stops. {pull}7170[7170]
- Do not emit Kubernetes autodiscover events for Pods without IP address. {pull}7235[7235]
*Auditbeat*
Expand Down
8 changes: 8 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,16 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
})
Expand All @@ -117,6 +120,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub
containerstatuses []kubernetes.PodContainerStatus) {
host := pod.Status.PodIP

// Do not emit events without host (container is still being configured)
if host == "" {
return
}

// Collect all container IDs and runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
Expand Down
133 changes: 133 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package kubernetes

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
)

func TestGenerateHints(t *testing.T) {
Expand Down Expand Up @@ -129,6 +132,136 @@ func TestGenerateHints(t *testing.T) {
}
}

func TestEmitEvent(t *testing.T) {
tests := []struct {
Message string
Flag string
Pod *kubernetes.Pod
Expected bus.Event
}{
{
Message: "Test common pod start",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
PodIP: "127.0.0.1",
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "foobar",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "docker",
},
"pod": common.MapStr{
"name": "filebeat",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
}, "node": common.MapStr{
"name": "node",
},
},
},
},
},
{
Message: "Test pod without host",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: nil,
},
}

for _, test := range tests {
mapper, err := template.NewConfigMapper(nil)
if err != nil {
t.Fatal(err)
}

metaGen := kubernetes.NewMetaGenerator(nil, nil, nil)
p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
metagen: metaGen,
templates: mapper,
}

listener := p.bus.Subscribe()

p.emit(test.Pod, test.Flag)

select {
case event := <-listener.Events():
assert.Equal(t, test.Expected, event)
case <-time.After(2 * time.Second):
if test.Expected != nil {
t.Fatal("Timeout while waiting for event")
}
}
}
}

func getNestedAnnotations(in common.MapStr) common.MapStr {
out := common.MapStr{}

Expand Down

0 comments on commit 7a3934c

Please sign in to comment.