Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stopping of modules started by kubernetes autodiscover #10476

Merged
merged 11 commits into from
Feb 7, 2019
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix TLS certificate DoS vulnerability. {pull}10302[10302]
- Fix panic and file unlock in spool on atomic operation (arm, x86-32). File lock was not released when panic occurs, leading to the beat deadlocking on startup. {pull}10289[10289]
- Fix encoding of timestamps when using disk spool. {issue}10099[10099]
- Fix stopping of modules started by kubernetes autodiscover. {pull}10476[10476]
- Fix a issue when remote and local configuration didn't match when fetching configuration from Central Management. {issue}10587[10587]

*Auditbeat*
Expand Down
28 changes: 19 additions & 9 deletions libbeat/autodiscover/providers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"fmt"
"time"

"github.com/gofrs/uuid"
Expand Down Expand Up @@ -144,12 +145,16 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
containerstatuses []*kubernetes.PodContainerStatus) {
host := pod.Status.GetPodIP()

// Do not emit events without host (container is still being configured)
if host == "" {
// If the container doesn't exist in the runtime or its network
// is not configured, it won't have an IP. Skip it as we cannot
// generate configs without host, and an update will arrive when
// the container is ready.
// If stopping, emit the event in any case to ensure cleanup.
if host == "" && flag != "stop" {
return
}

// Collect all container IDs and runtimes from status information.
// Collect all runtimes from status information.
containerIDs := map[string]string{}
runtimes := map[string]string{}
for _, c := range containerstatuses {
Expand All @@ -160,13 +165,18 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku

// Emit container and port information
for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
cid := containerIDs[c.GetName()]

// If there is a container ID that is empty then ignore it. It either means that the container is still starting
// up or the container is shutting down.
if cid == "" {
if cid == "" && flag != "stop" {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the case where the container is still starting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that just checking for the ID doesn't ensure that the container is ready, so it wouldn't matter so much. But the truth is that there is no need to change this here. I will keep the check for non-stop events.

Thanks for taking the time to review this!

}

// This must be an id that doesn't depend on the state of the container
// so it works also on `stop` if containers have been already deleted.
eventId := fmt.Sprintf("%s.%s", pod.Metadata.GetUid(), c.GetName())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var eventId should be eventID


cmeta := common.MapStr{
"id": cid,
"name": c.GetName(),
Expand All @@ -190,7 +200,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
if len(c.Ports) == 0 {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventId,
flag: true,
"host": host,
"kubernetes": kubemeta,
Expand All @@ -204,7 +214,7 @@ func (p *Provider) emitEvents(pod *kubernetes.Pod, flag string, containers []*ku
for _, port := range c.Ports {
event := bus.Event{
"provider": p.uuid,
"id": cid,
"id": eventId,
flag: true,
"host": host,
"port": port.GetContainerPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func TestEmitEvent(t *testing.T) {
uid := "005f3b90-4b9d-12f8-acf0-31020a840133"
containerImage := "elastic/filebeat:6.3.0"
node := "node"
cid := "005f3b90-4b9d-12f8-acf0-31020a840133.filebeat"
UUID, err := uuid.NewV4()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -204,7 +205,7 @@ func TestEmitEvent(t *testing.T) {
Expected: bus.Event{
"start": true,
"host": "127.0.0.1",
"id": "foobar",
"id": cid,
"provider": UUID,
"kubernetes": common.MapStr{
"container": common.MapStr{
Expand Down