Skip to content

Commit

Permalink
[ksm] Init a single kubelet reflector for all pod collectors (#29145)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor authored Sep 10, 2024
1 parent 31d57c7 commit 9610ef9
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 94 deletions.
30 changes: 28 additions & 2 deletions pkg/kubestatemetrics/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Builder struct {

collectPodsFromKubelet bool
collectOnlyUnassignedPods bool
KubeletReflector *kubeletReflector
}

// New returns new Builder instance
Expand Down Expand Up @@ -161,7 +162,17 @@ func (b *Builder) Build() metricsstore.MetricsWriterList {
// BuildStores initializes and registers all enabled stores.
// It returns metric cache stores.
func (b *Builder) BuildStores() [][]cache.Store {
return b.ksmBuilder.BuildStores()
stores := b.ksmBuilder.BuildStores()

if b.KubeletReflector != nil {
// Starting the reflector here allows us to start just one for all stores.
err := b.KubeletReflector.start(b.ctx)
if err != nil {
log.Errorf("Failed to start the kubelet reflector: %s", err)
}
}

return stores
}

// WithResync is used if a resync period is configured
Expand Down Expand Up @@ -302,7 +313,22 @@ func (c *cacheEnabledListerWatcher) List(options v1.ListOptions) (runtime.Object

func handlePodCollection[T any](b *Builder, store cache.Store, client T, listWatchFunc func(kubeClient T, ns string, fieldSelector string) cache.ListerWatcher, namespace string, useAPIServerCache bool) {
if b.collectPodsFromKubelet {
b.startKubeletPodWatcher(store, namespace)
if b.KubeletReflector == nil {
kr, err := newKubeletReflector(b.namespaces)
if err != nil {
log.Errorf("Failed to create kubeletReflector: %s", err)
return
}
b.KubeletReflector = &kr
}

err := b.KubeletReflector.addStore(store)
if err != nil {
log.Errorf("Failed to add store to kubeletReflector: %s", err)
return
}

// The kubelet reflector will be started when all stores are added.
return
}

Expand Down
92 changes: 73 additions & 19 deletions pkg/kubestatemetrics/builder/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package builder
import (
"context"
"fmt"
"slices"
"strings"
"time"

Expand All @@ -22,57 +23,107 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// PodWatcher is an interface for a component that watches for changes in pods
type PodWatcher interface {
const (
podWatcherExpiryDuration = 15 * time.Second
updateStoresPeriod = 5 * time.Second
)

// podWatcher is an interface for a component that watches for changes in pods
type podWatcher interface {
PullChanges(ctx context.Context) ([]*kubelet.Pod, error)
Expire() ([]string, error)
}

func (b *Builder) startKubeletPodWatcher(store cache.Store, namespace string) {
podWatcher, err := kubelet.NewPodWatcher(15 * time.Second)
type kubeletReflector struct {
namespaces []string
watchAllNamespaces bool
podWatcher podWatcher

// Having an array of stores allows us to have a single watcher for all the
// collectors configured (by default it's the pods one plus "pods_extended")
stores []cache.Store

started bool
}

func newKubeletReflector(namespaces []string) (kubeletReflector, error) {
watcher, err := kubelet.NewPodWatcher(podWatcherExpiryDuration)
if err != nil {
log.Warnf("Failed to create pod watcher: %s", err)
return kubeletReflector{}, fmt.Errorf("failed to create kubelet-based reflector: %w", err)
}

watchAllNamespaces := slices.Contains(namespaces, corev1.NamespaceAll)

return kubeletReflector{
namespaces: namespaces,
watchAllNamespaces: watchAllNamespaces,
podWatcher: watcher,
}, nil
}

func (kr *kubeletReflector) addStore(store cache.Store) error {
if kr.started {
return fmt.Errorf("cannot add store after reflector has started")
}

ticker := time.NewTicker(5 * time.Second)
kr.stores = append(kr.stores, store)

return nil
}

// start starts the reflector. It should be called only once after all the
// stores have been added
func (kr *kubeletReflector) start(context context.Context) error {
if kr.started {
return fmt.Errorf("reflector already started")
}

kr.started = true

ticker := time.NewTicker(updateStoresPeriod)

go func() {
for {
select {
case <-ticker.C:
err = updateStore(b.ctx, store, podWatcher, namespace)
err := kr.updateStores(context)
if err != nil {
log.Errorf("Failed to update store: %s", err)
log.Errorf("Failed to update stores: %s", err)
}

case <-b.ctx.Done():
case <-context.Done():
ticker.Stop()
return
}
}
}()

return nil
}

func updateStore(ctx context.Context, store cache.Store, podWatcher PodWatcher, namespace string) error {
pods, err := podWatcher.PullChanges(ctx)
func (kr *kubeletReflector) updateStores(ctx context.Context) error {
pods, err := kr.podWatcher.PullChanges(ctx)
if err != nil {
return fmt.Errorf("failed to pull changes from pod watcher: %w", err)
}

for _, pod := range pods {
if namespace != corev1.NamespaceAll && pod.Metadata.Namespace != namespace {
if !kr.watchAllNamespaces && !slices.Contains(kr.namespaces, pod.Metadata.Namespace) {
continue
}

kubePod := kubelet.ConvertKubeletPodToK8sPod(pod)

err = store.Add(kubePod)
if err != nil {
log.Warnf("Failed to add pod to KSM store: %s", err)
for _, store := range kr.stores {
err := store.Add(kubePod)
if err != nil {
// log instead of returning error to continue updating other stores
log.Warnf("Failed to add pod to store: %s", err)
}
}
}

expiredEntities, err := podWatcher.Expire()
expiredEntities, err := kr.podWatcher.Expire()
if err != nil {
return fmt.Errorf("failed to expire pods: %w", err)
}
Expand All @@ -91,9 +142,12 @@ func updateStore(ctx context.Context, store cache.Store, podWatcher PodWatcher,
},
}

err = store.Delete(&expiredPod)
if err != nil {
log.Warnf("Failed to delete pod from KSM store: %s", err)
for _, store := range kr.stores {
err := store.Delete(&expiredPod)
if err != nil {
// log instead of returning error to continue updating other stores
log.Warnf("Failed to delete pod from store: %s", err)
}
}
}

Expand Down
19 changes: 17 additions & 2 deletions pkg/kubestatemetrics/builder/kubelet_pods_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,24 @@
package builder

import (
"context"

"k8s.io/client-go/tools/cache"
)

func (b *Builder) startKubeletPodWatcher(_ cache.Store, _ string) {
// Do nothing
// When the Kubelet flag is not set, we don't need a kubeletReflector, so we can
// return a struct that does nothing

type kubeletReflector struct{}

func newKubeletReflector(_ []string) (kubeletReflector, error) {
return kubeletReflector{}, nil
}

func (kr *kubeletReflector) addStore(_ cache.Store) error {
return nil
}

func (kr *kubeletReflector) start(_ context.Context) error {
return nil
}
Loading

0 comments on commit 9610ef9

Please sign in to comment.