Update k8s autodiscover metadata in nodes earlier #29289

Merged · 10 commits · Dec 9, 2021
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -297,6 +297,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add http.pprof.enabled option to libbeat to allow http/pprof endpoints on the socket that libbeat creates for metrics. {issue}21965[21965]
- Support custom analyzers in fields.yml. {issue}28540[28540] {pull}28926[28926]
- SASL/SCRAM in the Kafka output is no longer beta. {pull}29126[29126]
- Discover changes in Kubernetes nodes metadata as soon as they happen. {pull}23139[23139]
- Support self signed certificates on outputs {pull}29229[29229]

*Auditbeat*
207 changes: 24 additions & 183 deletions libbeat/autodiscover/providers/kubernetes/pod.go
@@ -33,7 +33,6 @@ import (
"github.com/elastic/beats/v7/libbeat/common/bus"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/common/kubernetes/metadata"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
"github.com/elastic/beats/v7/libbeat/logp"
)

@@ -125,8 +124,13 @@ func NewPodEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pub

watcher.AddEventHandler(p)

if nodeWatcher != nil && (config.Hints.Enabled() || metaConf.Node.Enabled()) {
updater := kubernetes.NewNodePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
nodeWatcher.AddEventHandler(updater)
}

if namespaceWatcher != nil && (config.Hints.Enabled() || metaConf.Namespace.Enabled()) {
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
updater := kubernetes.NewNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}

@@ -256,47 +260,6 @@ func (p *pod) Stop() {
}
}

type containerInPod struct {
id string
runtime string
spec kubernetes.Container
status kubernetes.PodContainerStatus
}

// getContainersInPod returns all the containers defined in a pod and their statuses.
// It includes init and ephemeral containers.
func getContainersInPod(pod *kubernetes.Pod) []*containerInPod {
var containers []*containerInPod
for _, c := range pod.Spec.Containers {
containers = append(containers, &containerInPod{spec: c})
}
for _, c := range pod.Spec.InitContainers {
containers = append(containers, &containerInPod{spec: c})
}
for _, c := range pod.Spec.EphemeralContainers {
c := kubernetes.Container(c.EphemeralContainerCommon)
containers = append(containers, &containerInPod{spec: c})
}

statuses := make(map[string]*kubernetes.PodContainerStatus)
mapStatuses := func(s []kubernetes.PodContainerStatus) {
for i := range s {
statuses[s[i].Name] = &s[i]
}
}
mapStatuses(pod.Status.ContainerStatuses)
mapStatuses(pod.Status.InitContainerStatuses)
mapStatuses(pod.Status.EphemeralContainerStatuses)
for _, c := range containers {
if s, ok := statuses[c.spec.Name]; ok {
c.id, c.runtime = kubernetes.ContainerIDWithRuntime(*s)
c.status = *s
}
}

return containers
}

// emit emits the events for the given pod according to its state and
// the given flag.
// It emits a pod event if the pod has at least a running container,
@@ -312,15 +275,15 @@ func getContainersInPod(pod *kubernetes.Pod) []*containerInPod {
// Network information is only included in events for running containers
// and for pods with at least one running container.
func (p *pod) emit(pod *kubernetes.Pod, flag string) {
annotations := podAnnotations(pod)
namespaceAnnotations := podNamespaceAnnotations(pod, p.namespaceWatcher)
annotations := kubernetes.PodAnnotations(pod)
namespaceAnnotations := kubernetes.PodNamespaceAnnotations(pod, p.namespaceWatcher)

eventList := make([][]bus.Event, 0)
portsMap := common.MapStr{}
containers := getContainersInPod(pod)
containers := kubernetes.GetContainersInPod(pod)
anyContainerRunning := false
for _, c := range containers {
if c.status.State.Running != nil {
if c.Status.State.Running != nil {
anyContainerRunning = true
}

@@ -339,7 +302,7 @@ func (p *pod) emit(pod *kubernetes.Pod, flag string) {
eventList = append([][]bus.Event{{event}}, eventList...)
}

delay := (flag == "stop" && podTerminated(pod, containers))
delay := (flag == "stop" && kubernetes.PodTerminated(pod, containers))
p.publishAll(eventList, delay)
}

@@ -350,22 +313,22 @@ func (p *pod) emit(pod *kubernetes.Pod, flag string) {
// running.
// If the container ID is unknown, only "stop" events are generated.
// It also returns a map with the named ports.
func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerInPod, annotations, namespaceAnnotations common.MapStr) ([]bus.Event, common.MapStr) {
if c.id == "" && flag != "stop" {
func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *kubernetes.ContainerInPod, annotations, namespaceAnnotations common.MapStr) ([]bus.Event, common.MapStr) {
if c.ID == "" && flag != "stop" {
return nil, nil
}

// This must be an id that doesn't depend on the state of the container
// so it works also on `stop` if containers have been already deleted.
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.spec.Name)
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Spec.Name)

meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.spec.Name))
meta := p.metagen.Generate(pod, metadata.WithFields("container.name", c.Spec.Name))

cmeta := common.MapStr{
"id": c.id,
"runtime": c.runtime,
"id": c.ID,
"runtime": c.Runtime,
"image": common.MapStr{
"name": c.spec.Image,
"name": c.Spec.Image,
},
}

@@ -375,16 +338,16 @@ func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerI
kubemeta = kubemeta.Clone()
kubemeta["annotations"] = annotations
kubemeta["container"] = common.MapStr{
"id": c.id,
"name": c.spec.Name,
"image": c.spec.Image,
"runtime": c.runtime,
"id": c.ID,
"name": c.Spec.Name,
"image": c.Spec.Image,
"runtime": c.Runtime,
}
if len(namespaceAnnotations) != 0 {
kubemeta["namespace_annotations"] = namespaceAnnotations
}

ports := c.spec.Ports
ports := c.Spec.Ports
if len(ports) == 0 {
// Ensure that at least one event is generated for this container.
// Set port to zero to signify that the event is from a container
@@ -408,7 +371,7 @@ func (p *pod) containerPodEvents(flag string, pod *kubernetes.Pod, c *containerI
}
// Include network information only if the container is running,
// so templates that need network don't generate a config.
if c.status.State.Running != nil {
if c.Status.State.Running != nil {
if port.Name != "" && port.ContainerPort != 0 {
portsMap[port.Name] = port.ContainerPort
}
@@ -457,71 +420,6 @@ func (p *pod) podEvent(flag string, pod *kubernetes.Pod, ports common.MapStr, in
return event
}

// podAnnotations returns the annotations in a pod
func podAnnotations(pod *kubernetes.Pod) common.MapStr {
annotations := common.MapStr{}
for k, v := range pod.GetObjectMeta().GetAnnotations() {
safemapstr.Put(annotations, k, v)
}
return annotations
}

// podNamespaceAnnotations returns the annotations of the namespace of the pod
func podNamespaceAnnotations(pod *kubernetes.Pod, watcher kubernetes.Watcher) common.MapStr {
if watcher == nil {
return nil
}

rawNs, ok, err := watcher.Store().GetByKey(pod.Namespace)
if !ok || err != nil {
return nil
}

namespace, ok := rawNs.(*kubernetes.Namespace)
if !ok {
return nil
}

annotations := common.MapStr{}
for k, v := range namespace.GetAnnotations() {
safemapstr.Put(annotations, k, v)
}
return annotations
}

// podTerminating returns true if a pod is marked for deletion or is in a phase beyond running.
func podTerminating(pod *kubernetes.Pod) bool {
if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
return true
}

switch pod.Status.Phase {
case kubernetes.PodRunning, kubernetes.PodPending:
default:
return true
}

return false
}

// podTerminated returns true if a pod is terminated, this method considers a
// pod as terminated if none of its containers are running (or going to be running).
func podTerminated(pod *kubernetes.Pod, containers []*containerInPod) bool {
// Pod is not marked for termination, so it is not terminated.
if !podTerminating(pod) {
return false
}

// If any container is running, the pod is not terminated yet.
for _, container := range containers {
if container.status.State.Running != nil {
return false
}
}

return true
}

// publishAll publishes all events in the event list in the same order. If delay is true
// publishAll schedules the publication of the events after the configured `CleanupPeriod`
// and returns immediately.
@@ -540,60 +438,3 @@ func (p *pod) publishAll(eventList [][]bus.Event, delay bool) {
p.publishFunc(events)
}
}

// podUpdaterHandlerFunc is a function that handles pod updater notifications.
type podUpdaterHandlerFunc func(interface{})

// podUpdaterStore is the interface that an object needs to implement to be
// used as a pod updater store.
type podUpdaterStore interface {
List() []interface{}
}

// namespacePodUpdater notifies updates on pods when their namespaces are updated.
type namespacePodUpdater struct {
handler podUpdaterHandlerFunc
store podUpdaterStore
locker sync.Locker
}

// newNamespacePodUpdater creates a namespacePodUpdater
func newNamespacePodUpdater(handler podUpdaterHandlerFunc, store podUpdaterStore, locker sync.Locker) *namespacePodUpdater {
return &namespacePodUpdater{
handler: handler,
store: store,
locker: locker,
}
}

// OnUpdate handles update events on namespaces.
func (n *namespacePodUpdater) OnUpdate(obj interface{}) {
ns, ok := obj.(*kubernetes.Namespace)
if !ok {
return
}

// n.store.List() returns a snapshot at this point. If a delete is received
// from the main watcher, this loop may generate an update event after the
// delete is processed, leaving configurations that would never be deleted.
// Also this loop can miss updates, what could leave outdated configurations.
// Avoid these issues by locking the processing of events from the main watcher.
if n.locker != nil {
n.locker.Lock()
defer n.locker.Unlock()
}
for _, pod := range n.store.List() {
pod, ok := pod.(*kubernetes.Pod)
if ok && pod.Namespace == ns.Name {
n.handler(pod)
}
}
}

// OnAdd handles add events on namespaces. Nothing to do, if pods are added to this
// namespace they will generate their own add events.
func (*namespacePodUpdater) OnAdd(interface{}) {}

// OnDelete handles delete events on namespaces. Nothing to do, if pods are deleted from this
// namespace they will generate their own delete events.
func (*namespacePodUpdater) OnDelete(interface{}) {}
57 changes: 55 additions & 2 deletions libbeat/autodiscover/providers/kubernetes/pod_test.go
@@ -1900,14 +1900,15 @@ func TestPod_EmitEvent(t *testing.T) {
}

client := k8sfake.NewSimpleClientset()
addResourceMetadata := metadata.GetDefaultResourceMetadataConfig()
for _, test := range tests {
t.Run(test.Message, func(t *testing.T) {
mapper, err := template.NewConfigMapper(nil, nil, nil)
if err != nil {
t.Fatal(err)
}

metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil, nil)
metaGen := metadata.NewPodMetadataGenerator(common.NewConfig(), nil, client, nil, nil, addResourceMetadata)
p := &Provider{
config: defaultConfig(),
bus: bus.New(logp.NewLogger("bus"), "test"),
@@ -1986,7 +1987,7 @@ func TestNamespacePodUpdater(t *testing.T) {
t.Run(title, func(t *testing.T) {
handler := &mockUpdaterHandler{}
store := &mockUpdaterStore{objects: c.pods}
updater := newNamespacePodUpdater(handler.OnUpdate, store, &sync.Mutex{})
updater := kubernetes.NewNamespacePodUpdater(handler.OnUpdate, store, &sync.Mutex{})

namespace := &kubernetes.Namespace{
ObjectMeta: metav1.ObjectMeta{
@@ -2000,6 +2001,58 @@ func TestNamespacePodUpdater(t *testing.T) {
}
}

func TestNodePodUpdater(t *testing.T) {
pod := func(name, node string) *kubernetes.Pod {
return &kubernetes.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1.PodSpec{
NodeName: node,
},
}
}

cases := map[string]struct {
pods []interface{}
expected []interface{}
}{
"no pods": {},
"two pods but only one in node": {
pods: []interface{}{
pod("onepod", "foo"),
pod("onepod", "bar"),
},
expected: []interface{}{
pod("onepod", "foo"),
},
},
"two pods but none in node": {
pods: []interface{}{
pod("onepod", "bar"),
pod("otherpod", "bar"),
},
},
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
handler := &mockUpdaterHandler{}
store := &mockUpdaterStore{objects: c.pods}
updater := kubernetes.NewNodePodUpdater(handler.OnUpdate, store, &sync.Mutex{})

node := &kubernetes.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
}
updater.OnUpdate(node)

assert.EqualValues(t, c.expected, handler.objects)
})
}
}
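
TestNodePodUpdater reuses the mockUpdaterHandler and mockUpdaterStore helpers already defined in pod_test.go: the handler records every pod it is notified about, and the store serves the test's pod list to the updater. They are only partially visible here, so the following is a sketch of what such helpers plausibly look like; the real implementations may differ.

```go
// Sketch only: plausible shapes for the test doubles used above; the actual
// helpers already live in pod_test.go and may differ in detail.
type mockUpdaterHandler struct {
	objects []interface{}
}

// OnUpdate records every object the updater notifies about, so the test can
// assert which pods were re-emitted.
func (h *mockUpdaterHandler) OnUpdate(obj interface{}) {
	h.objects = append(h.objects, obj)
}

type mockUpdaterStore struct {
	objects []interface{}
}

// List returns the fixed set of pods the test seeded the store with,
// satisfying the store interface the updater iterates over.
func (s *mockUpdaterStore) List() []interface{} {
	return s.objects
}
```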

type mockUpdaterHandler struct {
objects []interface{}
}