From 5393674e4f897ea646da7962549e99e38899d4d5 Mon Sep 17 00:00:00 2001 From: Vincent Boulineau <58430298+vboulineau@users.noreply.github.com> Date: Wed, 4 Dec 2024 04:29:22 -0500 Subject: [PATCH] Performance improvements of External Metrics controller and allow multiple workers (#31671) --- cmd/secrethelper/secret_helper.go | 4 +- cmd/secrethelper/secret_helper_test.go | 2 +- .../datadogmetric_controller.go | 96 +++++++++++-------- .../datadogmetric_controller_test.go | 2 +- .../externalmetrics/metrics_retriever.go | 4 +- .../externalmetrics/metrics_retriever_test.go | 9 +- .../model/datadogmetricinternal.go | 2 +- .../autoscaling/externalmetrics/provider.go | 5 +- .../autoscaling/externalmetrics/telemetry.go | 36 ++++--- pkg/config/setup/config.go | 6 +- pkg/util/kubernetes/apiserver/apiserver.go | 60 +++++++----- .../apiserver/apiserver_nocompile.go | 10 +- 12 files changed, 137 insertions(+), 99 deletions(-) diff --git a/cmd/secrethelper/secret_helper.go b/cmd/secrethelper/secret_helper.go index fa385bc3765c9..a56201009e2b3 100644 --- a/cmd/secrethelper/secret_helper.go +++ b/cmd/secrethelper/secret_helper.go @@ -52,7 +52,7 @@ const ( ) // NewKubeClient returns a new kubernetes.Interface -type NewKubeClient func(timeout time.Duration) (kubernetes.Interface, error) +type NewKubeClient func(timeout time.Duration, qps float32, burst int) (kubernetes.Interface, error) // cliParams are the command-line arguments for this subcommand type cliParams struct { @@ -175,7 +175,7 @@ func readSecretsUsingPrefixes(secretsList []string, rootPath string, newKubeClie case filePrefix: res[secretID] = providers.ReadSecretFile(id) case k8sSecretPrefix: - kubeClient, err := newKubeClientFunc(10 * time.Second) + kubeClient, err := newKubeClientFunc(10*time.Second, 0, 0) // Default QPS and burst to Kube client defaults using 0 if err != nil { res[secretID] = secrets.SecretVal{Value: "", ErrorMsg: err.Error()} } else { diff --git a/cmd/secrethelper/secret_helper_test.go 
b/cmd/secrethelper/secret_helper_test.go index c84cfb5abaa5f..3b76fa8d461cb 100644 --- a/cmd/secrethelper/secret_helper_test.go +++ b/cmd/secrethelper/secret_helper_test.go @@ -22,7 +22,7 @@ import ( ) func TestReadSecrets(t *testing.T) { - newKubeClientFunc := func(_ time.Duration) (kubernetes.Interface, error) { + newKubeClientFunc := func(_ time.Duration, _ float32, _ int) (kubernetes.Interface, error) { return fake.NewSimpleClientset(&v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "some_name", diff --git a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go index ca456d366b6aa..439a20e178a37 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller.go @@ -13,6 +13,7 @@ import ( "time" "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model" + le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" "github.com/DataDog/datadog-agent/pkg/util/log" datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" @@ -20,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" @@ -33,6 +33,15 @@ const ( ddmControllerStoreID string = "ddmc" ) +type controllerOperation string + +const ( + createControllerOperation controllerOperation = "create" + updateControllerOperation controllerOperation = "update" + deleteControllerOperation controllerOperation = "delete" + noopControllerOperation controllerOperation = "none" +) + var ( gvrDDM = datadoghq.GroupVersion.WithResource("datadogmetrics") metaDDM = metav1.TypeMeta{ @@ -92,30 +101,37 @@ func 
NewDatadogMetricController(client dynamic.Interface, informer dynamicinform } // Run starts the controller to handle DatadogMetrics -func (c *DatadogMetricController) Run(ctx context.Context) { +func (c *DatadogMetricController) Run(ctx context.Context, numWorkers int) { if ctx == nil { log.Errorf("Cannot run with a nil context") return } c.context = ctx - defer c.workqueue.ShutDown() - log.Infof("Starting DatadogMetric Controller (waiting for cache sync)") if !cache.WaitForCacheSync(ctx.Done(), c.synced) { log.Errorf("Failed to wait for DatadogMetric caches to sync") return } - go wait.Until(c.worker, time.Second, ctx.Done()) + for i := 0; i < numWorkers; i++ { + go c.worker(i) + } log.Infof("Started DatadogMetric Controller (cache sync finished)") <-ctx.Done() log.Infof("Stopping DatadogMetric Controller") + if c.isLeader() { + c.workqueue.ShutDownWithDrain() + } else { + c.workqueue.ShutDown() + } + log.Infof("DatadogMetric Controller stopped") } -func (c *DatadogMetricController) worker() { - for c.process() { +func (c *DatadogMetricController) worker(workerID int) { + log.Debugf("Starting DatadogMetric worker: %d", workerID) + for c.process(workerID) { } } @@ -135,16 +151,25 @@ func (c *DatadogMetricController) enqueueID(id, sender string) { } } -func (c *DatadogMetricController) process() bool { +func (c *DatadogMetricController) process(workerID int) bool { key, shutdown := c.workqueue.Get() if shutdown { log.Infof("DatadogMetric Controller: Caught stop signal in workqueue") return false } + // We start the timer after waiting on the queue itself to have actual processing time. 
+ startTime := time.Now() + operation := noopControllerOperation + var err error + + defer func() { + reconcileElapsed.Observe(time.Since(startTime).Seconds(), string(operation), inErrorLabelValue(err), le.JoinLeaderValue) + }() + defer c.workqueue.Done(key) - err := c.processDatadogMetric(key) + operation, err = c.processDatadogMetric(workerID, key) if err == nil { c.workqueue.Forget(key) } else { @@ -158,13 +183,13 @@ func (c *DatadogMetricController) process() bool { return true } -func (c *DatadogMetricController) processDatadogMetric(key interface{}) error { +func (c *DatadogMetricController) processDatadogMetric(workerID int, key interface{}) (controllerOperation, error) { datadogMetricKey := key.(string) - log.Debugf("Processing DatadogMetric: %s", datadogMetricKey) + log.Tracef("Processing DatadogMetric: %s - worker %d", datadogMetricKey, workerID) ns, name, err := cache.SplitMetaNamespaceKey(datadogMetricKey) if err != nil { - return fmt.Errorf("Could not split the key: %v", err) + return noopControllerOperation, fmt.Errorf("Could not split the key: %v", err) } datadogMetricCached := &datadoghq.DatadogMetric{} @@ -178,34 +203,30 @@ func (c *DatadogMetricController) processDatadogMetric(key interface{}) error { // We ignore not found here as we may need to create a DatadogMetric later datadogMetricCached = nil case err != nil: - return fmt.Errorf("Unable to retrieve DatadogMetric: %w", err) - case datadogMetricCached == nil: - return fmt.Errorf("Could not parse empty DatadogMetric from local cache") + return noopControllerOperation, fmt.Errorf("Unable to retrieve DatadogMetric: %w", err) } // No error path, check what to do with this event if c.isLeader() { - err = c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached) - if err != nil { - return err - } + return c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached) + } + + // Follower flow + if datadogMetricCached != nil { + // Feeding local cache with DatadogMetric information 
+ c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID) + setDatadogMetricTelemetry(datadogMetricCached) } else { - if datadogMetricCached != nil { - // Feeding local cache with DatadogMetric information - c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID) - setDatadogMetricTelemetry(datadogMetricCached) - } else { - c.store.Delete(datadogMetricKey, ddmControllerStoreID) - unsetDatadogMetricTelemetry(ns, name) - } + c.store.Delete(datadogMetricKey, ddmControllerStoreID) + unsetDatadogMetricTelemetry(ns, name) } - return nil + return noopControllerOperation, nil } // Synchronize DatadogMetric state between internal store and Kubernetes objects // Make sure any `return` has the proper store Unlock -func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) error { +func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) (controllerOperation, error) { datadogMetricInternal := c.store.LockRead(datadogMetricKey, true) if datadogMetricInternal == nil { if datadogMetric != nil { @@ -216,7 +237,7 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s c.store.Unlock(datadogMetricKey) } - return nil + return noopControllerOperation, nil } // If DatadogMetric object is not present in Kubernetes, we need to clear our store (removed by user) or create it (autogen) @@ -224,12 +245,12 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s if datadogMetricInternal.Autogen && !datadogMetricInternal.Deleted { err := c.createDatadogMetric(ns, name, datadogMetricInternal) c.store.Unlock(datadogMetricKey) - return err + return createControllerOperation, err } // Already deleted in Kube, cleaning internal store c.store.UnlockDelete(datadogMetricKey, 
ddmControllerStoreID) - return nil + return noopControllerOperation, nil } // Objects exists in both places (local store and K8S), we need to sync them @@ -241,20 +262,19 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s c.store.Unlock(datadogMetricKey) // We add a requeue in case the deleted event is lost c.workqueue.AddAfter(datadogMetricKey, time.Duration(requeueDelaySeconds)*time.Second) - return c.deleteDatadogMetric(ns, name) + return deleteControllerOperation, c.deleteDatadogMetric(ns, name) } + // After this `Unlock`, datadogMetricInternal cannot be modified datadogMetricInternal.UpdateFrom(*datadogMetric) - defer c.store.UnlockSet(datadogMetricInternal.ID, *datadogMetricInternal, ddmControllerStoreID) + c.store.UnlockSet(datadogMetricKey, *datadogMetricInternal, ddmControllerStoreID) if datadogMetricInternal.IsNewerThan(datadogMetric.Status) { err := c.updateDatadogMetric(ns, name, datadogMetricInternal, datadogMetric) - if err != nil { - return err - } + return updateControllerOperation, err } - return nil + return noopControllerOperation, nil } func (c *DatadogMetricController) createDatadogMetric(ns, name string, datadogMetricInternal *model.DatadogMetricInternal) error { diff --git a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller_test.go b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller_test.go index d52e4936b1f7b..97f6e385bbb40 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller_test.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/datadogmetric_controller_test.go @@ -87,7 +87,7 @@ func (f *fixture) runControllerSync(leader bool, datadogMetricID string, expecte defer close(stopCh) informer.Start(stopCh) - err := controller.processDatadogMetric(datadogMetricID) + _, err := controller.processDatadogMetric(0, datadogMetricID) assert.Equal(f.t, expectedError, err) actions := autoscaling.FilterInformerActions(f.client.Actions(), 
"datadogmetrics") diff --git a/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go b/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go index eee1fc9d0ddd9..0d63018d06326 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever.go @@ -13,6 +13,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model" "github.com/DataDog/datadog-agent/pkg/util/backoff" + le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics" "github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -59,7 +60,7 @@ func (mr *MetricsRetriever) Run(stopCh <-chan struct{}) { if mr.isLeader() { startTime := time.Now() mr.retrieveMetricsValues() - retrieverElapsed.Observe(time.Since(startTime).Seconds()) + retrieverElapsed.Observe(time.Since(startTime).Seconds(), le.JoinLeaderValue) } case <-stopCh: log.Infof("Stopping MetricsRetriever") @@ -196,7 +197,6 @@ func (mr *MetricsRetriever) retrieveMetricsValuesSlice(datadogMetrics []model.Da } datadogMetricFromStore.UpdateTime = currentTime - mr.store.UnlockSet(datadogMetric.ID, *datadogMetricFromStore, metricRetrieverStoreID) } } diff --git a/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever_test.go b/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever_test.go index 29b422e93eeec..47c8ad6175d62 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever_test.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/metrics_retriever_test.go @@ -62,8 +62,7 @@ type metricsFixture struct { expected []ddmWithQuery } -//nolint:revive // TODO(CINT) Fix revive linter -func (f *metricsFixture) run(t *testing.T, testTime time.Time) { +func (f *metricsFixture) run(t *testing.T) { t.Helper() // Create and fill store @@ -174,7 +173,7 @@ func TestRetrieveMetricsBasic(t *testing.T) { 
for i, fixture := range fixtures { t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) { - fixture.run(t, defaultTestTime) + fixture.run(t) }) } } @@ -500,7 +499,7 @@ func TestRetrieveMetricsErrorCases(t *testing.T) { for i, fixture := range fixtures { t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) { - fixture.run(t, defaultTestTime) + fixture.run(t) }) } } @@ -639,7 +638,7 @@ func TestRetrieveMetricsNotActive(t *testing.T) { for i, fixture := range fixtures { t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) { - fixture.run(t, defaultTestTime) + fixture.run(t) }) } } diff --git a/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go b/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go index 0b0bd1144c87f..ad7efdb847bf9 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/model/datadogmetricinternal.go @@ -317,7 +317,7 @@ func (d *DatadogMetricInternal) resolveQuery(query string) { return } if resolvedQuery != "" { - log.Infof("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery) + log.Debugf("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery) d.resolvedQuery = &resolvedQuery return } diff --git a/pkg/clusteragent/autoscaling/externalmetrics/provider.go b/pkg/clusteragent/autoscaling/externalmetrics/provider.go index bf07e1251413e..cb00178fe36d3 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/provider.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/provider.go @@ -70,6 +70,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d autogenNamespace := common.GetResourcesNamespace() autogenEnabled := pkgconfigsetup.Datadog().GetBool("external_metrics_provider.enable_datadogmetric_autogen") wpaEnabled := 
pkgconfigsetup.Datadog().GetBool("external_metrics_provider.wpa_controller") + numWorkers := pkgconfigsetup.Datadog().GetInt("external_metrics_provider.num_workers") provider := &datadogMetricProvider{ apiCl: apiCl, @@ -117,7 +118,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d apiCl.InformerFactory.Start(ctx.Done()) go autoscalerWatcher.Run(ctx.Done()) - go controller.Run(ctx) + go controller.Run(ctx, numWorkers) return provider, nil } @@ -133,7 +134,7 @@ func (p *datadogMetricProvider) GetExternalMetric(_ context.Context, namespace s } } - setQueryTelemtry("get", namespace, startTime, err) + setQueryTelemtry("get", startTime, err) return res, err } diff --git a/pkg/clusteragent/autoscaling/externalmetrics/telemetry.go b/pkg/clusteragent/autoscaling/externalmetrics/telemetry.go index 2b5c82d7c17c6..b5547cab2a01b 100644 --- a/pkg/clusteragent/autoscaling/externalmetrics/telemetry.go +++ b/pkg/clusteragent/autoscaling/externalmetrics/telemetry.go @@ -27,23 +27,30 @@ const ( var ( ddmTelemetryValues = []string{ddmTelemetryValid, ddmTelemetryInvalid} + // Leader metrics ddmTelemetry = telemetry.NewGaugeWithOpts("external_metrics", "datadog_metrics", []string{"namespace", "name", "valid", "active", le.JoinLeaderLabel}, "The label valid is true if the DatadogMetric CR is valid, false otherwise. 
The label active is true if DatadogMetrics CR is used, false otherwise.", telemetry.Options{NoDoubleUnderscoreSep: true}) + retrieverElapsed = telemetry.NewHistogramWithOpts("external_metrics", "retriever_elapsed", + []string{le.JoinLeaderLabel}, "Wall time spent to retrieve metrics (seconds)", + []float64{0.5, 1, 5, 10, 20, 30, 60, 120, 300}, + telemetry.Options{NoDoubleUnderscoreSep: true}) + + // All instances metrics + reconcileElapsed = telemetry.NewHistogramWithOpts("external_metrics", "reconcile_elapsed", + []string{"operation", "in_error", le.JoinLeaderLabel}, "Wall time spent to reconcile a datadogmetric object (seconds)", + []float64{0.001, 0.01, 0.05, 0.1, 0.2, 0.4, 0.8, 1}, + telemetry.Options{NoDoubleUnderscoreSep: true}) + requestsTelemetry = telemetry.NewGaugeWithOpts("external_metrics", "api_requests", - []string{"namespace", "handler", "in_error"}, "Count of API Requests received", + []string{"handler", "in_error"}, "Count of API Requests received", telemetry.Options{NoDoubleUnderscoreSep: true}) elapsedTelemetry = telemetry.NewHistogramWithOpts("external_metrics", "api_elapsed", - []string{"namespace", "handler", "in_error"}, "Wall time spent on API request (seconds)", + []string{"handler", "in_error"}, "Wall time spent on API request (seconds)", prometheus.DefBuckets, telemetry.Options{NoDoubleUnderscoreSep: true}) - - retrieverElapsed = telemetry.NewHistogramWithOpts("external_metrics", "retriever_elapsed", - []string{}, "Wall time spent to retrieve metrics (seconds)", - []float64{0.5, 1, 5, 10, 20, 30, 60, 120, 300}, - telemetry.Options{NoDoubleUnderscoreSep: true}) ) func setDatadogMetricTelemetry(ddm *datadoghq.DatadogMetric) { @@ -84,13 +91,16 @@ func isDatadogMetricConditionTrue(ddm *datadoghq.DatadogMetric, conditionType da return false } -func setQueryTelemtry(handler, namespace string, startTime time.Time, err error) { - // Handle telemtry - inErrror := "false" +func inErrorLabelValue(err error) string { if err != nil { - inErrror = 
"true" + return "true" } + return "false" +} + +func setQueryTelemtry(handler string, startTime time.Time, err error) { + inError := inErrorLabelValue(err) - requestsTelemetry.Inc(namespace, handler, inErrror) - elapsedTelemetry.Observe(time.Since(startTime).Seconds(), namespace, handler, inErrror) + requestsTelemetry.Inc(handler, inError) + elapsedTelemetry.Observe(time.Since(startTime).Seconds(), handler, inError) } diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index 3544b3753e331..e54aba5d6684e 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -257,8 +257,8 @@ func init() { if envvar == "enable" { datadog = nodetreemodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case } else if envvar == "tee" { - var viperConfig = pkgconfigmodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case - var nodetreeConfig = nodetreemodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case + viperConfig := pkgconfigmodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case + nodetreeConfig := nodetreemodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case datadog = teeconfig.NewTeeConfig(viperConfig, nodetreeConfig) } else { datadog = pkgconfigmodel.NewConfig("datadog", "DD", strings.NewReplacer(".", "_")) // nolint: forbidigo // legit use case @@ -703,6 +703,7 @@ func InitConfig(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("external_metrics_provider.local_copy_refresh_rate", 30) // value in seconds config.BindEnvAndSetDefault("external_metrics_provider.chunk_size", 35) // Maximum number of queries to batch when querying Datadog. 
config.BindEnvAndSetDefault("external_metrics_provider.split_batches_with_backoff", false) // Splits batches and runs queries with errors individually with an exponential backoff + config.BindEnvAndSetDefault("external_metrics_provider.num_workers", 2) // Number of workers spawned by controller (only when CRD is used) pkgconfigmodel.AddOverrideFunc(sanitizeExternalMetricsProviderChunkSize) // Cluster check Autodiscovery config.BindEnvAndSetDefault("cluster_checks.support_hybrid_ignore_ad_tags", false) // TODO(CINT)(Agent 7.53+) Remove this flag when hybrid ignore_ad_tags is fully deprecated @@ -1284,7 +1285,6 @@ func telemetry(config pkgconfigmodel.Setup) { config.BindEnvAndSetDefault("agent_telemetry.enabled", true) config.SetKnown("agent_telemetry.additional_endpoints.*") bindEnvAndSetLogsConfigKeys(config, "agent_telemetry.") - } func serializer(config pkgconfigmodel.Setup) { diff --git a/pkg/util/kubernetes/apiserver/apiserver.go b/pkg/util/kubernetes/apiserver/apiserver.go index de06f74f1d2eb..d96a07666fbff 100644 --- a/pkg/util/kubernetes/apiserver/apiserver.go +++ b/pkg/util/kubernetes/apiserver/apiserver.go @@ -64,6 +64,15 @@ const ( tokenTime = "tokenTimestamp" tokenKey = "tokenKey" metadataMapExpire = 2 * time.Minute + + // Default QPS and Burst values for the clients + informerClientQPSLimit = 5 + informerClientQPSBurst = 10 + standardClientQPSLimit = 10 + standardClientQPSBurst = 20 + // This is mostly required for built-in controllers in Cluster Agent (ExternalMetrics, Autoscaling), which can generate a high number of `Update` requests + controllerClientQPSLimit = 150 + controllerClientQPSBurst = 300 ) // APIClient provides authenticated access to the @@ -197,8 +206,7 @@ func WaitForAPIClient(ctx context.Context) (*APIClient, error) { } } -// GetClientConfig returns a REST client configuration -func GetClientConfig(timeout time.Duration) (*rest.Config, error) { +func getClientConfig(timeout time.Duration, qps float32, burst int) (*rest.Config, error) { 
var clientConfig *rest.Config var err error cfgPath := pkgconfigsetup.Datadog().GetString("kubernetes_kubeconfig_path") @@ -231,6 +239,8 @@ func GetClientConfig(timeout time.Duration) (*rest.Config, error) { } clientConfig.Timeout = timeout + clientConfig.QPS = qps + clientConfig.Burst = burst clientConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper { return NewCustomRoundTripper(rt, timeout) }) @@ -240,10 +250,10 @@ func GetClientConfig(timeout time.Duration) (*rest.Config, error) { // GetKubeClient returns a kubernetes API server client // You should not use this function except if you need to create a *NEW* Client. -func GetKubeClient(timeout time.Duration) (kubernetes.Interface, error) { +func GetKubeClient(timeout time.Duration, qps float32, burst int) (kubernetes.Interface, error) { // TODO: Remove custom warning logger when we remove usage of ComponentStatus rest.SetDefaultWarningHandler(CustomWarningLogger{}) - clientConfig, err := GetClientConfig(timeout) + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } @@ -251,8 +261,8 @@ func GetKubeClient(timeout time.Duration) (kubernetes.Interface, error) { return kubernetes.NewForConfig(clientConfig) } -func getKubeDynamicClient(timeout time.Duration) (dynamic.Interface, error) { - clientConfig, err := GetClientConfig(timeout) +func getKubeDynamicClient(timeout time.Duration, qps float32, burst int) (dynamic.Interface, error) { + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } @@ -260,8 +270,8 @@ func getKubeDynamicClient(timeout time.Duration) (dynamic.Interface, error) { return dynamic.NewForConfig(clientConfig) } -func getCRDClient(timeout time.Duration) (*clientset.Clientset, error) { - clientConfig, err := GetClientConfig(timeout) +func getCRDClient(timeout time.Duration, qps float32, burst int) (*clientset.Clientset, error) { + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } 
@@ -269,16 +279,16 @@ func getCRDClient(timeout time.Duration) (*clientset.Clientset, error) { return clientset.NewForConfig(clientConfig) } -func getAPISClient(timeout time.Duration) (*apiregistrationclient.ApiregistrationV1Client, error) { - clientConfig, err := GetClientConfig(timeout) +func getAPISClient(timeout time.Duration, qps float32, burst int) (*apiregistrationclient.ApiregistrationV1Client, error) { + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } return apiregistrationclient.NewForConfig(clientConfig) } -func getKubeVPAClient(timeout time.Duration) (vpa.Interface, error) { - clientConfig, err := GetClientConfig(timeout) +func getKubeVPAClient(timeout time.Duration, qps float32, burst int) (vpa.Interface, error) { + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } @@ -286,8 +296,8 @@ func getKubeVPAClient(timeout time.Duration) (vpa.Interface, error) { return vpa.NewForConfig(clientConfig) } -func getScaleClient(discoveryCl discovery.ServerResourcesInterface, restMapper meta.RESTMapper, timeout time.Duration) (scale.ScalesGetter, error) { - clientConfig, err := GetClientConfig(timeout) +func getScaleClient(discoveryCl discovery.ServerResourcesInterface, restMapper meta.RESTMapper, timeout time.Duration, qps float32, burst int) (scale.ScalesGetter, error) { + clientConfig, err := getClientConfig(timeout, qps, burst) if err != nil { return nil, err } @@ -311,13 +321,13 @@ func (c *APIClient) GetInformerWithOptions(resyncPeriod *time.Duration, options func (c *APIClient) connect() error { var err error // Clients - c.Cl, err = GetKubeClient(c.defaultClientTimeout) + c.Cl, err = GetKubeClient(c.defaultClientTimeout, standardClientQPSLimit, standardClientQPSBurst) if err != nil { log.Infof("Could not get apiserver client: %v", err) return err } - c.DynamicCl, err = getKubeDynamicClient(c.defaultClientTimeout) + c.DynamicCl, err = 
getKubeDynamicClient(c.defaultClientTimeout, controllerClientQPSLimit, controllerClientQPSBurst) if err != nil { log.Infof("Could not get apiserver dynamic client: %v", err) return err @@ -328,38 +338,38 @@ func (c *APIClient) connect() error { c.RESTMapper = restmapper.NewDeferredDiscoveryRESTMapper(cachedClient) // Scale client has specific init and dependencies - c.ScaleCl, err = getScaleClient(c.Cl.Discovery(), c.RESTMapper, c.defaultClientTimeout) + c.ScaleCl, err = getScaleClient(c.Cl.Discovery(), c.RESTMapper, c.defaultClientTimeout, controllerClientQPSLimit, controllerClientQPSBurst) if err != nil { log.Infof("Could not get scale client: %v", err) return err } // Informer clients - c.InformerCl, err = GetKubeClient(c.defaultInformerTimeout) + c.InformerCl, err = GetKubeClient(c.defaultInformerTimeout, informerClientQPSLimit, informerClientQPSBurst) if err != nil { log.Infof("Could not get apiserver client: %v", err) return err } - c.DynamicInformerCl, err = getKubeDynamicClient(c.defaultInformerTimeout) + c.DynamicInformerCl, err = getKubeDynamicClient(c.defaultInformerTimeout, informerClientQPSLimit, informerClientQPSBurst) if err != nil { log.Infof("Could not get apiserver dynamic client: %v", err) return err } - c.VPAInformerClient, err = getKubeVPAClient(c.defaultInformerTimeout) + c.VPAInformerClient, err = getKubeVPAClient(c.defaultInformerTimeout, informerClientQPSLimit, informerClientQPSBurst) if err != nil { log.Infof("Could not get apiserver vpa client: %v", err) return err } - c.CRDInformerClient, err = getCRDClient(c.defaultInformerTimeout) + c.CRDInformerClient, err = getCRDClient(c.defaultInformerTimeout, informerClientQPSLimit, informerClientQPSBurst) if err != nil { log.Infof("Could not get apiserver CRDClient client: %v", err) return err } - c.APISInformerClient, err = getAPISClient(c.defaultInformerTimeout) + c.APISInformerClient, err = getAPISClient(c.defaultInformerTimeout, informerClientQPSLimit, informerClientQPSBurst) if err != nil { 
log.Infof("Could not get apiserver APISClient client: %v", err) return err @@ -651,7 +661,7 @@ func (c *APIClient) GetARandomNodeName(ctx context.Context) (string, error) { // RESTClient returns a new REST client func (c *APIClient) RESTClient(apiPath string, groupVersion *schema.GroupVersion, negotiatedSerializer runtime.NegotiatedSerializer) (*rest.RESTClient, error) { - clientConfig, err := GetClientConfig(c.defaultClientTimeout) + clientConfig, err := getClientConfig(c.defaultClientTimeout, standardClientQPSLimit, standardClientQPSBurst) if err != nil { return nil, err } @@ -665,7 +675,7 @@ func (c *APIClient) RESTClient(apiPath string, groupVersion *schema.GroupVersion // MetadataClient returns a new kubernetes metadata client func (c *APIClient) MetadataClient() (metadata.Interface, error) { - clientConfig, err := GetClientConfig(c.defaultInformerTimeout) + clientConfig, err := getClientConfig(c.defaultInformerTimeout, standardClientQPSLimit, standardClientQPSBurst) if err != nil { return nil, err } @@ -676,7 +686,7 @@ func (c *APIClient) MetadataClient() (metadata.Interface, error) { // NewSPDYExecutor returns a new SPDY executor for the provided method and URL func (c *APIClient) NewSPDYExecutor(apiPath string, groupVersion *schema.GroupVersion, negotiatedSerializer runtime.NegotiatedSerializer, method string, url *url.URL) (remotecommand.Executor, error) { - clientConfig, err := GetClientConfig(c.defaultClientTimeout) + clientConfig, err := getClientConfig(c.defaultClientTimeout, standardClientQPSLimit, standardClientQPSBurst) if err != nil { return nil, err } diff --git a/pkg/util/kubernetes/apiserver/apiserver_nocompile.go b/pkg/util/kubernetes/apiserver/apiserver_nocompile.go index 7a246dbee1b46..5c34f2e7105e0 100644 --- a/pkg/util/kubernetes/apiserver/apiserver_nocompile.go +++ b/pkg/util/kubernetes/apiserver/apiserver_nocompile.go @@ -19,11 +19,9 @@ import ( "github.com/DataDog/datadog-agent/pkg/util/log" ) -var ( - // ErrNotCompiled is returned if 
kubernetes apiserver support is not compiled in. - // User classes should handle that case as gracefully as possible. - ErrNotCompiled = errors.New("kubernetes apiserver support not compiled in") -) +// ErrNotCompiled is returned if kubernetes apiserver support is not compiled in. +// User classes should handle that case as gracefully as possible. +var ErrNotCompiled = errors.New("kubernetes apiserver support not compiled in") // APIClient provides authenticated access to the type APIClient struct { @@ -69,6 +67,6 @@ func GetNodeLabels(_ *APIClient, _ string) (map[string]string, error) { // GetKubeClient returns a Kubernetes client. // //nolint:revive // TODO(CINT) Fix revive linter -func GetKubeClient(_ time.Duration) (kubernetes.Interface, error) { +func GetKubeClient(_ time.Duration, _ float32, _ int) (kubernetes.Interface, error) { return nil, ErrNotCompiled }