[ksm] Allow collecting pod metrics from the Kubelet
davidor committed Aug 27, 2024
1 parent d50bae1 commit b669e24
Showing 2 changed files with 117 additions and 12 deletions.
72 changes: 64 additions & 8 deletions pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
@@ -28,6 +28,7 @@ import (
configUtils "github.com/DataDog/datadog-agent/pkg/config/utils"
kubestatemetrics "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/builder"
ksmstore "github.com/DataDog/datadog-agent/pkg/kubestatemetrics/store"
"github.com/DataDog/datadog-agent/pkg/util/flavor"
hostnameUtil "github.com/DataDog/datadog-agent/pkg/util/hostname"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
@@ -149,6 +150,26 @@ type KSMConfig struct {

// UseAPIServerCache enables the use of the API server cache for the check
UseAPIServerCache bool `yaml:"use_apiserver_cache"`

// CollectPodsFromKubelet enables the collection of pods from the kubelet
// instead of the Kubernetes API server.
//
// This is meant to be enabled when the check is running on the node agent.
// It is useful in clusters with a large number of pods, where emitting all
// pod metrics from a single instance might be too much.
//
// Note that when the node agent collects pod metrics from the kubelet and
// the cluster agent or cluster check runner collects metrics for other
// resources, label joins are not supported for pod metrics unless the join
// source is also a pod.
CollectPodsFromKubelet bool `yaml:"collect_pods_from_kubelet"`

// CollectOnlyUnassignedPods enables pod collection only for unassigned
// pods. This is meant to be enabled when the check is running on the
// cluster agent or the cluster check runner and "CollectPodsFromKubelet" is
// enabled on the node agents, because unassigned pods cannot be collected
// from node agents.
CollectOnlyUnassignedPods bool `yaml:"collect_only_unassigned_pods"`
}
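
The two options are meant to be enabled on opposite sides of the deployment. Below is a minimal in-package sketch of the intended pairing, written as KSMConfig literals; the collector names other than "pods" are illustrative, not defaults:

// Node agents: "pods" as the only collector, collected from the Kubelet,
// so no leader election is needed.
nodeAgentInstance := &KSMConfig{
	Collectors:             []string{"pods"},
	CollectPodsFromKubelet: true,
}

// Cluster agent (or cluster check runner): every other resource, plus the
// unassigned pods that no node agent can see.
clusterAgentInstance := &KSMConfig{
	Collectors:                []string{"deployments", "nodes", "pods"},
	CollectOnlyUnassignedPods: true,
}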

// KSMCheck wraps the config and the metric stores needed to run the check
@@ -160,6 +181,7 @@ type KSMCheck struct {
telemetry *telemetryCache
cancel context.CancelFunc
isCLCRunner bool
isRunningOnNodeAgent bool
clusterNameTagValue string
clusterNameRFC1123 string
metricNamesMapper map[string]string
@@ -337,6 +359,32 @@ func (k *KSMCheck) Configure(senderManager sender.SenderManager, integrationConf
return err
}

if k.instance.CollectPodsFromKubelet {
if k.isRunningOnNodeAgent {
// If the check is running in a node agent, we can collect pods from
// the kubelet, but only if "pods" is the only collector enabled. With
// more collectors enabled, leader election is required, and pods would
// then be collected from only one of the agents.
if len(collectors) == 1 && collectors[0] == "pods" {
builder.WithPodCollectionFromKubelet()
} else {
log.Warnf("collect_pods_from_kubelet is enabled but it's only supported when the only collector enabled is pods, " +
"so the check will collect pods from the API server instead of the Kubelet")
}
} else {
log.Warnf("collect_pods_from_kubelet is enabled but KSM is running in the cluster agent or cluster check runner, " +
"so the check will collect pods from the API server instead of the Kubelet")
}
}

if k.instance.CollectOnlyUnassignedPods {
if k.isRunningOnNodeAgent {
log.Warnf("collect_only_unassigned_pods is enabled but KSM is running in a node agent, so the option will be ignored")
} else {
builder.WithUnassignedPodsCollection()
}
}

// Start the collection process
k.allStores = builder.BuildStores()

@@ -478,9 +526,16 @@ func (k *KSMCheck) Run() error {
// Note that by design, some metrics cannot have hostnames (e.g. kubernetes_state.pod.unschedulable)
sender.DisableDefaultHostname(true)

// If KSM is running in the node agent and is configured to collect only
// pods, from the Kubelet, we don't need to run leader election, because
// each node agent is responsible for collecting its own pods.
podsFromKubeletInNodeAgent := k.isRunningOnNodeAgent &&
k.instance.CollectPodsFromKubelet &&
len(k.instance.Collectors) == 1 && k.instance.Collectors[0] == "pods"

// If the check is configured as a cluster check, the cluster check worker needs to skip the leader election section.
// We also do a safety check for dedicated runners to avoid attempting the leader election.
if !k.isCLCRunner || !k.instance.LeaderSkip {
if (!k.isCLCRunner || !k.instance.LeaderSkip) && !podsFromKubeletInNodeAgent {
// Only run if Leader Election is enabled.
if !ddconfig.Datadog().GetBool("leader_election") {
return log.Error("Leader Election not enabled. The cluster-agent will not run the kube-state-metrics core check.")
@@ -868,13 +923,14 @@ func KubeStateMetricsFactoryWithParam(labelsMapper map[string]string, labelJoins

func newKSMCheck(base core.CheckBase, instance *KSMConfig) *KSMCheck {
return &KSMCheck{
CheckBase: base,
instance: instance,
telemetry: newTelemetryCache(),
isCLCRunner: ddconfig.IsCLCRunner(),
metricNamesMapper: defaultMetricNamesMapper(),
metricAggregators: defaultMetricAggregators(),
metricTransformers: defaultMetricTransformers(),
CheckBase: base,
instance: instance,
telemetry: newTelemetryCache(),
isCLCRunner: ddconfig.IsCLCRunner(),
isRunningOnNodeAgent: flavor.GetFlavor() != flavor.ClusterAgent && !ddconfig.IsCLCRunner(),
metricNamesMapper: defaultMetricNamesMapper(),
metricAggregators: defaultMetricAggregators(),
metricTransformers: defaultMetricTransformers(),

// metadata metrics are useful for label joins
// but shouldn't be submitted to Datadog
57 changes: 53 additions & 4 deletions pkg/kubestatemetrics/builder/builder.go
@@ -47,6 +47,9 @@ type Builder struct {
metrics *watch.ListWatchMetrics

resync time.Duration

collectPodsFromKubelet bool
collectOnlyUnassignedPods bool
}

// New returns new Builder instance
@@ -135,6 +138,20 @@ func (b *Builder) WithAllowAnnotations(l map[string][]string) {
b.ksmBuilder.WithAllowAnnotations(l)
}

// WithPodCollectionFromKubelet configures the builder to collect pods from the
// Kubelet instead of the API server. This has no effect if pod collection is
// disabled.
func (b *Builder) WithPodCollectionFromKubelet() {
b.collectPodsFromKubelet = true
}

// WithUnassignedPodsCollection configures the builder to only collect pods that
// are not assigned to any node. This has no effect if pod collection is
// disabled.
func (b *Builder) WithUnassignedPodsCollection() {
b.collectOnlyUnassignedPods = true
}
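
For reference, a hedged sketch of how a caller wires these two options, mirroring the Configure logic in kubernetes_state.go above; onNodeAgent and onlyPodsCollector are hypothetical flags standing in for the flavor and collector checks:

// b is a *Builder already configured with namespaces, clients, etc.
if onNodeAgent && onlyPodsCollector {
	// Each node agent watches its own Kubelet; no leader election needed.
	b.WithPodCollectionFromKubelet()
} else if !onNodeAgent {
	// The cluster agent picks up the unassigned pods the Kubelet path misses.
	b.WithUnassignedPodsCollection()
}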

// Build initializes and registers all enabled stores.
// Returns metric writers.
func (b *Builder) Build() metricsstore.MetricsWriterList {
@@ -172,17 +189,32 @@ func GenerateStores[T any](

if b.namespaces.IsAllNamespaces() {
store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)

switch expectedType.(type) {
// Pods are handled differently because, depending on the configuration,
// they're collected from either the API server or the Kubelet.
case *corev1.Pod:
handlePodCollection(b, store, client, listWatchFunc, corev1.NamespaceAll, useAPIServerCache)
default:
listWatcher := listWatchFunc(client, corev1.NamespaceAll, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
}
return []cache.Store{store}

}

stores := make([]cache.Store, 0, len(b.namespaces))
for _, ns := range b.namespaces {
store := store.NewMetricsStore(composedMetricGenFuncs, reflect.TypeOf(expectedType).String())
listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
switch expectedType.(type) {
// Pods are handled differently because, depending on the configuration,
// they're collected from either the API server or the Kubelet.
case *corev1.Pod:
handlePodCollection(b, store, client, listWatchFunc, ns, useAPIServerCache)
default:
listWatcher := listWatchFunc(client, ns, b.fieldSelectorFilter)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
}
stores = append(stores, store)
}

@@ -267,3 +299,20 @@ func (c *cacheEnabledListerWatcher) List(options v1.ListOptions) (runtime.Object

return res, err
}

func handlePodCollection[T any](b *Builder, store cache.Store, client T, listWatchFunc func(kubeClient T, ns string, fieldSelector string) cache.ListerWatcher, namespace string, useAPIServerCache bool) {
if b.collectPodsFromKubelet {
b.startKubeletPodWatcher(store, namespace)
return
}

fieldSelector := b.fieldSelectorFilter
if b.collectOnlyUnassignedPods {
// spec.nodeName is empty for unassigned pods. Note that this overrides
// b.fieldSelectorFilter, which is not expected to be set for pod stores.
fieldSelector = "spec.nodeName="
}

listWatcher := listWatchFunc(client, namespace, fieldSelector)
b.startReflector(&corev1.Pod{}, store, listWatcher, useAPIServerCache)
}
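
To make the field selector concrete: "spec.nodeName=" asks the API server for pods whose spec.nodeName is still empty, i.e. pods not yet scheduled to any node. A self-contained client-go sketch, where listUnassignedPods is a hypothetical helper and not part of this change:

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listUnassignedPods lists pods across all namespaces whose spec.nodeName is
// empty, using the same field selector handlePodCollection builds above.
func listUnassignedPods(ctx context.Context, client kubernetes.Interface) error {
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: "spec.nodeName=",
	})
	if err != nil {
		return err
	}
	for i := range pods.Items {
		p := &pods.Items[i]
		fmt.Printf("unassigned: %s/%s\n", p.Namespace, p.Name)
	}
	return nil
}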
