Performance improvements of External Metrics controller and allow multiple workers (DataDog#31671)
vboulineau authored Dec 4, 2024
1 parent b2fc123 commit 5393674
Showing 12 changed files with 137 additions and 99 deletions.
4 changes: 2 additions & 2 deletions cmd/secrethelper/secret_helper.go
@@ -52,7 +52,7 @@ const (
)

// NewKubeClient returns a new kubernetes.Interface
type NewKubeClient func(timeout time.Duration) (kubernetes.Interface, error)
type NewKubeClient func(timeout time.Duration, qps float32, burst int) (kubernetes.Interface, error)

// cliParams are the command-line arguments for this subcommand
type cliParams struct {
@@ -175,7 +175,7 @@ func readSecretsUsingPrefixes(secretsList []string, rootPath string, newKubeClie
case filePrefix:
res[secretID] = providers.ReadSecretFile(id)
case k8sSecretPrefix:
kubeClient, err := newKubeClientFunc(10 * time.Second)
kubeClient, err := newKubeClientFunc(10*time.Second, 0, 0) // Default QPS and burst to Kube client defaults using 0
if err != nil {
res[secretID] = secrets.SecretVal{Value: "", ErrorMsg: err.Error()}
} else {
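For context on the new `qps`/`burst` parameters: with client-go, leaving `QPS` and `Burst` at 0 falls back to the library defaults (QPS 5, Burst 10), which is what the call site above relies on. Below is a minimal sketch (not the agent's actual implementation) of a constructor matching the updated `NewKubeClient` signature, assuming an in-cluster `rest.Config`.

```go
package kubeclientsketch

import (
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newKubeClient is a sketch matching the updated NewKubeClient signature.
// Passing qps/burst as 0 keeps client-go's built-in defaults (QPS 5, Burst 10).
func newKubeClient(timeout time.Duration, qps float32, burst int) (kubernetes.Interface, error) {
	cfg, err := rest.InClusterConfig() // assumption: the helper runs in-cluster
	if err != nil {
		return nil, err
	}
	cfg.Timeout = timeout
	cfg.QPS = qps     // 0 → client-go default QPS
	cfg.Burst = burst // 0 → client-go default burst
	return kubernetes.NewForConfig(cfg)
}
```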
2 changes: 1 addition & 1 deletion cmd/secrethelper/secret_helper_test.go
@@ -22,7 +22,7 @@ import (
)

func TestReadSecrets(t *testing.T) {
newKubeClientFunc := func(_ time.Duration) (kubernetes.Interface, error) {
newKubeClientFunc := func(_ time.Duration, _ float32, _ int) (kubernetes.Interface, error) {
return fake.NewSimpleClientset(&v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "some_name",
@@ -13,14 +13,14 @@ import (
"time"

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/log"

datadoghq "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
@@ -33,6 +33,15 @@ const (
ddmControllerStoreID string = "ddmc"
)

type controllerOperation string

const (
createControllerOperation controllerOperation = "create"
updateControllerOperation controllerOperation = "update"
deleteControllerOperation controllerOperation = "delete"
noopControllerOperation controllerOperation = "none"
)

var (
gvrDDM = datadoghq.GroupVersion.WithResource("datadogmetrics")
metaDDM = metav1.TypeMeta{
@@ -92,30 +101,37 @@ func NewDatadogMetricController(client dynamic.Interface, informer dynamicinform
}

// Run starts the controller to handle DatadogMetrics
func (c *DatadogMetricController) Run(ctx context.Context) {
func (c *DatadogMetricController) Run(ctx context.Context, numWorkers int) {
if ctx == nil {
log.Errorf("Cannot run with a nil context")
return
}
c.context = ctx

defer c.workqueue.ShutDown()

log.Infof("Starting DatadogMetric Controller (waiting for cache sync)")
if !cache.WaitForCacheSync(ctx.Done(), c.synced) {
log.Errorf("Failed to wait for DatadogMetric caches to sync")
return
}

go wait.Until(c.worker, time.Second, ctx.Done())
for i := 0; i < numWorkers; i++ {
go c.worker(i)
}

log.Infof("Started DatadogMetric Controller (cache sync finished)")
<-ctx.Done()
log.Infof("Stopping DatadogMetric Controller")
if c.isLeader() {
c.workqueue.ShutDownWithDrain()
} else {
c.workqueue.ShutDown()
}
log.Infof("DatadogMetric Controller stopped")
}

func (c *DatadogMetricController) worker() {
for c.process() {
func (c *DatadogMetricController) worker(workerID int) {
log.Debugf("Starting DatadogMetric worker: %d", workerID)
for c.process(workerID) {
}
}
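For readers skimming the diff, here is a condensed, self-contained sketch of the pattern `Run` now follows — several workers draining one shared rate-limited workqueue, with a leader-aware shutdown. Names (`runWorkers`, `process`) are illustrative, not the controller's actual API.

```go
package workerpoolsketch

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// runWorkers starts numWorkers goroutines that all consume the same queue,
// then blocks until the context is cancelled and shuts the queue down.
func runWorkers(ctx context.Context, queue workqueue.RateLimitingInterface, numWorkers int, isLeader func() bool) {
	for i := 0; i < numWorkers; i++ {
		go func(workerID int) {
			for {
				key, shutdown := queue.Get()
				if shutdown {
					return
				}
				if err := process(workerID, key); err != nil {
					queue.AddRateLimited(key) // retry later with backoff
				} else {
					queue.Forget(key) // success: reset the item's backoff
				}
				queue.Done(key)
			}
		}(i)
	}

	<-ctx.Done()
	if isLeader() {
		queue.ShutDownWithDrain() // leader: finish what is already queued
	} else {
		queue.ShutDown() // follower: nothing to flush, stop immediately
	}
}

// process is a stand-in for per-item reconciliation.
func process(workerID int, key interface{}) error {
	fmt.Printf("worker %d handled %v\n", workerID, key)
	return nil
}
```

The call order matters: `Forget`/`AddRateLimited` only affect retry backoff, while `Done` marks the in-flight item finished so `ShutDownWithDrain` can return once the queue is empty.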

@@ -135,16 +151,25 @@ func (c *DatadogMetricController) enqueueID(id, sender string) {
}
}

func (c *DatadogMetricController) process() bool {
func (c *DatadogMetricController) process(workerID int) bool {
key, shutdown := c.workqueue.Get()
if shutdown {
log.Infof("DatadogMetric Controller: Caught stop signal in workqueue")
return false
}

// We start the timer after waiting on the queue itself to have actual processing time.
startTime := time.Now()
operation := noopControllerOperation
var err error

defer func() {
reconcileElapsed.Observe(time.Since(startTime).Seconds(), string(operation), inErrorLabelValue(err), le.JoinLeaderValue)
}()

defer c.workqueue.Done(key)

err := c.processDatadogMetric(key)
operation, err = c.processDatadogMetric(workerID, key)
if err == nil {
c.workqueue.Forget(key)
} else {
@@ -158,13 +183,13 @@ func (c *DatadogMetricController) process() bool {
return true
}

func (c *DatadogMetricController) processDatadogMetric(key interface{}) error {
func (c *DatadogMetricController) processDatadogMetric(workerID int, key interface{}) (controllerOperation, error) {
datadogMetricKey := key.(string)
log.Debugf("Processing DatadogMetric: %s", datadogMetricKey)
log.Tracef("Processing DatadogMetric: %s - worker %d", datadogMetricKey, workerID)

ns, name, err := cache.SplitMetaNamespaceKey(datadogMetricKey)
if err != nil {
return fmt.Errorf("Could not split the key: %v", err)
return noopControllerOperation, fmt.Errorf("Could not split the key: %v", err)
}

datadogMetricCached := &datadoghq.DatadogMetric{}
@@ -178,34 +203,30 @@ func (c *DatadogMetricController) processDatadogMetric(key interface{}) error {
// We ignore not found here as we may need to create a DatadogMetric later
datadogMetricCached = nil
case err != nil:
return fmt.Errorf("Unable to retrieve DatadogMetric: %w", err)
case datadogMetricCached == nil:
return fmt.Errorf("Could not parse empty DatadogMetric from local cache")
return noopControllerOperation, fmt.Errorf("Unable to retrieve DatadogMetric: %w", err)
}

// No error path, check what to do with this event
if c.isLeader() {
err = c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached)
if err != nil {
return err
}
return c.syncDatadogMetric(ns, name, datadogMetricKey, datadogMetricCached)
}

// Follower flow
if datadogMetricCached != nil {
// Feeding local cache with DatadogMetric information
c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID)
setDatadogMetricTelemetry(datadogMetricCached)
} else {
if datadogMetricCached != nil {
// Feeding local cache with DatadogMetric information
c.store.Set(datadogMetricKey, model.NewDatadogMetricInternal(datadogMetricKey, *datadogMetricCached), ddmControllerStoreID)
setDatadogMetricTelemetry(datadogMetricCached)
} else {
c.store.Delete(datadogMetricKey, ddmControllerStoreID)
unsetDatadogMetricTelemetry(ns, name)
}
c.store.Delete(datadogMetricKey, ddmControllerStoreID)
unsetDatadogMetricTelemetry(ns, name)
}

return nil
return noopControllerOperation, nil
}

// Synchronize DatadogMetric state between internal store and Kubernetes objects
// Make sure any `return` has the proper store Unlock
func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) error {
func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey string, datadogMetric *datadoghq.DatadogMetric) (controllerOperation, error) {
datadogMetricInternal := c.store.LockRead(datadogMetricKey, true)
if datadogMetricInternal == nil {
if datadogMetric != nil {
@@ -216,20 +237,20 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s
c.store.Unlock(datadogMetricKey)
}

return nil
return noopControllerOperation, nil
}

// If DatadogMetric object is not present in Kubernetes, we need to clear our store (removed by user) or create it (autogen)
if datadogMetric == nil {
if datadogMetricInternal.Autogen && !datadogMetricInternal.Deleted {
err := c.createDatadogMetric(ns, name, datadogMetricInternal)
c.store.Unlock(datadogMetricKey)
return err
return createControllerOperation, err
}

// Already deleted in Kube, cleaning internal store
c.store.UnlockDelete(datadogMetricKey, ddmControllerStoreID)
return nil
return noopControllerOperation, nil
}

// Objects exists in both places (local store and K8S), we need to sync them
@@ -241,20 +262,19 @@ func (c *DatadogMetricController) syncDatadogMetric(ns, name, datadogMetricKey s
c.store.Unlock(datadogMetricKey)
// We add a requeue in case the deleted event is lost
c.workqueue.AddAfter(datadogMetricKey, time.Duration(requeueDelaySeconds)*time.Second)
return c.deleteDatadogMetric(ns, name)
return deleteControllerOperation, c.deleteDatadogMetric(ns, name)
}

// After this `Unlock`, datadogMetricInternal cannot be modified
datadogMetricInternal.UpdateFrom(*datadogMetric)
defer c.store.UnlockSet(datadogMetricInternal.ID, *datadogMetricInternal, ddmControllerStoreID)
c.store.UnlockSet(datadogMetricKey, *datadogMetricInternal, ddmControllerStoreID)

if datadogMetricInternal.IsNewerThan(datadogMetric.Status) {
err := c.updateDatadogMetric(ns, name, datadogMetricInternal, datadogMetric)
if err != nil {
return err
}
return updateControllerOperation, err
}

return nil
return noopControllerOperation, nil
}

func (c *DatadogMetricController) createDatadogMetric(ns, name string, datadogMetricInternal *model.DatadogMetricInternal) error {
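The reworked `process` starts its timer only after `workqueue.Get` returns (so queue wait time is excluded) and records the observation in a `defer`, so every exit path is counted with the final `operation` and error values. A minimal sketch of that deferred-measurement pattern, with a hypothetical `observe` callback standing in for the `reconcileElapsed` telemetry:

```go
package timingsketch

import (
	"errors"
	"time"
)

type operation string

const (
	opNone   operation = "none"
	opUpdate operation = "update"
)

// timedReconcile measures only the processing time (the caller has already
// dequeued the item) and reports whichever operation/err the body ended with.
func timedReconcile(key string, observe func(seconds float64, op operation, failed bool)) {
	startTime := time.Now()
	op := opNone
	var err error

	defer func() {
		// The closure reads op and err at return time, not at defer time.
		observe(time.Since(startTime).Seconds(), op, err != nil)
	}()

	op, err = reconcile(key)
}

// reconcile is a stand-in for processDatadogMetric.
func reconcile(key string) (operation, error) {
	if key == "" {
		return opNone, errors.New("empty key")
	}
	return opUpdate, nil
}
```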
@@ -87,7 +87,7 @@ func (f *fixture) runControllerSync(leader bool, datadogMetricID string, expecte
defer close(stopCh)
informer.Start(stopCh)

err := controller.processDatadogMetric(datadogMetricID)
_, err := controller.processDatadogMetric(0, datadogMetricID)
assert.Equal(f.t, expectedError, err)

actions := autoscaling.FilterInformerActions(f.client.Actions(), "datadogmetrics")
@@ -13,6 +13,7 @@ import (

"github.com/DataDog/datadog-agent/pkg/clusteragent/autoscaling/externalmetrics/model"
"github.com/DataDog/datadog-agent/pkg/util/backoff"
le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/autoscalers"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
@@ -59,7 +60,7 @@ func (mr *MetricsRetriever) Run(stopCh <-chan struct{}) {
if mr.isLeader() {
startTime := time.Now()
mr.retrieveMetricsValues()
retrieverElapsed.Observe(time.Since(startTime).Seconds())
retrieverElapsed.Observe(time.Since(startTime).Seconds(), le.JoinLeaderValue)
}
case <-stopCh:
log.Infof("Stopping MetricsRetriever")
@@ -196,7 +197,6 @@ func (mr *MetricsRetriever) retrieveMetricsValuesSlice(datadogMetrics []model.Da
}

datadogMetricFromStore.UpdateTime = currentTime

mr.store.UnlockSet(datadogMetric.ID, *datadogMetricFromStore, metricRetrieverStoreID)
}
}
@@ -62,8 +62,7 @@ type metricsFixture struct {
expected []ddmWithQuery
}

//nolint:revive // TODO(CINT) Fix revive linter
func (f *metricsFixture) run(t *testing.T, testTime time.Time) {
func (f *metricsFixture) run(t *testing.T) {
t.Helper()

// Create and fill store
@@ -174,7 +173,7 @@ func TestRetrieveMetricsBasic(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
@@ -500,7 +499,7 @@ func TestRetrieveMetricsErrorCases(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
@@ -639,7 +638,7 @@ func TestRetrieveMetricsNotActive(t *testing.T) {

for i, fixture := range fixtures {
t.Run(fmt.Sprintf("#%d %s", i, fixture.desc), func(t *testing.T) {
fixture.run(t, defaultTestTime)
fixture.run(t)
})
}
}
@@ -317,7 +317,7 @@ func (d *DatadogMetricInternal) resolveQuery(query string) {
return
}
if resolvedQuery != "" {
log.Infof("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery)
log.Debugf("DatadogMetric query %q was resolved successfully, new query: %q", query, resolvedQuery)
d.resolvedQuery = &resolvedQuery
return
}
5 changes: 3 additions & 2 deletions pkg/clusteragent/autoscaling/externalmetrics/provider.go
@@ -70,6 +70,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d
autogenNamespace := common.GetResourcesNamespace()
autogenEnabled := pkgconfigsetup.Datadog().GetBool("external_metrics_provider.enable_datadogmetric_autogen")
wpaEnabled := pkgconfigsetup.Datadog().GetBool("external_metrics_provider.wpa_controller")
numWorkers := pkgconfigsetup.Datadog().GetInt("external_metrics_provider.num_workers")

provider := &datadogMetricProvider{
apiCl: apiCl,
@@ -117,7 +118,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient, d
apiCl.InformerFactory.Start(ctx.Done())

go autoscalerWatcher.Run(ctx.Done())
go controller.Run(ctx)
go controller.Run(ctx, numWorkers)

return provider, nil
}
@@ -133,7 +134,7 @@ func (p *datadogMetricProvider) GetExternalMetric(_ context.Context, namespace s
}
}

setQueryTelemtry("get", namespace, startTime, err)
setQueryTelemtry("get", startTime, err)
return res, err
}

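`external_metrics_provider.num_workers` is read once here and passed straight to `controller.Run`. As a purely illustrative guard (a hypothetical helper, not the provider's actual behavior — the agent's config defaults may already cover this), a caller could clamp a missing or non-positive setting to a single worker:

```go
package providersketch

// workerCount clamps a configured value so Run always gets at least one worker.
func workerCount(configured int) int {
	if configured <= 0 {
		return 1
	}
	return configured
}
```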