diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
index 12ee4c3410351d..6a15f0e7f268d4 100644
--- a/pkg/clusteragent/clusterchecks/dangling_config.go
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -8,29 +8,27 @@ package clusterchecks
 
 import (
-	"time"
-
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 )
 
 type danglingConfigWrapper struct {
 	config                   integration.Config
-	time                     time.Time
+	rescheduleAttempts       int
 	detectedExtendedDangling bool
 }
 
-// createConfigEntry creates a new integrationConfigEntry
-// This is used to keep track of the time a config was added to the store
-func createConfigEntry(config integration.Config) *danglingConfigWrapper {
+// createDanglingConfig creates a new danglingConfigWrapper
+// This is used to keep track of how many reschedule attempts a dangling config has been through
+func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
 	return &danglingConfigWrapper{
 		config:                   config,
-		time:                     time.Now(),
+		rescheduleAttempts:       0,
 		detectedExtendedDangling: false,
 	}
 }
 
-// isStuckScheduling returns true if the config has been in the
-// store for longer than expectedScheduleTime
-func (e *danglingConfigWrapper) isStuckScheduling(expectedScheduleTime time.Duration) bool {
-	return time.Since(e.time) > expectedScheduleTime
+// isStuckScheduling returns true if rescheduling the config has been
+// attempted more than attemptLimit times
+func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
+	return c.rescheduleAttempts > attemptLimit
 }
 
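Note on the model change above: a dangling config now ages by reschedule attempts rather than wall-clock time, so stuck detection no longer depends on how often the cleanup ticker fires. Below is a minimal, standalone sketch of the intended lifecycle (a local, illustrative copy of the type, not the real wiring; attemptLimit stands in for the new cluster_checks.extended_dangling_attempt_threshold setting):

    package main

    import "fmt"

    // Illustrative copy of danglingConfigWrapper from the patch above.
    type danglingConfigWrapper struct {
        rescheduleAttempts       int
        detectedExtendedDangling bool
    }

    // Same predicate as the patched isStuckScheduling.
    func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
        return c.rescheduleAttempts > attemptLimit
    }

    func main() {
        const attemptLimit = 3 // mirrors the new default
        c := &danglingConfigWrapper{}

        // Each cleanup pass that leaves the config dangling bumps the counter,
        // as scanExtendedDanglingConfigs does in the patch.
        for i := 0; i < attemptLimit; i++ {
            c.rescheduleAttempts++
        }
        fmt.Println(c.isStuckScheduling(attemptLimit)) // false: 3 attempts is not > 3

        c.rescheduleAttempts++
        fmt.Println(c.isStuckScheduling(attemptLimit)) // true: the 4th attempt crosses the limit
    }

The counter is only ever mutated while holding the store lock, which is what keeps the bare integer comparison in isStuckScheduling race-free.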
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 614e829c3706e3..2979f16645c172 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
 
 	response := types.StateResponse{
 		Warmup:   !d.store.active,
-		Dangling: makeConfigArrayFromEntry(d.store.danglingConfigs),
+		Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
 	}
 	for _, node := range d.store.nodes {
 		n := types.StateNodeResponse{
@@ -62,7 +62,7 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) bool {
 	// Only update if it's a new dangling config
 	if _, found := d.store.danglingConfigs[digest]; !found {
 		danglingConfigs.Inc(le.JoinLeaderValue)
-		d.store.danglingConfigs[digest] = createConfigEntry(config)
+		d.store.danglingConfigs[digest] = createDanglingConfig(config)
 	}
 	return false
 }
@@ -141,7 +141,7 @@
 func (d *dispatcher) retrieveDangling() []integration.Config {
 	d.store.RLock()
 	defer d.store.RUnlock()
 
-	configs := makeConfigArrayFromEntry(d.store.danglingConfigs)
+	configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
 	return configs
 }
@@ -151,7 +151,8 @@ func (d *dispatcher) deleteDangling(ids []string) {
 		c := d.store.danglingConfigs[id]
 		delete(d.store.danglingConfigs, id)
 		danglingConfigs.Dec(le.JoinLeaderValue)
-		if c.detectedExtendedDangling {
+		// matches detectedExtendedDangling, which the scan sets under the same threshold
+		if c.isStuckScheduling(d.extendedDanglingAttemptThreshold) {
 			extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
 		}
 	}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index aaa70f6397519f..044affefa41aff 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -26,14 +26,15 @@ import (
 
 // dispatcher holds the management logic for cluster-checks
 type dispatcher struct {
-	store                         *clusterStore
-	nodeExpirationSeconds         int64
-	extraTags                     []string
-	clcRunnersClient              clusteragent.CLCRunnerClientInterface
-	advancedDispatching           bool
-	excludedChecks                map[string]struct{}
-	excludedChecksFromDispatching map[string]struct{}
-	rebalancingPeriod             time.Duration
+	store                            *clusterStore
+	nodeExpirationSeconds            int64
+	extendedDanglingAttemptThreshold int
+	extraTags                        []string
+	clcRunnersClient                 clusteragent.CLCRunnerClientInterface
+	advancedDispatching              bool
+	excludedChecks                   map[string]struct{}
+	excludedChecksFromDispatching    map[string]struct{}
+	rebalancingPeriod                time.Duration
 }
 
 func newDispatcher(tagger tagger.Component) *dispatcher {
@@ -41,6 +42,7 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
 		store: newClusterStore(),
 	}
 	d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
+	d.extendedDanglingAttemptThreshold = pkgconfigsetup.Datadog().GetInt("cluster_checks.extended_dangling_attempt_threshold")
 
 	// Attach the cluster agent's global tags to all dispatched checks
 	// as defined in the tagger's workloadmeta collector
@@ -226,27 +228,18 @@ func (d *dispatcher) run(ctx context.Context) {
 		case <-cleanupTicker.C:
 			// Expire old nodes, orphaned configs are moved to dangling
 			d.expireNodes()
 
-			log.Error("Clean up ticker signal received")
-
 			// Re-dispatch dangling configs
 			if d.shouldDispatchDangling() {
 				danglingConfigs := d.retrieveDangling()
-
-				log.Errorf("Dangling configs to be dispatched: %d", len(danglingConfigs))
-
 				scheduledConfigIDs := d.reschedule(danglingConfigs)
-
-				log.Errorf("Dangling configs successfully rescheduled: %d", len(scheduledConfigIDs))
-
 				d.store.Lock()
 				d.deleteDangling(scheduledConfigIDs)
 				d.store.Unlock()
 			}
 
-			// pkg/clusteragent/clusterchecks/dispatcher_main.go:239
-			// Check for configs that have been dangling longer than expected
-			scanExtendedDanglingConfigs(d.store, cleanUpTimeout*2)
+			// Flag configs whose reschedule attempts exceeded the threshold
+			scanExtendedDanglingConfigs(d.store, d.extendedDanglingAttemptThreshold)
 		case <-rebalanceTicker.C:
 			if d.advancedDispatching {
 				d.rebalance(false)
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index 2876362e5000de..4e203fd9f048dc 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
 			for digest, config := range node.digestToConfig {
 				delete(d.store.digestToNode, digest)
 				log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
-				d.store.danglingConfigs[digest] = createConfigEntry(config)
+				d.store.danglingConfigs[digest] = createDanglingConfig(config)
 				danglingConfigs.Inc(le.JoinLeaderValue)
 
 				// TODO: Use partial label matching when it becomes available:
diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 02513da01f30a8..348d3456e18210 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -13,6 +13,8 @@ import (
 
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
+	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
+	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -33,10 +35,10 @@ func makeConfigArray(configMap map[string]integration.Config) []integration.Config {
 	return configSlice
 }
 
-// makeConfigArrayFromEntry flattens a map of configs into a slice. Creating a new slice
-// allows for thread-safe usage by other external, as long as the field values in
+// makeConfigArrayFromDangling flattens a map of configs into a slice. Creating a new slice
+// allows for thread-safe usage by other external callers, as long as the field values in
 // the config objects are not modified.
-func makeConfigArrayFromEntry(configMap map[string]*danglingConfigWrapper) []integration.Config {
+func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
 	configSlice := make([]integration.Config, 0, len(configMap))
 	for _, c := range configMap {
 		configSlice = append(configSlice, c.config)
@@ -83,3 +85,20 @@ func orderedKeys(m map[string]int) []string {
 	sort.Strings(keys)
 	return keys
 }
+
+// scanExtendedDanglingConfigs scans the store for dangling configs that are
+// stuck scheduling. attemptLimit is the number of reschedule attempts allowed
+// before a config is considered extended dangling.
+func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
+	store.Lock()
+	defer store.Unlock()
+
+	for _, c := range store.danglingConfigs {
+		c.rescheduleAttempts++
+		if !c.detectedExtendedDangling && c.isStuckScheduling(attemptLimit) {
+			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
+			c.detectedExtendedDangling = true
+			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
+		}
+	}
+}
diff --git a/pkg/clusteragent/clusterchecks/helpers_test.go b/pkg/clusteragent/clusterchecks/helpers_test.go
index f28cbe317b086f..dd3ce50989eec6 100644
--- a/pkg/clusteragent/clusterchecks/helpers_test.go
+++ b/pkg/clusteragent/clusterchecks/helpers_test.go
@@ -10,6 +10,9 @@ package clusterchecks
 import (
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
+	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
 )
 
@@ -151,3 +154,27 @@ func Test_calculateBusyness(t *testing.T) {
 		})
 	}
 }
+
+func Test_scanExtendedDanglingConfigs(t *testing.T) {
+	attemptLimit := 3
+	store := newClusterStore()
+	c1 := createDanglingConfig(integration.Config{
+		Name:   "config1",
+		Source: "source1",
+	})
+	store.danglingConfigs[c1.config.Digest()] = c1
+
+	for i := 0; i < attemptLimit; i++ {
+		scanExtendedDanglingConfigs(store, attemptLimit)
+	}
+
+	assert.Equal(t, attemptLimit, c1.rescheduleAttempts)
+	assert.False(t, c1.detectedExtendedDangling)
+	assert.False(t, c1.isStuckScheduling(attemptLimit))
+
+	scanExtendedDanglingConfigs(store, attemptLimit)
+
+	assert.Equal(t, attemptLimit+1, c1.rescheduleAttempts)
+	assert.True(t, c1.detectedExtendedDangling)
+	assert.True(t, c1.isStuckScheduling(attemptLimit))
+}
diff --git a/pkg/clusteragent/clusterchecks/metrics.go b/pkg/clusteragent/clusterchecks/metrics.go
index 7b70e9feaedf32..6d4cccc4e016e4 100644
--- a/pkg/clusteragent/clusterchecks/metrics.go
+++ b/pkg/clusteragent/clusterchecks/metrics.go
@@ -8,11 +8,8 @@ package clusterchecks
 
 import (
-	"time"
-
 	"github.com/DataDog/datadog-agent/pkg/telemetry"
 	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
-	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -56,18 +53,3 @@ var (
 		[]string{"node", le.JoinLeaderLabel}, "Utilization predicted by the rebalance algorithm",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
 )
-
-func scanExtendedDanglingConfigs(store *clusterStore, expectedScheduleTime time.Duration) {
-	store.Lock()
-	defer store.Unlock()
-
-	for _, c := range store.danglingConfigs {
-		if !c.detectedExtendedDangling && c.isStuckScheduling(expectedScheduleTime) {
-			log.Errorf("Stuck scheduling")
-			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
-			c.detectedExtendedDangling = true
-		} else {
-			log.Errorf("Not stuck scheduling")
-		}
-	}
-}
diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index 53ef845aa8fac8..f0106212f44d0d 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -710,6 +710,7 @@ func InitConfig(config pkgconfigmodel.Setup) {
 	config.BindEnvAndSetDefault("cluster_checks.enabled", false)
 	config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
 	config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)         // value in seconds
+	config.BindEnvAndSetDefault("cluster_checks.extended_dangling_attempt_threshold", 3) // number of reschedule attempts
 	config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
 	config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
 	config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)
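Operator note: the new threshold defaults to 3 scan cycles. Assuming the usual datadog.yaml nesting for dotted keys and the DD_-prefixed environment variable that BindEnvAndSetDefault conventionally derives (both inferred conventions, not shown in this patch), tuning it would look like:

    # datadog.yaml (Cluster Agent) -- hypothetical sample
    cluster_checks:
      extended_dangling_attempt_threshold: 5

    # or equivalently via the environment (name inferred from the key)
    DD_CLUSTER_CHECKS_EXTENDED_DANGLING_ATTEMPT_THRESHOLD=5

A higher value tolerates slower runner churn before flagging a config as extended dangling; a lower value surfaces unschedulable checks sooner.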