From b669e24a8d0bda80abf96ab8216bef9d872f9d60 Mon Sep 17 00:00:00 2001
From: David Ortiz
Date: Tue, 27 Aug 2024 15:48:18 +0200
Subject: [PATCH] [ksm] Allow collecting pod metrics from the Kubelet

---
 .../cluster/ksm/kubernetes_state.go     | 72 ++++++++++++++++---
 pkg/kubestatemetrics/builder/builder.go | 57 +++++++++++++--
 2 files changed, 117 insertions(+), 12 deletions(-)

diff --git a/pkg/collector/corechecks/cluster/ksm/kubernetes_state.go b/pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
index 441469999d004a..545c4d46c9eadb 100644
--- a/pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
+++ b/pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
@@ -28,6 +28,7 @@ import (
 	configUtils "github.com/DataDog/datadog-agent/pkg/config/utils"
 	kubestatemetrics "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/builder"
 	ksmstore "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/store"
+	"github.com/DataDog/datadog-agent/pkg/util/flavor"
 	hostnameUtil "github.com/DataDog/datadog-agent/pkg/util/hostname"
 	"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
 	"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
@@ -149,6 +150,26 @@ type KSMConfig struct {
 
 	// UseAPIServerCache enables the use of the API server cache for the check
 	UseAPIServerCache bool `yaml:"use_apiserver_cache"`
+
+	// CollectPodsFromKubelet enables the collection of pods from the Kubelet
+	// instead of the Kubernetes API server.
+	//
+	// This is meant to be enabled when the check runs on the node agent. It is
+	// useful in clusters with a large number of pods, where emitting pod
+	// metrics from a single instance might be too much load.
+	//
+	// Note that when the node agent collects pod metrics from the Kubelet and
+	// the cluster agent or cluster check runner collects metrics for other
+	// resources, label joins are not supported for pod metrics if the join
+	// source is not a pod.
+	CollectPodsFromKubelet bool `yaml:"collect_pods_from_kubelet"`
+
+	// CollectOnlyUnassignedPods enables pod collection only for unassigned
+	// pods. This is meant to be enabled when the check runs on the cluster
+	// agent or the cluster check runner while "CollectPodsFromKubelet" is
+	// enabled on the node agents, because unassigned pods cannot be collected
+	// from node agents.
+	CollectOnlyUnassignedPods bool `yaml:"collect_only_unassigned_pods"`
 }
 
 // KSMCheck wraps the config and the metric stores needed to run the check
@@ -160,6 +181,7 @@ type KSMCheck struct {
 	telemetry            *telemetryCache
 	cancel               context.CancelFunc
 	isCLCRunner          bool
+	isRunningOnNodeAgent bool
 	clusterNameTagValue  string
 	clusterNameRFC1123   string
 	metricNamesMapper    map[string]string
@@ -337,6 +359,32 @@ func (k *KSMCheck) Configure(senderManager sender.SenderManager, integrationConf
 		return err
 	}
 
+	if k.instance.CollectPodsFromKubelet {
+		if k.isRunningOnNodeAgent {
+			// If the check is running in a node agent, we can collect pods
+			// from the Kubelet, but only if pods are the only enabled
+			// collector. When more collectors are enabled, leader election is
+			// needed, and pods would only be collected from one of the agents.
+			if len(collectors) == 1 && collectors[0] == "pods" {
+				builder.WithPodCollectionFromKubelet()
+			} else {
+				log.Warnf("collect_pods_from_kubelet is enabled, but it is only supported when pods are the only enabled collector, " +
+					"so the check will collect pods from the API server instead of the Kubelet")
+			}
+		} else {
+			log.Warnf("collect_pods_from_kubelet is enabled, but KSM is running in the cluster agent or cluster check runner, " +
+				"so the check will collect pods from the API server instead of the Kubelet")
+		}
+	}
+
+	if k.instance.CollectOnlyUnassignedPods {
+		if k.isRunningOnNodeAgent {
+			log.Warnf("collect_only_unassigned_pods is enabled, but KSM is running in a node agent, so the option will be ignored")
+		} else {
+			builder.WithUnassignedPodsCollection()
+		}
+	}
+
 	// Start the collection process
 	k.allStores = builder.BuildStores()
 
@@ -478,9 +526,16 @@ func (k *KSMCheck) Run() error {
 	// Note that by design, some metrics cannot have hostnames (e.g kubernetes_state.pod.unschedulable)
 	sender.DisableDefaultHostname(true)
 
+	// If KSM is running in the node agent and is configured to collect only
+	// pods, and to collect them from the Kubelet, we don't need to run leader
+	// election, because each node agent collects its own pods.
+	podsFromKubeletInNodeAgent := k.isRunningOnNodeAgent &&
+		k.instance.CollectPodsFromKubelet &&
+		len(k.instance.Collectors) == 1 && k.instance.Collectors[0] == "pods"
+
 	// If the check is configured as a cluster check, the cluster check worker needs to skip the leader election section.
 	// we also do a safety check for dedicated runners to avoid trying the leader election
-	if !k.isCLCRunner || !k.instance.LeaderSkip {
+	if (!k.isCLCRunner || !k.instance.LeaderSkip) && !podsFromKubeletInNodeAgent {
 		// Only run if Leader Election is enabled.
 		if !ddconfig.Datadog().GetBool("leader_election") {
 			return log.Error("Leader Election not enabled. The cluster-agent will not run the kube-state-metrics core check.")
@@ -868,13 +923,14 @@ func KubeStateMetricsFactoryWithParam(labelsMapper map[string]string, labelJoins
 
 func newKSMCheck(base core.CheckBase, instance *KSMConfig) *KSMCheck {
 	return &KSMCheck{
-		CheckBase:          base,
-		instance:           instance,
-		telemetry:          newTelemetryCache(),
-		isCLCRunner:        ddconfig.IsCLCRunner(),
-		metricNamesMapper:  defaultMetricNamesMapper(),
-		metricAggregators:  defaultMetricAggregators(),
-		metricTransformers: defaultMetricTransformers(),
+		CheckBase:            base,
+		instance:             instance,
+		telemetry:            newTelemetryCache(),
+		isCLCRunner:          ddconfig.IsCLCRunner(),
+		isRunningOnNodeAgent: flavor.GetFlavor() != flavor.ClusterAgent && !ddconfig.IsCLCRunner(),
+		metricNamesMapper:    defaultMetricNamesMapper(),
+		metricAggregators:    defaultMetricAggregators(),
+		metricTransformers:   defaultMetricTransformers(),
 
 		// metadata metrics are useful for label joins
 		// but shouldn't be submitted to Datadog
diff --git a/pkg/kubestatemetrics/builder/builder.go b/pkg/kubestatemetrics/builder/builder.go
index 779269a78798c0..148318bc1d6fb8 100644
--- a/pkg/kubestatemetrics/builder/builder.go
+++ b/pkg/kubestatemetrics/builder/builder.go
@@ -47,6 +47,9 @@ type Builder struct {
 	metrics *watch.ListWatchMetrics
 
 	resync time.Duration
+
+	collectPodsFromKubelet    bool
+	collectOnlyUnassignedPods bool
 }
 
 // New returns new Builder instance
@@ -135,6 +138,20 @@ func (b *Builder) WithAllowAnnotations(l map[string][]string) {
 	b.ksmBuilder.WithAllowAnnotations(l)
 }
 
+// WithPodCollectionFromKubelet configures the builder to collect pods from the
+// Kubelet instead of the API server. This has no effect if pod collection is
+// disabled.
+func (b *Builder) WithPodCollectionFromKubelet() {
+	b.collectPodsFromKubelet = true
+}
+
+// WithUnassignedPodsCollection configures the builder to collect only pods
+// that are not assigned to any node. This has no effect if pod collection is
+// disabled.
+func (b *Builder) WithUnassignedPodsCollection() {
+	b.collectOnlyUnassignedPods = true
+}
+
 // Build initializes and registers all enabled stores.
 // Returns metric writers.
 func (b *Builder) Build() metricsstore.MetricsWriterList {
@@ -172,8 +189,16 @@ func GenerateStores[T any](
 
 	if b.namespaces.IsAllNamespaces() {
 		store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
-		listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
-		b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
+
+		switch expectedType.(type) {
+		// Pods are handled differently because, depending on the
+		// configuration, they are collected from the API server or the Kubelet.
+		case *corev1.Pod:
+			handlePodCollection(b, store, client, listWatchFunc, corev1.NamespaceAll, useAPIServerCache)
+		default:
+			listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
+			b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
+		}
 		return []cache.Store{store}
 	}
 
@@ -181,8 +206,15 @@ func GenerateStores[T any](
 	stores := make([]cache.Store, 0, len(b.namespaces))
 	for _, ns := range b.namespaces {
 		store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
-		listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
-		b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
+		switch expectedType.(type) {
+		// Pods are handled differently because, depending on the
+		// configuration, they are collected from the API server or the Kubelet.
+		case *corev1.Pod:
+			handlePodCollection(b, store, client, listWatchFunc, ns, useAPIServerCache)
+		default:
+			listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
+			b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
+		}
 		stores = append(stores, store)
 	}
 
@@ -267,3 +299,20 @@ func (c *cacheEnabledListerWatcher) List(options v1.ListOptions) (runtime.Object
 
 	return res, err
 }
+
+func handlePodCollection[T any](b *Builder, store cache.Store, client T, listWatchFunc func(kubeClient T, ns string, fieldSelector string) cache.ListerWatcher, namespace string, useAPIServerCache bool) {
+	if b.collectPodsFromKubelet {
+		b.startKubeletPodWatcher(store, namespace)
+		return
+	}
+
+	fieldSelector := b.fieldSelectorFilter
+	if b.collectOnlyUnassignedPods {
+		// spec.nodeName is empty for unassigned pods. Note that this overrides
+		// b.fieldSelectorFilter, which is not expected to be set in this case.
+		fieldSelector = "spec.nodeName="
+	}
+
+	listWatcher := listWatchFunc(client, namespace, fieldSelector)
+	b.startReflector(&corev1.Pod{}, store, listWatcher, useAPIServerCache)
+}
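
Note (not part of the patch): the unassigned-pods path above relies on the field
selector "spec.nodeName=", which matches pods whose spec.nodeName is empty,
i.e. pods the scheduler has not yet assigned to a node. For illustration, a
minimal, self-contained client-go sketch of that semantics follows; the
in-cluster config setup and the program scaffolding are assumptions made for
the sake of a runnable example, and only the field selector string comes from
the patch.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumes the program runs inside a cluster with a service account
	// allowed to list pods.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// "spec.nodeName=" selects pods whose spec.nodeName is empty, i.e. pods
	// not yet assigned to any node. This is the same selector the builder
	// passes to its list/watch when collectOnlyUnassignedPods is set.
	pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(),
		metav1.ListOptions{FieldSelector: "spec.nodeName="})
	if err != nil {
		panic(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("unassigned pod: %s/%s\n", p.Namespace, p.Name)
	}
}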