From ed19ca8ba2ba608d94fc8a49156fcce053508112 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Tue, 24 Dec 2024 17:05:18 +0000
Subject: [PATCH 01/10] Base: track dangling configs with time

---
 .../clusterchecks/dangling_config.go          | 36 +++++++++++++++
 .../clusterchecks/dispatcher_configs.go       | 44 +++++++++++++------
 .../clusterchecks/dispatcher_main.go          | 35 ++++++++++++---
 .../clusterchecks/dispatcher_nodes.go         |  2 +-
 .../clusterchecks/dispatcher_test.go          |  8 ++--
 pkg/clusteragent/clusterchecks/helpers.go     | 11 +++++
 pkg/clusteragent/clusterchecks/metrics.go     | 21 +++++++++
 pkg/clusteragent/clusterchecks/stores.go      |  6 +--
 8 files changed, 135 insertions(+), 28 deletions(-)
 create mode 100644 pkg/clusteragent/clusterchecks/dangling_config.go

diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
new file mode 100644
index 0000000000000..12ee4c3410351
--- /dev/null
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -0,0 +1,36 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+//go:build clusterchecks
+
+package clusterchecks
+
+import (
+	"time"
+
+	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
+)
+
+type danglingConfigWrapper struct {
+	config                   integration.Config
+	time                     time.Time
+	detectedExtendedDangling bool
+}
+
+// createConfigEntry creates a new danglingConfigWrapper
+// This is used to keep track of the time a config was added to the store
+func createConfigEntry(config integration.Config) *danglingConfigWrapper {
+	return &danglingConfigWrapper{
+		config:                   config,
+		time:                     time.Now(),
+		detectedExtendedDangling: false,
+	}
+}
+
+// isStuckScheduling returns true if the config has been in the
+// store for longer than expectedScheduleTime
+func (e *danglingConfigWrapper) isStuckScheduling(expectedScheduleTime time.Duration) bool {
+	return time.Since(e.time) > expectedScheduleTime
+}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 8d9dd4f7db0b5..614e829c3706e 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
 
 	response := types.StateResponse{
 		Warmup:   !d.store.active,
-		Dangling: makeConfigArray(d.store.danglingConfigs),
+		Dangling: makeConfigArrayFromEntry(d.store.danglingConfigs),
 	}
 	for _, node := range d.store.nodes {
 		n := types.StateNodeResponse{
@@ -41,7 +41,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
 	return response, nil
 }
 
-func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) {
+func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) bool {
 	d.store.Lock()
 	defer d.store.Unlock()
 
@@ -58,10 +58,13 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
 	}
 
 	// No target node specified: store in danglingConfigs
-	if targetNodeName == "" {
-		danglingConfigs.Inc(le.JoinLeaderValue)
-		d.store.danglingConfigs[digest] = config
-		return
+	if targetNodeName == "" || true {
+		// Only update if it's a new dangling config
+		if _, found := d.store.danglingConfigs[digest]; !found {
+			danglingConfigs.Inc(le.JoinLeaderValue)
+			d.store.danglingConfigs[digest] = createConfigEntry(config)
+		}
+		return false
 	}
 
 	currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
@@ -82,6 +85,8 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
 		currentNode.removeConfig(digest)
 		currentNode.Unlock()
 	}
+
+	return true
 }
 
 func (d *dispatcher) removeConfig(digest string) {
@@ -94,7 +99,7 @@ func (d *dispatcher) removeConfig(digest string) {
 
 	delete(d.store.digestToNode, digest)
 	delete(d.store.digestToConfig, digest)
-	delete(d.store.danglingConfigs, digest)
+	d.deleteDangling([]string{digest})
 
 	// This is a list because each instance in a config has its own check ID and
 	// all of them need to be deleted.
@@ -131,16 +136,27 @@ func (d *dispatcher) shouldDispatchDangling() bool {
 	return len(d.store.danglingConfigs) > 0 && len(d.store.nodes) > 0
 }
 
-// retrieveAndClearDangling extracts dangling configs from the store
-func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
-	d.store.Lock()
-	defer d.store.Unlock()
-	configs := makeConfigArray(d.store.danglingConfigs)
-	d.store.clearDangling()
-	danglingConfigs.Set(0, le.JoinLeaderValue)
+// retrieveDangling extracts dangling configs from the store
+func (d *dispatcher) retrieveDangling() []integration.Config {
+	d.store.RLock()
+	defer d.store.RUnlock()
+
+	configs := makeConfigArrayFromEntry(d.store.danglingConfigs)
 	return configs
 }
 
+// deleteDangling clears the dangling configs from the store
+func (d *dispatcher) deleteDangling(ids []string) {
+	for _, id := range ids {
+		c := d.store.danglingConfigs[id]
+		delete(d.store.danglingConfigs, id)
+		danglingConfigs.Dec(le.JoinLeaderValue)
+		if c.detectedExtendedDangling {
+			extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
+		}
+	}
+}
+
 // patchConfiguration transforms the configuration from AD into a config
 // ready to use by node agents. It does the following changes:
 // - empty the ADIdentifiers array, to avoid node-agents detecting them as templates
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index 6f1a894f4e354..aaa70f6397519 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -162,15 +162,19 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
 }
 
 // reschdule sends configurations to dispatching without checking or patching them as Schedule does.
-func (d *dispatcher) reschedule(configs []integration.Config) {
+func (d *dispatcher) reschedule(configs []integration.Config) []string {
+	addedConfigIDs := make([]string, 0, len(configs))
 	for _, c := range configs {
 		log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
-		d.add(c)
+		if d.add(c) {
+			addedConfigIDs = append(addedConfigIDs, c.Digest())
+		}
 	}
+	return addedConfigIDs
 }
 
 // add stores and delegates a given configuration
-func (d *dispatcher) add(config integration.Config) {
+func (d *dispatcher) add(config integration.Config) bool {
 	target := d.getNodeToScheduleCheck()
 	if target == "" {
 		// If no node is found, store it in the danglingConfigs map for retrying later.
@@ -179,7 +183,7 @@ func (d *dispatcher) add(config integration.Config) {
 		log.Infof("Dispatching configuration %s:%s to node %s", config.Name, config.Digest(), target)
 	}
 
-	d.addConfig(config, target)
+	return d.addConfig(config, target)
 }
 
 // remove deletes a given configuration
@@ -205,7 +209,8 @@ func (d *dispatcher) run(ctx context.Context) {
 	healthProbe := health.RegisterLiveness("clusterchecks-dispatch")
 	defer health.Deregister(healthProbe) //nolint:errcheck
 
-	cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)
+	cleanUpTimeout := time.Duration(d.nodeExpirationSeconds/2) * time.Second
+	cleanupTicker := time.NewTicker(cleanUpTimeout)
 	defer cleanupTicker.Stop()
 
 	rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
@@ -221,11 +226,27 @@ func (d *dispatcher) run(ctx context.Context) {
 			// Expire old nodes, orphaned configs are moved to dangling
 			d.expireNodes()
 
+			log.Error("Clean up ticker signal received")
+
 			// Re-dispatch dangling configs
 			if d.shouldDispatchDangling() {
-				danglingConfs := d.retrieveAndClearDangling()
-				d.reschedule(danglingConfs)
+				danglingConfigs := d.retrieveDangling()
+
+				log.Errorf("Dangling configs to be dispatched: %d", len(danglingConfigs))
+
+				scheduledConfigIDs := d.reschedule(danglingConfigs)
+
+				log.Errorf("Dangling configs successfully rescheduled: %d", len(scheduledConfigIDs))
+
+				d.store.Lock()
+				d.deleteDangling(scheduledConfigIDs)
+				d.store.Unlock()
 			}
+
+			// pkg/clusteragent/clusterchecks/dispatcher_main.go:239
+
+			// Check for configs that have been dangling longer than expected
+			scanExtendedDanglingConfigs(d.store, cleanUpTimeout*2)
 		case <-rebalanceTicker.C:
 			if d.advancedDispatching {
 				d.rebalance(false)
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index 2e7dd891ab912..2876362e5000d 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
 			for digest, config := range node.digestToConfig {
 				delete(d.store.digestToNode, digest)
 				log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
-				d.store.danglingConfigs[digest] = config
+				d.store.danglingConfigs[digest] = createConfigEntry(config)
 				danglingConfigs.Inc(le.JoinLeaderValue)
 
 				// TODO: Use partial label matching when it becomes available:
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_test.go b/pkg/clusteragent/clusterchecks/dispatcher_test.go
index ff348a025ed58..4aac1fd6124fe 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_test.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -348,9 +348,10 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
 	// Ensure we have 1 dangling to schedule, as new available node is registered
 	assert.True(t, dispatcher.shouldDispatchDangling())
-	configs := dispatcher.retrieveAndClearDangling()
+	configs := dispatcher.retrieveDangling()
 
 	// Assert the check is scheduled
-	dispatcher.reschedule(configs)
+	scheduledIDs := dispatcher.reschedule(configs)
+	dispatcher.deleteDangling(scheduledIDs)
 	danglingConfig, err := dispatcher.getAllConfigs()
 	assert.NoError(t, err)
 	assert.Equal(t, 1, len(danglingConfig))
@@ -423,7 +424,8 @@ func TestDanglingConfig(t *testing.T) {
 	assert.True(t, dispatcher.shouldDispatchDangling())
 
 	// get the danglings and make sure they are removed from the store
-	configs := dispatcher.retrieveAndClearDangling()
+	configs := dispatcher.retrieveDangling()
+	dispatcher.deleteDangling([]string{config.Digest()})
 	assert.Len(t, configs, 1)
 	assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
 }
diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 8ddddc7187215..02513da01f30a 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -33,6 +33,17 @@ func makeConfigArray(configMap map[string]integration.Conf
 	return configSlice
 }
 
+// makeConfigArrayFromEntry flattens a map of configs into a slice. Creating a new slice
+// allows for thread-safe usage by other external consumers, as long as the field values in
+// the config objects are not modified.
+func makeConfigArrayFromEntry(configMap map[string]*danglingConfigWrapper) []integration.Config {
+	configSlice := make([]integration.Config, 0, len(configMap))
+	for _, c := range configMap {
+		configSlice = append(configSlice, c.config)
+	}
+	return configSlice
+}
+
 func timestampNowNano() int64 {
 	return time.Now().UnixNano()
 }
diff --git a/pkg/clusteragent/clusterchecks/metrics.go b/pkg/clusteragent/clusterchecks/metrics.go
index 92540e44fabad..7b70e9feaedf3 100644
--- a/pkg/clusteragent/clusterchecks/metrics.go
+++ b/pkg/clusteragent/clusterchecks/metrics.go
@@ -8,8 +8,11 @@
 package clusterchecks
 
 import (
+	"time"
+
 	"github.com/DataDog/datadog-agent/pkg/telemetry"
 	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
+	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -19,6 +22,9 @@ var (
 	danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
 		[]string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
+	extendedDanglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_extended_dangling",
+		[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not dispatched, for extended number of scheduling attempts.",
+		telemetry.Options{NoDoubleUnderscoreSep: true})
 	dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
 		[]string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
@@ -50,3 +56,18 @@ var (
 		[]string{"node", le.JoinLeaderLabel}, "Utilization predicted by the rebalance algorithm",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
 )
+
+func scanExtendedDanglingConfigs(store *clusterStore, expectedScheduleTime time.Duration) {
+	store.Lock()
+	defer store.Unlock()
+
+	for _, c := range store.danglingConfigs {
+		if !c.detectedExtendedDangling && c.isStuckScheduling(expectedScheduleTime) {
+			log.Errorf("Stuck scheduling")
+			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
+			c.detectedExtendedDangling = true
+		} else {
+			log.Errorf("Not stuck scheduling")
+		}
+	}
+}
diff --git a/pkg/clusteragent/clusterchecks/stores.go b/pkg/clusteragent/clusterchecks/stores.go
index 285e024c0e074..da73ea7534cfa 100644
--- a/pkg/clusteragent/clusterchecks/stores.go
+++ b/pkg/clusteragent/clusterchecks/stores.go
@@ -27,7 +27,7 @@ type clusterStore struct {
 	digestToConfig   map[string]integration.Config            // All configurations to dispatch
 	digestToNode     map[string]string                        // Node running a config
 	nodes            map[string]*nodeStore                    // All nodes known to the cluster-agent
-	danglingConfigs  map[string]integration.Config            // Configs we could not dispatch to any node
+	danglingConfigs  map[string]*danglingConfigWrapper        // Configs we could not dispatch to any node
 	endpointsConfigs map[string]map[string]integration.Config // Endpoints configs to be consumed by node agents
 	idToDigest       map[checkid.ID]string                    // link check IDs to check configs
 }
@@ -44,7 +44,7 @@ func (s *clusterStore) reset() {
 	s.digestToConfig = make(map[string]integration.Config)
 	s.digestToNode = make(map[string]string)
 	s.nodes = make(map[string]*nodeStore)
-	s.danglingConfigs = make(map[string]integration.Config)
+	s.danglingConfigs = make(map[string]*danglingConfigWrapper)
 	s.endpointsConfigs = make(map[string]map[string]integration.Config)
 	s.idToDigest = make(map[checkid.ID]string)
 }
@@ -74,7 +74,7 @@ func (s *clusterStore) getOrCreateNodeStore(nodeName, clientIP string) *nodeStor
 
 // clearDangling resets the danglingConfigs map to a new empty one
 func (s *clusterStore) clearDangling() {
-	s.danglingConfigs = make(map[string]integration.Config)
+	s.danglingConfigs = make(map[string]*danglingConfigWrapper)
 }
 
 // nodeStore holds the state store for one node.

From e7b7ccbf7f7fb29f31e9bae410b36c40fa8e807d Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Tue, 24 Dec 2024 18:41:13 +0000
Subject: [PATCH 02/10] Convert to counting the number of reschedule attempts

---
 .../clusterchecks/dangling_config.go          | 20 +++++------
 .../clusterchecks/dispatcher_configs.go       |  6 ++--
 .../clusterchecks/dispatcher_main.go          | 33 +++++++------------
 .../clusterchecks/dispatcher_nodes.go         |  2 +-
 pkg/clusteragent/clusterchecks/helpers.go     | 23 +++++++++++--
 .../clusterchecks/helpers_test.go             | 27 +++++++++++++++
 pkg/clusteragent/clusterchecks/metrics.go     | 18 ----------
 pkg/config/setup/config.go                    |  1 +
 8 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
index 12ee4c3410351..8df244ea2190b 100644
--- a/pkg/clusteragent/clusterchecks/dangling_config.go
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -8,29 +8,27 @@
 package clusterchecks
 
 import (
-	"time"
-
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 )
 
 type danglingConfigWrapper struct {
 	config                   integration.Config
-	time                     time.Time
+	rescheduleAttempts       int
 	detectedExtendedDangling bool
 }
 
-// createConfigEntry creates a new danglingConfigWrapper
-// This is used to keep track of the time a config was added to the store
-func createConfigEntry(config integration.Config) *danglingConfigWrapper {
+// createDanglingConfig creates a new danglingConfigWrapper
+// This is used to keep track of the lifecycle of a dangling config
+func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
 	return &danglingConfigWrapper{
 		config:                   config,
-		time:                     time.Now(),
+		rescheduleAttempts:       0,
 		detectedExtendedDangling: false,
 	}
 }
 
-// isStuckScheduling returns true if the config has been in the
-// store for longer than expectedScheduleTime
-func (e *danglingConfigWrapper) isStuckScheduling(expectedScheduleTime time.Duration) bool {
-	return time.Since(e.time) > expectedScheduleTime
+// isStuckScheduling returns true if rescheduling the config has been
+// attempted more than attemptLimit times
+func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
+	return c.rescheduleAttempts > attemptLimit
 }
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 614e829c3706e..ed70b709b0ffd 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
 
 	response := types.StateResponse{
 		Warmup:   !d.store.active,
-		Dangling: makeConfigArrayFromEntry(d.store.danglingConfigs),
+		Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
 	}
 	for _, node := range d.store.nodes {
 		n := types.StateNodeResponse{
@@ -62,7 +62,7 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
 		// Only update if it's a new dangling config
 		if _, found := d.store.danglingConfigs[digest]; !found {
 			danglingConfigs.Inc(le.JoinLeaderValue)
-			d.store.danglingConfigs[digest] = createConfigEntry(config)
+			d.store.danglingConfigs[digest] = createDanglingConfig(config)
 		}
 		return false
 	}
@@ -141,7 +141,7 @@ func (d *dispatcher) retrieveDangling() []integration.Config {
 	d.store.RLock()
 	defer d.store.RUnlock()
 
-	configs := makeConfigArrayFromEntry(d.store.danglingConfigs)
+	configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
 	return configs
 }
 
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index aaa70f6397519..7d83d508c9132 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -26,14 +26,15 @@ import (
 
 // dispatcher holds the management logic for cluster-checks
 type dispatcher struct {
-	store                         *clusterStore
-	nodeExpirationSeconds         int64
-	extraTags                     []string
-	clcRunnersClient              clusteragent.CLCRunnerClientInterface
-	advancedDispatching           bool
-	excludedChecks                map[string]struct{}
-	excludedChecksFromDispatching map[string]struct{}
-	rebalancingPeriod             time.Duration
+	store                            *clusterStore
+	nodeExpirationSeconds            int64
+	extendedDanglingAttemptThreshold int
+	extraTags                        []string
+	clcRunnersClient                 clusteragent.CLCRunnerClientInterface
+	advancedDispatching              bool
+	excludedChecks                   map[string]struct{}
+	excludedChecksFromDispatching    map[string]struct{}
+	rebalancingPeriod                time.Duration
 }
 
 func newDispatcher(tagger tagger.Component) *dispatcher {
@@ -41,6 +42,7 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
 		store: newClusterStore(),
 	}
 	d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
+	d.extendedDanglingAttemptThreshold = pkgconfigsetup.Datadog().GetInt("cluster_checks.extended_dangling_attempt_threshold")
 
 	// Attach the cluster agent's global tags to all dispatched checks
 	// as defined in the tagger's workloadmeta collector
@@ -209,8 +211,7 @@ func (d *dispatcher) run(ctx context.Context) {
 	healthProbe := health.RegisterLiveness("clusterchecks-dispatch")
 	defer health.Deregister(healthProbe) //nolint:errcheck
 
-	cleanUpTimeout := time.Duration(d.nodeExpirationSeconds/2) * time.Second
-	cleanupTicker := time.NewTicker(cleanUpTimeout)
+	cleanupTicker := time.NewTicker(time.Duration(d.nodeExpirationSeconds/2) * time.Second)
 	defer cleanupTicker.Stop()
 
 	rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
@@ -226,27 +227,17 @@ func (d *dispatcher) run(ctx context.Context) {
 			// Expire old nodes, orphaned configs are moved to dangling
 			d.expireNodes()
 
-			log.Error("Clean up ticker signal received")
-
 			// Re-dispatch dangling configs
 			if d.shouldDispatchDangling() {
 				danglingConfigs := d.retrieveDangling()
-
-				log.Errorf("Dangling configs to be dispatched: %d", len(danglingConfigs))
-
 				scheduledConfigIDs := d.reschedule(danglingConfigs)
-
-				log.Errorf("Dangling configs successfully rescheduled: %d", len(scheduledConfigIDs))
-
 				d.store.Lock()
 				d.deleteDangling(scheduledConfigIDs)
 				d.store.Unlock()
 			}
 
-			// pkg/clusteragent/clusterchecks/dispatcher_main.go:239
-
 			// Check for configs that have been dangling longer than expected
-			scanExtendedDanglingConfigs(d.store, cleanUpTimeout*2)
+			scanExtendedDanglingConfigs(d.store, d.extendedDanglingAttemptThreshold)
 		case <-rebalanceTicker.C:
 			if d.advancedDispatching {
 				d.rebalance(false)
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index 2876362e5000d..4e203fd9f048d 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
 			for digest, config := range node.digestToConfig {
 				delete(d.store.digestToNode, digest)
 				log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
-				d.store.danglingConfigs[digest] = createConfigEntry(config)
+				d.store.danglingConfigs[digest] = createDanglingConfig(config)
 				danglingConfigs.Inc(le.JoinLeaderValue)
 
 				// TODO: Use partial label matching when it becomes available:
diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 02513da01f30a..348d3456e1821 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -13,6 +13,8 @@ import (
 
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
+	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
+	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -33,10 +35,10 @@ func makeConfigArray(configMap map[string]integration.Conf
 	return configSlice
 }
 
-// makeConfigArrayFromEntry flattens a map of configs into a slice. Creating a new slice
+// makeConfigArrayFromDangling flattens a map of configs into a slice. Creating a new slice
 // allows for thread-safe usage by other external consumers, as long as the field values in
 // the config objects are not modified.
-func makeConfigArrayFromEntry(configMap map[string]*danglingConfigWrapper) []integration.Config {
+func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
 	configSlice := make([]integration.Config, 0, len(configMap))
 	for _, c := range configMap {
 		configSlice = append(configSlice, c.config)
@@ -83,3 +85,20 @@ func orderedKeys(m map[string]int) []string {
 	sort.Strings(keys)
 	return keys
 }
+
+// scanExtendedDanglingConfigs scans the store for extended dangling configs
+// The attemptLimit is the number of times a reschedule is attempted before
+// considering a config as extended dangling.
+func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
+	store.Lock()
+	defer store.Unlock()
+
+	for _, c := range store.danglingConfigs {
+		c.rescheduleAttempts += 1
+		if !c.detectedExtendedDangling && c.isStuckScheduling(attemptLimit) {
+			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
+			c.detectedExtendedDangling = true
+			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
+		}
+	}
+}
diff --git a/pkg/clusteragent/clusterchecks/helpers_test.go b/pkg/clusteragent/clusterchecks/helpers_test.go
index f28cbe317b086..dd3ce50989eec 100644
--- a/pkg/clusteragent/clusterchecks/helpers_test.go
+++ b/pkg/clusteragent/clusterchecks/helpers_test.go
@@ -10,6 +10,9 @@ package clusterchecks
 import (
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
+	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
 )
 
@@ -151,3 +154,27 @@ func Test_calculateBusyness(t *testing.T) {
 		})
 	}
 }
+
+func Test_scanExtendedDanglingConfigs(t *testing.T) {
+	attemptLimit := 3
+	store := newClusterStore()
+	c1 := createDanglingConfig(integration.Config{
+		Name:   "config1",
+		Source: "source1",
+	})
+	store.danglingConfigs[c1.config.Digest()] = c1
+
+	for i := 0; i < attemptLimit; i++ {
+		scanExtendedDanglingConfigs(store, attemptLimit)
+	}
+
+	assert.Equal(t, attemptLimit, c1.rescheduleAttempts)
+	assert.False(t, c1.detectedExtendedDangling)
+	assert.False(t, c1.isStuckScheduling(attemptLimit))
+
+	scanExtendedDanglingConfigs(store, attemptLimit)
+
+	assert.Equal(t, attemptLimit+1, c1.rescheduleAttempts)
+	assert.True(t, c1.detectedExtendedDangling)
+	assert.True(t, c1.isStuckScheduling(attemptLimit))
+}
diff --git a/pkg/clusteragent/clusterchecks/metrics.go b/pkg/clusteragent/clusterchecks/metrics.go
index 7b70e9feaedf3..6d4cccc4e016e 100644
--- a/pkg/clusteragent/clusterchecks/metrics.go
+++ b/pkg/clusteragent/clusterchecks/metrics.go
@@ -8,11 +8,8 @@
 package clusterchecks
 
 import (
-	"time"
-
 	"github.com/DataDog/datadog-agent/pkg/telemetry"
 	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
-	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -56,18 +53,3 @@ var (
 		[]string{"node", le.JoinLeaderLabel}, "Utilization predicted by the rebalance algorithm",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
 )
-
-func scanExtendedDanglingConfigs(store *clusterStore, expectedScheduleTime time.Duration) {
-	store.Lock()
-	defer store.Unlock()
-
-	for _, c := range store.danglingConfigs {
-		if !c.detectedExtendedDangling && c.isStuckScheduling(expectedScheduleTime) {
-			log.Errorf("Stuck scheduling")
-			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
-			c.detectedExtendedDangling = true
-		} else {
-			log.Errorf("Not stuck scheduling")
-		}
-	}
-}
diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index 53ef845aa8fac..f0106212f44d0 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -710,6 +710,7 @@ func InitConfig(config pkgconfigmodel.Setup) {
 	config.BindEnvAndSetDefault("cluster_checks.enabled", false)
 	config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
 	config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)         // value in seconds
+	config.BindEnvAndSetDefault("cluster_checks.extended_dangling_attempt_threshold", 3)
 	config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
 	config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
 	config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)

From 84eebd1dbfb39ab2b87376fc2155a00bee6f1094 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 14:43:52 +0000
Subject: [PATCH 03/10] Include count of extended dangling configs in agent
 status

---
 pkg/clusteragent/clusterchecks/stats.go       | 19 +++++++++++++------
 pkg/clusteragent/clusterchecks/types/types.go | 15 ++++++++-------
 2 files changed, 21 insertions(+), 13 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/stats.go b/pkg/clusteragent/clusterchecks/stats.go
index 52d57243f60ca..3a5de893187f4 100644
--- a/pkg/clusteragent/clusterchecks/stats.go
+++ b/pkg/clusteragent/clusterchecks/stats.go
@@ -60,12 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
 	for _, m := range d.store.digestToConfig {
 		checkNames[m.Name] = struct{}{}
 	}
+	extendedDanglingConfigs := 0
+	for _, c := range d.store.danglingConfigs {
+		if c.detectedExtendedDangling {
+			extendedDanglingConfigs++
+		}
+	}
 	return &types.Stats{
-		Active:          d.store.active,
-		NodeCount:       len(d.store.nodes),
-		ActiveConfigs:   len(d.store.digestToNode),
-		DanglingConfigs: len(d.store.danglingConfigs),
-		TotalConfigs:    len(d.store.digestToConfig),
-		CheckNames:      checkNames,
+		Active:                  d.store.active,
+		NodeCount:               len(d.store.nodes),
+		ActiveConfigs:           len(d.store.digestToNode),
+		DanglingConfigs:         len(d.store.danglingConfigs),
+		ExtendedDanglingConfigs: extendedDanglingConfigs,
+		TotalConfigs:            len(d.store.digestToConfig),
+		CheckNames:              checkNames,
 	}
 }
diff --git a/pkg/clusteragent/clusterchecks/types/types.go b/pkg/clusteragent/clusterchecks/types/types.go
index 352709a556e41..85d7991210b44 100644
--- a/pkg/clusteragent/clusterchecks/types/types.go
+++ b/pkg/clusteragent/clusterchecks/types/types.go
@@ -74,13 +74,14 @@ type Stats struct {
 	LeaderIP string
 
 	// Leading
-	Leader          bool
-	Active          bool
-	NodeCount       int
-	ActiveConfigs   int
-	DanglingConfigs int
-	TotalConfigs    int
-	CheckNames      map[string]struct{}
+	Leader                  bool
+	Active                  bool
+	NodeCount               int
+	ActiveConfigs           int
+	DanglingConfigs         int
+	ExtendedDanglingConfigs int
+	TotalConfigs            int
+	CheckNames              map[string]struct{}
 }
 
 // LeaderIPCallback describes the leader-election method we

From 3c6c2f323cf21de8b185cec3fb3d2d3663050055 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 15:20:40 +0000
Subject: [PATCH 04/10] Safe dangling deletes

---
 pkg/clusteragent/clusterchecks/dispatcher_configs.go | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index ed70b709b0ffd..8a523902160bf 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -148,11 +148,12 @@ func (d *dispatcher) retrieveDangling() []integration.Config {
 // deleteDangling clears the dangling configs from the store
 func (d *dispatcher) deleteDangling(ids []string) {
 	for _, id := range ids {
-		c := d.store.danglingConfigs[id]
-		delete(d.store.danglingConfigs, id)
-		danglingConfigs.Dec(le.JoinLeaderValue)
-		if c.detectedExtendedDangling {
-			extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
+		if c, found := d.store.danglingConfigs[id]; found {
+			delete(d.store.danglingConfigs, id)
+			danglingConfigs.Dec(le.JoinLeaderValue)
+			if c.detectedExtendedDangling {
+				extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
+			}
 		}
 	}
 }

From d120b9228d4fe62b7c6ce47f485c2491e39448ec Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 15:35:49 +0000
Subject: [PATCH 05/10] Disable addConfig force failure

---
 pkg/clusteragent/clusterchecks/dispatcher_configs.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 8a523902160bf..335ed75426258 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -58,7 +58,7 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
 	}
 
 	// No target node specified: store in danglingConfigs
-	if targetNodeName == "" || true {
+	if targetNodeName == "" {
 		// Only update if it's a new dangling config
 		if _, found := d.store.danglingConfigs[digest]; !found {
 			danglingConfigs.Inc(le.JoinLeaderValue)

From 7c4d85eadd9d964428569be07bfe527de0c3df85 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 15:40:25 +0000
Subject: [PATCH 06/10] Lint fix on +=1

---
 pkg/clusteragent/clusterchecks/helpers.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 348d3456e1821..9e99e578ea8f4 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -94,7 +94,7 @@ func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
 	defer store.Unlock()
 
 	for _, c := range store.danglingConfigs {
-		c.rescheduleAttempts += 1
+		c.rescheduleAttempts++
 		if !c.detectedExtendedDangling && c.isStuckScheduling(attemptLimit) {
 			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
 			c.detectedExtendedDangling = true

From eea7b33625d932638383b53733d225634efe6af5 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 17:29:51 +0000
Subject: [PATCH 07/10] Revert to checking on time: base impl

---
 .../clusterchecks/dangling_config.go          | 22 +++++++------
 .../clusterchecks/dispatcher_configs.go       |  4 +--
 .../clusterchecks/dispatcher_main.go          | 33 ++++++++++++++++---
 pkg/clusteragent/clusterchecks/helpers.go     | 19 -----------
 .../clusterchecks/helpers_test.go             | 27 ---------------
 pkg/clusteragent/clusterchecks/metrics.go     |  4 +--
 pkg/clusteragent/clusterchecks/stats.go       | 20 +++++------
 pkg/clusteragent/clusterchecks/types/types.go | 16 ++++-----
 pkg/config/setup/config.go                    |  6 ++--
 9 files changed, 66 insertions(+), 85 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
index 8df244ea2190b..54d6976401672 100644
--- a/pkg/clusteragent/clusterchecks/dangling_config.go
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -8,27 +8,29 @@
 package clusterchecks
 
 import (
+	"time"
+
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 )
 
 type danglingConfigWrapper struct {
-	config                   integration.Config
-	rescheduleAttempts       int
-	detectedExtendedDangling bool
+	config           integration.Config
+	timeCreated      time.Time
+	unscheduledCheck bool
 }
 
 // createDanglingConfig creates a new danglingConfigWrapper
 // This is used to keep track of the lifecycle of a dangling config
 func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
 	return &danglingConfigWrapper{
-		config:                   config,
-		rescheduleAttempts:       0,
-		detectedExtendedDangling: false,
+		config:           config,
+		timeCreated:      time.Now(),
+		unscheduledCheck: false,
 	}
 }
 
-// isStuckScheduling returns true if rescheduling the config has been
-// attempted more than attemptLimit times
-func (c *danglingConfigWrapper) isStuckScheduling(attemptLimit int) bool {
-	return c.rescheduleAttempts > attemptLimit
+// isStuckScheduling returns true if the config has been in the store
+// for longer than the unscheduledCheckThresholdSeconds
+func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
+	return time.Since(c.timeCreated).Seconds() > float64(unscheduledCheckThresholdSeconds)
 }
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 335ed75426258..f6769461090e5 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -151,8 +151,8 @@ func (d *dispatcher) deleteDangling(ids []string) {
 		if c, found := d.store.danglingConfigs[id]; found {
 			delete(d.store.danglingConfigs, id)
 			danglingConfigs.Dec(le.JoinLeaderValue)
-			if c.detectedExtendedDangling {
-				extendedDanglingConfigs.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
+			if c.unscheduledCheck {
+				unscheduledCheck.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
 			}
 		}
 	}
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index 7d83d508c9132..e607ca5934143 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -20,6 +20,7 @@ import (
 	"github.com/DataDog/datadog-agent/pkg/util/clusteragent"
 	"github.com/DataDog/datadog-agent/pkg/util/containers"
 	"github.com/DataDog/datadog-agent/pkg/util/hostname"
+	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
 	"github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
 	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
@@ -28,7 +29,7 @@ import (
 type dispatcher struct {
 	store                            *clusterStore
 	nodeExpirationSeconds            int64
-	extendedDanglingAttemptThreshold int
+	unscheduledCheckThresholdSeconds int64
 	extraTags                        []string
 	clcRunnersClient                 clusteragent.CLCRunnerClientInterface
 	advancedDispatching              bool
@@ -42,7 +43,12 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
 		store: newClusterStore(),
 	}
 	d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
-	d.extendedDanglingAttemptThreshold = pkgconfigsetup.Datadog().GetInt("cluster_checks.extended_dangling_attempt_threshold")
+	d.unscheduledCheckThresholdSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.unscheduled_check_threshold")
+
+	if d.unscheduledCheckThresholdSeconds < d.nodeExpirationSeconds {
+		log.Warnf("The unscheduled_check_threshold value should be larger than node_expiration_timeout, setting it to the same value")
+		d.unscheduledCheckThresholdSeconds = d.nodeExpirationSeconds
+	}
 
 	// Attach the cluster agent's global tags to all dispatched checks
 	// as defined in the tagger's workloadmeta collector
@@ -202,6 +208,22 @@ func (d *dispatcher) reset() {
 	d.store.reset()
 }
 
+// scanExtendedDanglingConfigs scans the store for extended dangling configs
+// The attemptLimit is the number of times a reschedule is attempted before
+// considering a config as extended dangling.
+func (d *dispatcher) scanUnscheduledChecks() {
+	d.store.Lock()
+	defer d.store.Unlock()
+
+	for _, c := range d.store.danglingConfigs {
+		if !c.unscheduledCheck && c.isStuckScheduling(d.unscheduledCheckThresholdSeconds) {
+			log.Warnf("Detected unscheduled check config. Name:%s, Source:%s", c.config.Name, c.config.Source)
+			c.unscheduledCheck = true
+			unscheduledCheck.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
+		}
+	}
+}
+
 // run is the main management goroutine for the dispatcher
 func (d *dispatcher) run(ctx context.Context) {
 	d.store.Lock()
@@ -217,6 +239,9 @@ func (d *dispatcher) run(ctx context.Context) {
 	rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
 	defer rebalanceTicker.Stop()
 
+	unscheduledCheckTicker := time.NewTicker(time.Duration(d.unscheduledCheckThresholdSeconds) * time.Second)
+	defer unscheduledCheckTicker.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -235,9 +260,9 @@ func (d *dispatcher) run(ctx context.Context) {
 				d.deleteDangling(scheduledConfigIDs)
 				d.store.Unlock()
 			}
-
+		case <-unscheduledCheckTicker.C:
 			// Check for configs that have been dangling longer than expected
-			scanExtendedDanglingConfigs(d.store, d.extendedDanglingAttemptThreshold)
+			d.scanUnscheduledChecks()
 		case <-rebalanceTicker.C:
 			if d.advancedDispatching {
 				d.rebalance(false)
diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 9e99e578ea8f4..fca74d44f6374 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -13,8 +13,6 @@ import (
 
 	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
-	le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
-	"github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 var (
@@ -85,20 +83,3 @@ func orderedKeys(m map[string]int) []string {
 	sort.Strings(keys)
 	return keys
 }
-
-// scanExtendedDanglingConfigs scans the store for extended dangling configs
-// The attemptLimit is the number of times a reschedule is attempted before
-// considering a config as extended dangling.
-func scanExtendedDanglingConfigs(store *clusterStore, attemptLimit int) {
-	store.Lock()
-	defer store.Unlock()
-
-	for _, c := range store.danglingConfigs {
-		c.rescheduleAttempts++
-		if !c.detectedExtendedDangling && c.isStuckScheduling(attemptLimit) {
-			log.Warnf("Detected extended dangling config. Name:%s, Source:%s", c.config.Name, c.config.Source)
-			c.detectedExtendedDangling = true
-			extendedDanglingConfigs.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
-		}
-	}
-}
diff --git a/pkg/clusteragent/clusterchecks/helpers_test.go b/pkg/clusteragent/clusterchecks/helpers_test.go
index dd3ce50989eec..f28cbe317b086 100644
--- a/pkg/clusteragent/clusterchecks/helpers_test.go
+++ b/pkg/clusteragent/clusterchecks/helpers_test.go
@@ -10,9 +10,6 @@ package clusterchecks
 import (
 	"testing"
 
-	"github.com/stretchr/testify/assert"
-
-	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
 	"github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"
 )
 
@@ -154,27 +151,3 @@ func Test_calculateBusyness(t *testing.T) {
 		})
 	}
 }
-
-func Test_scanExtendedDanglingConfigs(t *testing.T) {
-	attemptLimit := 3
-	store := newClusterStore()
-	c1 := createDanglingConfig(integration.Config{
-		Name:   "config1",
-		Source: "source1",
-	})
-	store.danglingConfigs[c1.config.Digest()] = c1
-
-	for i := 0; i < attemptLimit; i++ {
-		scanExtendedDanglingConfigs(store, attemptLimit)
-	}
-
-	assert.Equal(t, attemptLimit, c1.rescheduleAttempts)
-	assert.False(t, c1.detectedExtendedDangling)
-	assert.False(t, c1.isStuckScheduling(attemptLimit))
-
-	scanExtendedDanglingConfigs(store, attemptLimit)
-
-	assert.Equal(t, attemptLimit+1, c1.rescheduleAttempts)
-	assert.True(t, c1.detectedExtendedDangling)
-	assert.True(t, c1.isStuckScheduling(attemptLimit))
-}
diff --git a/pkg/clusteragent/clusterchecks/metrics.go b/pkg/clusteragent/clusterchecks/metrics.go
index 6d4cccc4e016e..ac44e5290257e 100644
--- a/pkg/clusteragent/clusterchecks/metrics.go
+++ b/pkg/clusteragent/clusterchecks/metrics.go
@@ -19,8 +19,8 @@ var (
 	danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
 		[]string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
-	extendedDanglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_extended_dangling",
-		[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not dispatched, for extended number of scheduling attempts.",
+	unscheduledCheck = telemetry.NewGaugeWithOpts("cluster_checks", "unscheduled_check",
+		[]string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not scheduled.",
 		telemetry.Options{NoDoubleUnderscoreSep: true})
 	dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
 		[]string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
diff --git a/pkg/clusteragent/clusterchecks/stats.go b/pkg/clusteragent/clusterchecks/stats.go
index 3a5de893187f4..a2decaaedf8e6 100644
--- a/pkg/clusteragent/clusterchecks/stats.go
+++ b/pkg/clusteragent/clusterchecks/stats.go
@@ -60,19 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
 	for _, m := range d.store.digestToConfig {
 		checkNames[m.Name] = struct{}{}
 	}
-	extendedDanglingConfigs := 0
+	unscheduledChecks := 0
 	for _, c := range d.store.danglingConfigs {
-		if c.detectedExtendedDangling {
-			extendedDanglingConfigs++
+		if c.unscheduledCheck {
+			unscheduledChecks++
 		}
 	}
 	return &types.Stats{
-		Active:                  d.store.active,
-		NodeCount:               len(d.store.nodes),
-		ActiveConfigs:           len(d.store.digestToNode),
-		DanglingConfigs:         len(d.store.danglingConfigs),
-		ExtendedDanglingConfigs: extendedDanglingConfigs,
-		TotalConfigs:            len(d.store.digestToConfig),
-		CheckNames:              checkNames,
+		Active:            d.store.active,
+		NodeCount:         len(d.store.nodes),
+		ActiveConfigs:     len(d.store.digestToNode),
+		DanglingConfigs:   len(d.store.danglingConfigs),
+		UnscheduledChecks: unscheduledChecks,
+		TotalConfigs:      len(d.store.digestToConfig),
+		CheckNames:        checkNames,
 	}
 }
diff --git a/pkg/clusteragent/clusterchecks/types/types.go b/pkg/clusteragent/clusterchecks/types/types.go
index 85d7991210b44..55782768e3cda 100644
--- a/pkg/clusteragent/clusterchecks/types/types.go
+++ b/pkg/clusteragent/clusterchecks/types/types.go
@@ -74,14 +74,14 @@ type Stats struct {
 	LeaderIP string
 
 	// Leading
-	Leader                  bool
-	Active                  bool
-	NodeCount               int
-	ActiveConfigs           int
-	DanglingConfigs         int
-	ExtendedDanglingConfigs int
-	TotalConfigs            int
-	CheckNames              map[string]struct{}
+	Leader            bool
+	Active            bool
+	NodeCount         int
+	ActiveConfigs     int
+	DanglingConfigs   int
+	UnscheduledChecks int
+	TotalConfigs      int
+	CheckNames        map[string]struct{}
 }
 
 // LeaderIPCallback describes the leader-election method we
diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index f0106212f44d0..6d8308fa3d828 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -708,9 +708,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
 	// Cluster check Autodiscovery
 	config.BindEnvAndSetDefault("cluster_checks.support_hybrid_ignore_ad_tags", false) // TODO(CINT)(Agent 7.53+) Remove this flag when hybrid ignore_ad_tags is fully deprecated
 	config.BindEnvAndSetDefault("cluster_checks.enabled", false)
-	config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
-	config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)         // value in seconds
-	config.BindEnvAndSetDefault("cluster_checks.extended_dangling_attempt_threshold", 3)
+	config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30)     // value in seconds
+	config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)             // value in seconds
+	config.BindEnvAndSetDefault("cluster_checks.unscheduled_check_threshold", 60) // value in seconds
 	config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
 	config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
 	config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)

From 785f509b1cb5cf60dae277c70133683043b016ec Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 21:06:39 +0000
Subject: [PATCH 08/10] Leverage more time methods for comparing expected
 schedule time

---
 pkg/clusteragent/clusterchecks/dangling_config.go | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
index 54d6976401672..a52289bc672f3 100644
--- a/pkg/clusteragent/clusterchecks/dangling_config.go
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -32,5 +32,6 @@ func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
 // isStuckScheduling returns true if the config has been in the store
 // for longer than the unscheduledCheckThresholdSeconds
 func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
-	return time.Since(c.timeCreated).Seconds() > float64(unscheduledCheckThresholdSeconds)
+	expectCheckIsScheduledTime := c.timeCreated.Add(time.Duration(unscheduledCheckThresholdSeconds) * time.Second)
+	return time.Now().After(expectCheckIsScheduledTime)
 }

From 3a1c7afed9ce61f2c553bbf8a68171b4eb3825a1 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Thu, 26 Dec 2024 21:24:05 +0000
Subject: [PATCH 09/10] Update outdated method comment

---
 pkg/clusteragent/clusterchecks/dispatcher_main.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index e607ca5934143..1e82de9a4fc03 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -208,9 +208,8 @@ func (d *dispatcher) reset() {
 	d.store.reset()
 }
 
-// scanExtendedDanglingConfigs scans the store for extended dangling configs
-// The attemptLimit is the number of times a reschedule is attempted before
-// considering a config as extended dangling.
+// scanUnscheduledChecks scans the store for configs that have been
+// unscheduled for longer than the unscheduledCheckThresholdSeconds
 func (d *dispatcher) scanUnscheduledChecks() {
 	d.store.Lock()
 	defer d.store.Unlock()

From ddcdf785c51e9b59f1430c3e6bba514b6f36e894 Mon Sep 17 00:00:00 2001
From: Gabriel Dos Santos
Date: Fri, 27 Dec 2024 15:47:29 +0000
Subject: [PATCH 10/10] Unit test for unscheduled check

---
 pkg/clusteragent/clusterchecks/dispatcher_test.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pkg/clusteragent/clusterchecks/dispatcher_test.go b/pkg/clusteragent/clusterchecks/dispatcher_test.go
index 4aac1fd6124fe..f00141c8ea063 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_test.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -10,6 +10,7 @@ package clusterchecks
 import (
 	"sort"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -402,6 +403,9 @@ func TestDispatchFourConfigsTwoNodes(t *testing.T) {
 }
 
 func TestDanglingConfig(t *testing.T) {
+	mockConfig := configmock.New(t)
+	mockConfig.SetWithoutSource("cluster_checks.unscheduled_check_threshold", 1)
+	mockConfig.SetWithoutSource("cluster_checks.node_expiration_timeout", 1)
 	fakeTagger := mock.SetupFakeTagger(t)
 	dispatcher := newDispatcher(fakeTagger)
 	config := integration.Config{
@@ -419,6 +423,13 @@ func TestDanglingConfig(t *testing.T) {
 	assert.True(t, dispatcher.shouldDispatchDangling())
 
 	// shouldDispatchDangling is still false because no node is available
 	assert.False(t, dispatcher.shouldDispatchDangling())
 
+	// force config to dangle long enough to be classified as unscheduled check
+	assert.False(t, dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck)
+	require.Eventually(t, func() bool {
+		dispatcher.scanUnscheduledChecks()
+		return dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck
+	}, 2*time.Second, 250*time.Millisecond)
+
 	// register a node, shouldDispatchDangling will become true
 	dispatcher.processNodeStatus("nodeA", "10.0.0.1", types.NodeStatus{})
 	assert.True(t, dispatcher.shouldDispatchDangling())