Skip to content

Commit

Permalink
[ksm] Add reflector based on Kubelet
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor committed Aug 27, 2024
1 parent c0a5c11 commit d50bae1
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
86 changes: 86 additions & 0 deletions pkg/kubestatemetrics/builder/kubelet_pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver && kubelet

package builder

import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

func (b *Builder) startKubeletPodWatcher(store cache.Store, namespace string) {
podWatcher, err := kubelet.NewPodWatcher(15 * time.Second)
if err != nil {
log.Warnf("Failed to create pod watcher: %s", err)
}

ticker := time.NewTicker(5 * time.Second)

go func() {
for {
select {
case <-ticker.C:
pods, err := podWatcher.PullChanges(b.ctx)
if err != nil {
log.Warnf("Failed to pull changes from pod watcher: %s", err)
continue
}

for _, pod := range pods {
if namespace != corev1.NamespaceAll && pod.Metadata.Namespace != namespace {
continue
}

kubePod := kubelet.ConvertKubeletPodToK8sPod(pod)

err = store.Add(kubePod)
if err != nil {
log.Warnf("Failed to add pod to KSM store: %s", err)
}
}

expiredEntities, err := podWatcher.Expire()
if err != nil {
log.Warnf("Failed to expire pods: %s", err)
continue
}

for _, expiredEntity := range expiredEntities {
// Expire() returns both pods and containers, we only care
// about pods
if !strings.HasPrefix(expiredEntity, kubelet.KubePodPrefix) {
continue
}

// Only the UID is needed to be able to delete
expiredPod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(strings.TrimPrefix(expiredEntity, kubelet.KubePodPrefix)),
},
}

err = store.Delete(&expiredPod)
if err != nil {
log.Warnf("Failed to delete pod from KSM store: %s", err)
}
}

case <-b.ctx.Done():
ticker.Stop()
return
}
}
}()
}
16 changes: 16 additions & 0 deletions pkg/kubestatemetrics/builder/kubelet_pods_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver && !kubelet

package builder

import (
"k8s.io/client-go/tools/cache"
)

func (b *Builder) startKubeletPodWatcher(_ cache.Store, _ string) {
// Do nothing
}

0 comments on commit d50bae1

Please sign in to comment.