From 9610ef9ed94a362b9319819a03e325e76708a14e Mon Sep 17 00:00:00 2001 From: David Ortiz Date: Tue, 10 Sep 2024 14:27:54 +0200 Subject: [PATCH] [ksm] Init a single kubelet reflector for all pod collectors (#29145) --- pkg/kubestatemetrics/builder/builder.go | 30 ++- pkg/kubestatemetrics/builder/kubelet_pods.go | 92 ++++++-- .../builder/kubelet_pods_stub.go | 19 +- .../builder/kubelet_pods_test.go | 200 +++++++++++------- 4 files changed, 247 insertions(+), 94 deletions(-) diff --git a/pkg/kubestatemetrics/builder/builder.go b/pkg/kubestatemetrics/builder/builder.go index 148318bc1d6fb..56fa8ff3e0c19 100644 --- a/pkg/kubestatemetrics/builder/builder.go +++ b/pkg/kubestatemetrics/builder/builder.go @@ -50,6 +50,7 @@ type Builder struct { collectPodsFromKubelet bool collectOnlyUnassignedPods bool + KubeletReflector *kubeletReflector } // New returns new Builder instance @@ -161,7 +162,17 @@ func (b *Builder) Build() metricsstore.MetricsWriterList { // BuildStores initializes and registers all enabled stores. // It returns metric cache stores. func (b *Builder) BuildStores() [][]cache.Store { - return b.ksmBuilder.BuildStores() + stores := b.ksmBuilder.BuildStores() + + if b.KubeletReflector != nil { + // Starting the reflector here allows us to start just one for all stores. + err := b.KubeletReflector.start(b.ctx) + if err != nil { + log.Errorf("Failed to start the kubelet reflector: %s", err) + } + } + + return stores } // WithResync is used if a resync period is configured @@ -302,7 +313,22 @@ func (c *cacheEnabledListerWatcher) List(options v1.ListOptions) (runtime.Object func handlePodCollection[T any](b *Builder, store cache.Store, client T, listWatchFunc func(kubeClient T, ns string, fieldSelector string) cache.ListerWatcher, namespace string, useAPIServerCache bool) { if b.collectPodsFromKubelet { - b.startKubeletPodWatcher(store, namespace) + if b.KubeletReflector == nil { + kr, err := newKubeletReflector(b.namespaces) + if err != nil { + log.Errorf("Failed to create kubeletReflector: %s", err) + return + } + b.KubeletReflector = &kr + } + + err := b.KubeletReflector.addStore(store) + if err != nil { + log.Errorf("Failed to add store to kubeletReflector: %s", err) + return + } + + // The kubelet reflector will be started when all stores are added. return } diff --git a/pkg/kubestatemetrics/builder/kubelet_pods.go b/pkg/kubestatemetrics/builder/kubelet_pods.go index ce7af8ce6683c..c0a50018c110a 100644 --- a/pkg/kubestatemetrics/builder/kubelet_pods.go +++ b/pkg/kubestatemetrics/builder/kubelet_pods.go @@ -10,6 +10,7 @@ package builder import ( "context" "fmt" + "slices" "strings" "time" @@ -22,57 +23,107 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) -// PodWatcher is an interface for a component that watches for changes in pods -type PodWatcher interface { +const ( + podWatcherExpiryDuration = 15 * time.Second + updateStoresPeriod = 5 * time.Second +) + +// podWatcher is an interface for a component that watches for changes in pods +type podWatcher interface { PullChanges(ctx context.Context) ([]*kubelet.Pod, error) Expire() ([]string, error) } -func (b *Builder) startKubeletPodWatcher(store cache.Store, namespace string) { - podWatcher, err := kubelet.NewPodWatcher(15 * time.Second) +type kubeletReflector struct { + namespaces []string + watchAllNamespaces bool + podWatcher podWatcher + + // Having an array of stores allows us to have a single watcher for all the + // collectors configured (by default it's the pods one plus "pods_extended") + stores []cache.Store + + started bool +} + +func newKubeletReflector(namespaces []string) (kubeletReflector, error) { + watcher, err := kubelet.NewPodWatcher(podWatcherExpiryDuration) if err != nil { - log.Warnf("Failed to create pod watcher: %s", err) + return kubeletReflector{}, fmt.Errorf("failed to create kubelet-based reflector: %w", err) + } + + watchAllNamespaces := slices.Contains(namespaces, corev1.NamespaceAll) + + return kubeletReflector{ + namespaces: namespaces, + watchAllNamespaces: watchAllNamespaces, + podWatcher: watcher, + }, nil +} + +func (kr *kubeletReflector) addStore(store cache.Store) error { + if kr.started { + return fmt.Errorf("cannot add store after reflector has started") } - ticker := time.NewTicker(5 * time.Second) + kr.stores = append(kr.stores, store) + + return nil +} + +// start starts the reflector. It should be called only once after all the +// stores have been added +func (kr *kubeletReflector) start(context context.Context) error { + if kr.started { + return fmt.Errorf("reflector already started") + } + + kr.started = true + + ticker := time.NewTicker(updateStoresPeriod) go func() { for { select { case <-ticker.C: - err = updateStore(b.ctx, store, podWatcher, namespace) + err := kr.updateStores(context) if err != nil { - log.Errorf("Failed to update store: %s", err) + log.Errorf("Failed to update stores: %s", err) } - case <-b.ctx.Done(): + case <-context.Done(): ticker.Stop() return } } }() + + return nil } -func updateStore(ctx context.Context, store cache.Store, podWatcher PodWatcher, namespace string) error { - pods, err := podWatcher.PullChanges(ctx) +func (kr *kubeletReflector) updateStores(ctx context.Context) error { + pods, err := kr.podWatcher.PullChanges(ctx) if err != nil { return fmt.Errorf("failed to pull changes from pod watcher: %w", err) } for _, pod := range pods { - if namespace != corev1.NamespaceAll && pod.Metadata.Namespace != namespace { + if !kr.watchAllNamespaces && !slices.Contains(kr.namespaces, pod.Metadata.Namespace) { continue } kubePod := kubelet.ConvertKubeletPodToK8sPod(pod) - err = store.Add(kubePod) - if err != nil { - log.Warnf("Failed to add pod to KSM store: %s", err) + for _, store := range kr.stores { + err := store.Add(kubePod) + if err != nil { + // log instead of returning error to continue updating other stores + log.Warnf("Failed to add pod to store: %s", err) + } } } - expiredEntities, err := podWatcher.Expire() + expiredEntities, err := kr.podWatcher.Expire() if err != nil { return fmt.Errorf("failed to expire pods: %w", err) } @@ -91,9 +142,12 @@ func updateStore(ctx context.Context, store cache.Store, podWatcher PodWatcher, }, } - err = store.Delete(&expiredPod) - if err != nil { - log.Warnf("Failed to delete pod from KSM store: %s", err) + for _, store := range kr.stores { + err := store.Delete(&expiredPod) + if err != nil { + // log instead of returning error to continue updating other stores + log.Warnf("Failed to delete pod from store: %s", err) + } } } diff --git a/pkg/kubestatemetrics/builder/kubelet_pods_stub.go b/pkg/kubestatemetrics/builder/kubelet_pods_stub.go index b4da17ab6227d..7682655232056 100644 --- a/pkg/kubestatemetrics/builder/kubelet_pods_stub.go +++ b/pkg/kubestatemetrics/builder/kubelet_pods_stub.go @@ -8,9 +8,24 @@ package builder import ( + "context" + "k8s.io/client-go/tools/cache" ) -func (b *Builder) startKubeletPodWatcher(_ cache.Store, _ string) { - // Do nothing +// When the Kubelet flag is not set, we don't need a kubeletReflector, so we can +// return a struct that does nothing + +type kubeletReflector struct{} + +func newKubeletReflector(_ []string) (kubeletReflector, error) { + return kubeletReflector{}, nil +} + +func (kr *kubeletReflector) addStore(_ cache.Store) error { + return nil +} + +func (kr *kubeletReflector) start(_ context.Context) error { + return nil } diff --git a/pkg/kubestatemetrics/builder/kubelet_pods_test.go b/pkg/kubestatemetrics/builder/kubelet_pods_test.go index 94f5f26a798ee..a9020b2143549 100644 --- a/pkg/kubestatemetrics/builder/kubelet_pods_test.go +++ b/pkg/kubestatemetrics/builder/kubelet_pods_test.go @@ -9,10 +9,11 @@ package builder import ( "context" + "slices" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -83,83 +84,140 @@ func (m *MockStore) Resync() error { return nil } -func TestUpdateStore_AddPodToStore(t *testing.T) { - store := new(MockStore) - podWatcher := new(MockPodWatcher) - - kubeletPod := &kubelet.Pod{ - Metadata: kubelet.PodMetadata{ - Name: "test-pod", - Namespace: "default", - UID: "12345", +func TestUpdateStores_AddPods(t *testing.T) { + tests := []struct { + name string + reflectorNamespaces []string + addedPodNamespace string + podShouldBeAdded bool + }{ + { + name: "add pod in watched namespace", + reflectorNamespaces: []string{"default"}, + addedPodNamespace: "default", + podShouldBeAdded: true, }, - } - - kubernetesPod := kubelet.ConvertKubeletPodToK8sPod(kubeletPod) - - podWatcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{kubeletPod}, nil) - podWatcher.On("Expire").Return([]string{}, nil) - store.On("Add", kubernetesPod).Return(nil) - - err := updateStore(context.TODO(), store, podWatcher, "default") - assert.NoError(t, err) - - store.AssertCalled(t, "Add", kubernetesPod) -} - -func TestUpdateStore_FilterPodsByNamespace(t *testing.T) { - store := new(MockStore) - podWatcher := new(MockPodWatcher) - - kubeletPod := &kubelet.Pod{ - Metadata: kubelet.PodMetadata{ - Name: "test-pod", - Namespace: "other-namespace", - UID: "12345", + { + name: "add pod in non-watched namespace", + reflectorNamespaces: []string{"default"}, + addedPodNamespace: "other-namespace", + podShouldBeAdded: false, + }, + { + name: "reflector watches all pods", + reflectorNamespaces: []string{corev1.NamespaceAll}, + addedPodNamespace: "default", + podShouldBeAdded: true, }, } - store.On("Add", mock.Anything).Return(nil) - podWatcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{kubeletPod}, nil) - podWatcher.On("Expire").Return([]string{}, nil) - - err := updateStore(context.TODO(), store, podWatcher, "default") - assert.NoError(t, err) - - // Add() shouldn't be called because the pod is in a different namespace - store.AssertNotCalled(t, "Add", mock.Anything) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + stores := []*MockStore{ + new(MockStore), + new(MockStore), + } + for _, store := range stores { + store.On("Add", mock.Anything).Return(nil) + } + + watcher := new(MockPodWatcher) + + kubeletPod := &kubelet.Pod{ + Metadata: kubelet.PodMetadata{ + Namespace: test.addedPodNamespace, + Name: "test-pod", + UID: "12345", + }, + } + + kubernetesPod := kubelet.ConvertKubeletPodToK8sPod(kubeletPod) + + watcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{kubeletPod}, nil) + watcher.On("Expire").Return([]string{}, nil) + + reflector := kubeletReflector{ + namespaces: test.reflectorNamespaces, + watchAllNamespaces: slices.Contains(test.reflectorNamespaces, corev1.NamespaceAll), + podWatcher: watcher, + } + + for _, store := range stores { + err := reflector.addStore(store) + require.NoError(t, err) + } + + err := reflector.updateStores(context.TODO()) + require.NoError(t, err) + + if test.podShouldBeAdded { + for _, store := range stores { + store.AssertCalled(t, "Add", kubernetesPod) + } + } else { + for _, store := range stores { + store.AssertNotCalled(t, "Add", mock.Anything) + } + } + }) + } } -func TestUpdateStore_HandleExpiredPods(t *testing.T) { - store := new(MockStore) - podWatcher := new(MockPodWatcher) - podUID := "kubernetes_pod://pod-12345" - kubernetesPod := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: types.UID("pod-12345"), +func TestUpdateStores_HandleExpired(t *testing.T) { + tests := []struct { + name string + expiredUID string + expectedPodToBeDeleted *corev1.Pod + }{ + { + name: "expired pod", + expiredUID: "kubernetes_pod://pod-12345", + expectedPodToBeDeleted: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: types.UID("pod-12345"), + }, + }, + }, + { + name: "expired container", + expiredUID: "container-12345", + expectedPodToBeDeleted: nil, }, } - podWatcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{}, nil) - podWatcher.On("Expire").Return([]string{podUID}, nil) - store.On("Delete", &kubernetesPod).Return(nil) - - err := updateStore(context.TODO(), store, podWatcher, "default") - assert.NoError(t, err) - - store.AssertCalled(t, "Delete", &kubernetesPod) -} - -func TestUpdateStore_HandleExpiredContainers(t *testing.T) { - store := new(MockStore) - podWatcher := new(MockPodWatcher) - - podWatcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{}, nil) - podWatcher.On("Expire").Return([]string{"container-12345"}, nil) - - err := updateStore(context.TODO(), store, podWatcher, "default") - assert.NoError(t, err) - - // Delete() shouldn't be called because the expired entity is not a pod - store.AssertNotCalled(t, "Delete", mock.Anything) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + stores := []*MockStore{ + new(MockStore), + new(MockStore), + } + for _, store := range stores { + store.On("Delete", mock.Anything).Return(nil) + } + + watcher := new(MockPodWatcher) + watcher.On("PullChanges", mock.Anything).Return([]*kubelet.Pod{}, nil) + watcher.On("Expire").Return([]string{test.expiredUID}, nil) + + reflector := kubeletReflector{ + namespaces: []string{"default"}, + podWatcher: watcher, + } + for _, store := range stores { + err := reflector.addStore(store) + require.NoError(t, err) + } + + err := reflector.updateStores(context.TODO()) + require.NoError(t, err) + + for _, store := range stores { + if test.expectedPodToBeDeleted != nil { + store.AssertCalled(t, "Delete", test.expectedPodToBeDeleted) + } else { + store.AssertNotCalled(t, "Delete", mock.Anything) + } + } + }) + } }