Skip to content

Commit

Permalink
[ksm] Allow collecting pod metrics from the Kubelet (#28811)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidor authored Sep 2, 2024
1 parent 2d91aa4 commit 2e7d2bb
Show file tree
Hide file tree
Showing 11 changed files with 1,131 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/collector/corechecks/cluster/ksm/customresources/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,15 @@ func (f *extendedPodFactory) ExpectedType() interface{} {
}

// ListWatch returns a ListerWatcher for v1.Pod
//
//nolint:revive // TODO(CINT) Fix revive linter
func (f *extendedPodFactory) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher {
client := customResourceClient.(clientset.Interface)
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = fieldSelector
return client.CoreV1().Pods(ns).List(context.TODO(), opts)
},
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = fieldSelector
return client.CoreV1().Pods(ns).Watch(context.TODO(), opts)
},
}
Expand Down
98 changes: 90 additions & 8 deletions pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
configUtils "github.com/DataDog/datadog-agent/pkg/config/utils"
kubestatemetrics "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/builder"
ksmstore "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/store"
"github.com/DataDog/datadog-agent/pkg/util/flavor"
hostnameUtil "github.com/DataDog/datadog-agent/pkg/util/hostname"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
Expand Down Expand Up @@ -67,6 +68,37 @@ var extendedCollectors = map[string]string{

var matchAllCap = regexp.MustCompile("([a-z0-9])([A-Z])")

// podCollectionMode selects the source the check collects pods from. It is the
// type of the "pod_collection_mode" instance option.
type podCollectionMode string

const (
// defaultPodCollection is the default mode, where all pods are collected
// from the API server.
defaultPodCollection podCollectionMode = "default"

// nodeKubeletPodCollection is the mode where pods are collected from the
// kubelet.
//
// This is meant to be enabled when the check is running on the node agent.
// It is useful in clusters with a large number of pods, where emitting all
// pod metrics from a single instance might be too much and cause
// performance issues.
//
// Note that when the node agent collects pod metrics from the kubelet and
// the cluster agent or cluster check runner collects metrics for the other
// resources, label joins are not supported for pod metrics if the join
// source is not a pod.
nodeKubeletPodCollection podCollectionMode = "node_kubelet"

// clusterUnassignedPodCollection is the mode where pods are collected from
// the API server, but only the pods that are not assigned to a node.
//
// This is meant to be enabled when the check is running on the cluster
// agent or the cluster check runner and "nodeKubeletPodCollection" is
// enabled on the node agents, because unassigned pods cannot be collected
// from node agents.
clusterUnassignedPodCollection podCollectionMode = "cluster_unassigned"
)

// KSMConfig contains the check config parameters
type KSMConfig struct {
// Collectors defines the resource type collectors.
Expand Down Expand Up @@ -149,6 +181,10 @@ type KSMConfig struct {

// UseAPIServerCache enables the use of the API server cache for the check
UseAPIServerCache bool `yaml:"use_apiserver_cache"`

// PodCollectionMode defines how pods are collected.
// Accepted values are: "default", "node_kubelet", and "cluster_unassigned".
PodCollectionMode podCollectionMode `yaml:"pod_collection_mode"`
}

// KSMCheck wraps the config and the metric stores needed to run the check
Expand All @@ -160,6 +196,7 @@ type KSMCheck struct {
telemetry *telemetryCache
cancel context.CancelFunc
isCLCRunner bool
isRunningOnNodeAgent bool
clusterNameTagValue string
clusterNameRFC1123 string
metricNamesMapper map[string]string
Expand Down Expand Up @@ -337,6 +374,8 @@ func (k *KSMCheck) Configure(senderManager sender.SenderManager, integrationConf
return err
}

k.configurePodCollection(builder, collectors)

// Start the collection process
k.allStores = builder.BuildStores()

Expand Down Expand Up @@ -478,9 +517,14 @@ func (k *KSMCheck) Run() error {
// Note that by design, some metrics cannot have hostnames (e.g kubernetes_state.pod.unschedulable)
sender.DisableDefaultHostname(true)

// If KSM is running in the node agent and is configured to collect only
// pods, and to collect them from the Kubelet, we don't need to run leader
// election, because each node agent is responsible for collecting its own pods.
podsFromKubeletInNodeAgent := k.isRunningOnNodeAgent && k.instance.PodCollectionMode == nodeKubeletPodCollection

// If the check is configured as a cluster check, the cluster check worker needs to skip the leader election section.
// we also do a safety check for dedicated runners to avoid trying the leader election
if !k.isCLCRunner || !k.instance.LeaderSkip {
if (!k.isCLCRunner || !k.instance.LeaderSkip) && !podsFromKubeletInNodeAgent {
// Only run if Leader Election is enabled.
if !ddconfig.Datadog().GetBool("leader_election") {
return log.Error("Leader Election not enabled. The cluster-agent will not run the kube-state-metrics core check.")
Expand Down Expand Up @@ -795,6 +839,43 @@ func (k *KSMCheck) initTags() {
}
}

// configurePodCollection resolves the pod collection mode requested in the
// instance configuration into the mode that is actually applied, configures
// the builder accordingly, and stores the applied mode back in
// k.instance.PodCollectionMode.
//
// Any request that cannot be honored (unknown value, Kubelet collection
// outside of a node agent or alongside other collectors, unassigned-pods
// collection on a node agent) is logged as a warning and falls back to the
// default mode. An empty mode is treated as the default mode.
//
// collectors is the list of enabled collectors; Kubelet collection is only
// applied when it contains exactly "pods".
func (k *KSMCheck) configurePodCollection(builder *kubestatemetrics.Builder, collectors []string) {
	requested := k.instance.PodCollectionMode

	// Every branch that does not explicitly pick another mode ends up in the
	// default one.
	effective := defaultPodCollection

	switch {
	case requested == "" || requested == defaultPodCollection:
		// Pods come from the API server; the builder needs no extra setup.

	case requested == nodeKubeletPodCollection && !k.isRunningOnNodeAgent:
		log.Warnf("pod collection from the Kubelet is enabled but KSM is running in the cluster agent or cluster check runner, " +
			"so the check will collect pods from the API server instead of the Kubelet")

	case requested == nodeKubeletPodCollection:
		// On a node agent, pods can be collected from the Kubelet only when
		// "pods" is the sole collector. With more collectors enabled, leader
		// election is needed and pods would be collected by a single agent.
		onlyPods := len(collectors) == 1 && collectors[0] == "pods"
		if onlyPods {
			builder.WithPodCollectionFromKubelet()
			effective = nodeKubeletPodCollection
		} else {
			log.Warnf("pod collection from the Kubelet is enabled but it's only supported when the only collector enabled is pods, " +
				"so the check will collect pods from the API server instead of the Kubelet")
		}

	case requested == clusterUnassignedPodCollection && k.isRunningOnNodeAgent:
		log.Warnf("collection of unassigned pods is enabled but KSM is running in a node agent, so the option will be ignored")

	case requested == clusterUnassignedPodCollection:
		builder.WithUnassignedPodsCollection()
		effective = clusterUnassignedPodCollection

	default:
		log.Warnf("invalid pod collection mode %q, falling back to the default mode", requested)
	}

	k.instance.PodCollectionMode = effective
}

// processTelemetry accumulates the telemetry metric values, it can be called multiple times
// during a check run then sendTelemetry should be called to forward the calculated values
func (k *KSMCheck) processTelemetry(metrics map[string][]ksmstore.DDMetricsFam) {
Expand Down Expand Up @@ -868,13 +949,14 @@ func KubeStateMetricsFactoryWithParam(labelsMapper map[string]string, labelJoins

func newKSMCheck(base core.CheckBase, instance *KSMConfig) *KSMCheck {
return &KSMCheck{
CheckBase: base,
instance: instance,
telemetry: newTelemetryCache(),
isCLCRunner: ddconfig.IsCLCRunner(),
metricNamesMapper: defaultMetricNamesMapper(),
metricAggregators: defaultMetricAggregators(),
metricTransformers: defaultMetricTransformers(),
CheckBase: base,
instance: instance,
telemetry: newTelemetryCache(),
isCLCRunner: ddconfig.IsCLCRunner(),
isRunningOnNodeAgent: flavor.GetFlavor() != flavor.ClusterAgent && !ddconfig.IsCLCRunner(),
metricNamesMapper: defaultMetricNamesMapper(),
metricAggregators: defaultMetricAggregators(),
metricTransformers: defaultMetricTransformers(),

// metadata metrics are useful for label joins
// but shouldn't be submitted to Datadog
Expand Down
57 changes: 53 additions & 4 deletions pkg/kubestatemetrics/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Builder struct {
metrics *watch.ListWatchMetrics

resync time.Duration

collectPodsFromKubelet bool
collectOnlyUnassignedPods bool
}

// New returns new Builder instance
Expand Down Expand Up @@ -135,6 +138,20 @@ func (b *Builder) WithAllowAnnotations(l map[string][]string) {
b.ksmBuilder.WithAllowAnnotations(l)
}

// WithPodCollectionFromKubelet configures the builder to collect pods from the
// Kubelet instead of the API server. This has no effect if pod collection is
// disabled.
//
// The setting is read when the pod stores are generated, so it has to be
// applied before the stores are built. When it is set, pods are never listed
// from the API server, so it takes precedence over
// WithUnassignedPodsCollection.
func (b *Builder) WithPodCollectionFromKubelet() {
b.collectPodsFromKubelet = true
}

// WithUnassignedPodsCollection configures the builder to only collect pods that
// are not assigned to any node. This has no effect if pod collection is
// disabled.
//
// The setting is read when the pod stores are generated, so it has to be
// applied before the stores are built. It only affects collection from the
// API server: it is ignored when pods are collected from the Kubelet.
func (b *Builder) WithUnassignedPodsCollection() {
b.collectOnlyUnassignedPods = true
}

// Build initializes and registers all enabled stores.
// Returns metric writers.
func (b *Builder) Build() metricsstore.MetricsWriterList {
Expand Down Expand Up @@ -172,17 +189,32 @@ func GenerateStores[T any](

if b.namespaces.IsAllNamespaces() {
store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)

switch expectedType.(type) {
// Pods are handled differently because depending on the configuration
// they're collected from the API server or the Kubelet.
case *corev1.Pod:
handlePodCollection(b, store, client, listWatchFunc, corev1.NamespaceAll, useAPIServerCache)
default:
listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
}
return []cache.Store{store}

}

stores := make([]cache.Store, 0, len(b.namespaces))
for _, ns := range b.namespaces {
store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
switch expectedType.(type) {
// Pods are handled differently because depending on the configuration
// they're collected from the API server or the Kubelet.
case *corev1.Pod:
handlePodCollection(b, store, client, listWatchFunc, ns, useAPIServerCache)
default:
listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
}
stores = append(stores, store)
}

Expand Down Expand Up @@ -267,3 +299,20 @@ func (c *cacheEnabledListerWatcher) List(options v1.ListOptions) (runtime.Object

return res, err
}

// handlePodCollection starts the collection of pods into store for the given
// namespace, choosing the source according to the builder configuration:
//
//   - when collection from the Kubelet is enabled, a Kubelet pod watcher
//     feeds the store and the API server is not used at all;
//   - otherwise a reflector is started on the list/watcher returned by
//     listWatchFunc. When only unassigned pods are requested, the list/watch
//     is additionally restricted to pods without a node.
//
// client and listWatchFunc are the API client and the list/watcher factory of
// the pod resource; useAPIServerCache is forwarded to the reflector.
func handlePodCollection[T any](b *Builder, store cache.Store, client T, listWatchFunc func(kubeClient T, ns string, fieldSelector string) cache.ListerWatcher, namespace string, useAPIServerCache bool) {
	if b.collectPodsFromKubelet {
		b.startKubeletPodWatcher(store, namespace)
		return
	}

	fieldSelector := b.fieldSelectorFilter
	if b.collectOnlyUnassignedPods {
		// spec.nodeName is empty for pods that are not assigned to a node.
		const unassignedPodsSelector = "spec.nodeName="

		// Field selectors can be chained with commas (logical AND), so the
		// configured filter is kept instead of being silently replaced.
		if fieldSelector == "" {
			fieldSelector = unassignedPodsSelector
		} else {
			fieldSelector += "," + unassignedPodsSelector
		}
	}

	listWatcher := listWatchFunc(client, namespace, fieldSelector)
	b.startReflector(&corev1.Pod{}, store, listWatcher, useAPIServerCache)
}
101 changes: 101 additions & 0 deletions pkg/kubestatemetrics/builder/kubelet_pods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver && kubelet

package builder

import (
"context"
"fmt"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// PodWatcher is an interface for a component that watches for changes in pods.
// It is what updateStore consumes to keep a store in sync with the Kubelet.
type PodWatcher interface {
// PullChanges returns the pods to add to (or refresh in) the store.
// NOTE(review): presumably only the pods that changed since the previous
// call — confirm against the kubelet pod watcher implementation.
PullChanges(ctx context.Context) ([]*kubelet.Pod, error)
// Expire returns the IDs of the entities that have expired. The IDs are
// not limited to pods: callers identify pods by the kubelet.KubePodPrefix
// prefix.
Expire() ([]string, error)
}

// startKubeletPodWatcher starts a background goroutine that periodically
// syncs the pods reported by the Kubelet into store, keeping only the pods of
// the given namespace (or all of them for corev1.NamespaceAll).
//
// The goroutine runs until the builder context is cancelled. If the Kubelet
// pod watcher cannot be created, a warning is logged and nothing is started,
// so the store is left empty.
func (b *Builder) startKubeletPodWatcher(store cache.Store, namespace string) {
	podWatcher, err := kubelet.NewPodWatcher(15 * time.Second)
	if err != nil {
		log.Warnf("Failed to create pod watcher: %s", err)
		// The watcher is not usable when its construction failed; starting
		// the update loop with it would fail (or panic) on every tick.
		return
	}

	ticker := time.NewTicker(5 * time.Second)

	go func() {
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Use a goroutine-local error instead of writing to the
				// variable of the enclosing function.
				if err := updateStore(b.ctx, store, podWatcher, namespace); err != nil {
					log.Errorf("Failed to update store: %s", err)
				}

			case <-b.ctx.Done():
				return
			}
		}
	}()
}

// updateStore applies one round of changes from podWatcher to store.
//
// Pods returned by PullChanges are converted to Kubernetes pods and added to
// the store, skipping the ones outside of namespace unless namespace is
// corev1.NamespaceAll. Expired pods are then deleted from the store.
//
// An error is returned only when pulling changes or expiring entities fails;
// failures to add or delete an individual pod are logged and skipped.
func updateStore(ctx context.Context, store cache.Store, podWatcher PodWatcher, namespace string) error {
	changedPods, err := podWatcher.PullChanges(ctx)
	if err != nil {
		return fmt.Errorf("failed to pull changes from pod watcher: %w", err)
	}

	allNamespaces := namespace == corev1.NamespaceAll
	for _, changed := range changedPods {
		if !allNamespaces && changed.Metadata.Namespace != namespace {
			continue
		}

		if addErr := store.Add(kubelet.ConvertKubeletPodToK8sPod(changed)); addErr != nil {
			log.Warnf("Failed to add pod to KSM store: %s", addErr)
		}
	}

	expiredIDs, err := podWatcher.Expire()
	if err != nil {
		return fmt.Errorf("failed to expire pods: %w", err)
	}

	for _, entityID := range expiredIDs {
		// Expire() reports containers as well as pods; only pod entities are
		// relevant here.
		if !strings.HasPrefix(entityID, kubelet.KubePodPrefix) {
			continue
		}

		// The UID alone is enough for the store to find the pod to delete.
		podUID := types.UID(strings.TrimPrefix(entityID, kubelet.KubePodPrefix))
		tombstone := &corev1.Pod{
			ObjectMeta: metav1.ObjectMeta{UID: podUID},
		}

		if delErr := store.Delete(tombstone); delErr != nil {
			log.Warnf("Failed to delete pod from KSM store: %s", delErr)
		}
	}

	return nil
}
16 changes: 16 additions & 0 deletions pkg/kubestatemetrics/builder/kubelet_pods_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver && !kubelet

package builder

import (
"k8s.io/client-go/tools/cache"
)

// startKubeletPodWatcher is the stub used when the binary is built without the
// "kubelet" build tag. Pod collection from the Kubelet is not available in
// that case, so the store is left untouched.
func (b *Builder) startKubeletPodWatcher(_ cache.Store, _ string) {
// Do nothing
}
Loading

0 comments on commit 2e7d2bb

Please sign in to comment.