Skip to content

Commit

Permalink
Don't emit Kubernetes autodiscover events without host (#7235)
Browse files Browse the repository at this point in the history
Some kubernetes events are generated while the container is being
configured. It doesn't make sense to emit an event yet, as the host will
be update right afterwards.

This PR changes the code to ignore Pod events without an IP address.
  • Loading branch information
exekias authored and ruflin committed Jun 4, 2018
1 parent d2a0de2 commit 7b5a7e4
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Negotiate Docker API version from our client instead of using a hardcoded one. {pull}7165[7165]
- Fix delays on autodiscovery events handling caused by blocking runner stops. {pull}7170[7170]
- Error out on invalid Autodiscover template conditions settings. {pull}7200[7200]
- Do not emit Kubernetes autodiscover events for Pods without IP address. {pull}7235[7235]

*Auditbeat*

Expand Down
8 changes: 8 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,16 @@ func AutodiscoverBuilder(bus bus.Bus, c *common.Config) (autodiscover.Provider,

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod add: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "start")
},
UpdateFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod update: %+v", obj)
p.emit(obj.(*kubernetes.Pod), "stop")
p.emit(obj.(*kubernetes.Pod), "start")
},
DeleteFunc: func(obj kubernetes.Resource) {
logp.Debug("kubernetes", "Watcher Pod delete: %+v", obj)
time.AfterFunc(config.CleanupTimeout, func() { p.emit(obj.(*kubernetes.Pod), "stop") })
},
})
Expand All @@ -120,6 +123,11 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []kub
containerstatuses []kubernetes.PodContainerStatus) {
host := pod.Status.PodIP

// Do not emit events without host (container is still being configured)
if host == "" {
return
}

// Collect all container IDs and runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
Expand Down
137 changes: 137 additions & 0 deletions libbeat/autodiscover/providers/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package kubernetes

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/autodiscover/template"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
"github.com/elastic/beats/libbeat/common/kubernetes"
)

func TestGenerateHints(t *testing.T) {
Expand Down Expand Up @@ -129,6 +132,140 @@ func TestGenerateHints(t *testing.T) {
}
}

func TestEmitEvent(t *testing.T) {
tests := []struct {
Message string
Flag string
Pod *kubernetes.Pod
Expected bus.Event
}{
{
Message: "Test common pod start",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
PodIP: "127.0.0.1",
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"kubernetes": common.MapStr{
"container": common.MapStr{
"id": "foobar",
"name": "filebeat",
"image": "elastic/filebeat:6.3.0",
"runtime": "docker",
},
"pod": common.MapStr{
"name": "filebeat",
},
"node": common.MapStr{
"name": "node",
},
"namespace": "default",
"annotations": common.MapStr{},
},
"meta": common.MapStr{
"kubernetes": common.MapStr{
"namespace": "default",
"container": common.MapStr{
"name": "filebeat",
}, "pod": common.MapStr{
"name": "filebeat",
}, "node": common.MapStr{
"name": "node",
},
},
},
},
},
{
Message: "Test pod without host",
Flag: "start",
Pod: &kubernetes.Pod{
Metadata: kubernetes.ObjectMeta{
Name: "filebeat",
Namespace: "default",
Labels: map[string]string{},
Annotations: map[string]string{},
},
Status: kubernetes.PodStatus{
ContainerStatuses: []kubernetes.PodContainerStatus{
{
Name: "filebeat",
ContainerID: "docker://foobar",
},
},
},
Spec: kubernetes.PodSpec{
NodeName: "node",
Containers: []kubernetes.Container{
{
Image: "elastic/filebeat:6.3.0",
Name: "filebeat",
},
},
},
},
Expected: nil,
},
}

for _, test := range tests {
mapper, err := template.NewConfigMapper(nil)
if err != nil {
t.Fatal(err)
}

metaGen, err := kubernetes.NewMetaGenerator(common.NewConfig())
if err != nil {
t.Fatal(err)
}

p := &Provider{
config: defaultConfig(),
bus: bus.New("test"),
metagen: metaGen,
templates: mapper,
}

listener := p.bus.Subscribe()

p.emit(test.Pod, test.Flag)

select {
case event := <-listener.Events():
assert.Equal(t, test.Expected, event)
case <-time.After(2 * time.Second):
if test.Expected != nil {
t.Fatal("Timeout while waiting for event")
}
}
}
}

func getNestedAnnotations(in common.MapStr) common.MapStr {
out := common.MapStr{}

Expand Down

0 comments on commit 7b5a7e4

Please sign in to comment.