diff --git a/pkg/clusteragent/clusterchecks/dangling_config.go b/pkg/clusteragent/clusterchecks/dangling_config.go
new file mode 100644
index 0000000000000..a52289bc672f3
--- /dev/null
+++ b/pkg/clusteragent/clusterchecks/dangling_config.go
@@ -0,0 +1,37 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+//go:build clusterchecks
+
+package clusterchecks
+
+import (
+    "time"
+
+    "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
+)
+
+type danglingConfigWrapper struct {
+    config           integration.Config
+    timeCreated      time.Time
+    unscheduledCheck bool
+}
+
+// createDanglingConfig creates a new danglingConfigWrapper
+// This is used to keep track of the lifecycle of a dangling config
+func createDanglingConfig(config integration.Config) *danglingConfigWrapper {
+    return &danglingConfigWrapper{
+        config:           config,
+        timeCreated:      time.Now(),
+        unscheduledCheck: false,
+    }
+}
+
+// isStuckScheduling returns true if the config has been in the store
+// for longer than the unscheduledCheckThresholdSeconds
+func (c *danglingConfigWrapper) isStuckScheduling(unscheduledCheckThresholdSeconds int64) bool {
+    expectCheckIsScheduledTime := c.timeCreated.Add(time.Duration(unscheduledCheckThresholdSeconds) * time.Second)
+    return time.Now().After(expectCheckIsScheduledTime)
+}
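The wrapper above stores only a creation timestamp; whether a config is "stuck" is recomputed on demand by comparing that timestamp against the configured threshold. A minimal, self-contained sketch of the same check, using a simplified stand-in type rather than the agent's actual code:

package main

import (
    "fmt"
    "time"
)

// wrapper is a toy stand-in for danglingConfigWrapper.
type wrapper struct {
    timeCreated time.Time
}

// isStuckScheduling mirrors the logic above: a config counts as stuck
// once the current time is past timeCreated + threshold.
func (w wrapper) isStuckScheduling(thresholdSeconds int64) bool {
    deadline := w.timeCreated.Add(time.Duration(thresholdSeconds) * time.Second)
    return time.Now().After(deadline)
}

func main() {
    fresh := wrapper{timeCreated: time.Now()}
    stuck := wrapper{timeCreated: time.Now().Add(-2 * time.Minute)}
    fmt.Println(fresh.isStuckScheduling(60)) // false: created just now
    fmt.Println(stuck.isStuckScheduling(60)) // true: dangling for 2 minutes, past the 60s threshold
}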
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_configs.go b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
index 8d9dd4f7db0b5..f6769461090e5 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_configs.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_configs.go
@@ -28,7 +28,7 @@ func (d *dispatcher) getState() (types.StateResponse, error) {
 
     response := types.StateResponse{
         Warmup:   !d.store.active,
-        Dangling: makeConfigArray(d.store.danglingConfigs),
+        Dangling: makeConfigArrayFromDangling(d.store.danglingConfigs),
     }
     for _, node := range d.store.nodes {
         n := types.StateNodeResponse{
@@ -41,7 +41,7 @@
     return response, nil
 }
 
-func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) {
+func (d *dispatcher) addConfig(config integration.Config, targetNodeName string) bool {
     d.store.Lock()
     defer d.store.Unlock()
 
@@ -59,9 +59,12 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
 
     // No target node specified: store in danglingConfigs
     if targetNodeName == "" {
-        danglingConfigs.Inc(le.JoinLeaderValue)
-        d.store.danglingConfigs[digest] = config
-        return
+        // Only update if it's a new dangling config
+        if _, found := d.store.danglingConfigs[digest]; !found {
+            danglingConfigs.Inc(le.JoinLeaderValue)
+            d.store.danglingConfigs[digest] = createDanglingConfig(config)
+        }
+        return false
     }
 
     currentNode, foundCurrent := d.store.getNodeStore(d.store.digestToNode[digest])
@@ -82,6 +85,8 @@ func (d *dispatcher) addConfig(config integration.Config, targetNodeName string)
         currentNode.removeConfig(digest)
         currentNode.Unlock()
     }
+
+    return true
 }
 
 func (d *dispatcher) removeConfig(digest string) {
@@ -94,7 +99,7 @@ func (d *dispatcher) removeConfig(digest string) {
 
     delete(d.store.digestToNode, digest)
     delete(d.store.digestToConfig, digest)
-    delete(d.store.danglingConfigs, digest)
+    d.deleteDangling([]string{digest})
 
     // This is a list because each instance in a config has its own check ID and
     // all of them need to be deleted.
@@ -131,16 +136,28 @@ func (d *dispatcher) shouldDispatchDangling() bool {
     return len(d.store.danglingConfigs) > 0 && len(d.store.nodes) > 0
 }
 
-// retrieveAndClearDangling extracts dangling configs from the store
-func (d *dispatcher) retrieveAndClearDangling() []integration.Config {
-    d.store.Lock()
-    defer d.store.Unlock()
-    configs := makeConfigArray(d.store.danglingConfigs)
-    d.store.clearDangling()
-    danglingConfigs.Set(0, le.JoinLeaderValue)
+// retrieveDangling extracts dangling configs from the store
+func (d *dispatcher) retrieveDangling() []integration.Config {
+    d.store.RLock()
+    defer d.store.RUnlock()
+
+    configs := makeConfigArrayFromDangling(d.store.danglingConfigs)
     return configs
 }
 
+// deleteDangling removes the given dangling configs from the store; the caller must hold the store lock
+func (d *dispatcher) deleteDangling(ids []string) {
+    for _, id := range ids {
+        if c, found := d.store.danglingConfigs[id]; found {
+            delete(d.store.danglingConfigs, id)
+            danglingConfigs.Dec(le.JoinLeaderValue)
+            if c.unscheduledCheck {
+                unscheduledCheck.Dec(le.JoinLeaderValue, c.config.Name, c.config.Source)
+            }
+        }
+    }
+}
+
 // patchConfiguration transforms the configuration from AD into a config
 // ready to use by node agents. It does the following changes:
 //   - empty the ADIdentifiers array, to avoid node-agents detecting them as templates
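Replacing retrieveAndClearDangling with the retrieveDangling/deleteDangling pair changes the failure mode: a dangling config now leaves the store only after reschedule reports it was actually placed, instead of being cleared up front and lost if placement fails. A self-contained sketch of this two-phase pattern, with toy types standing in for the dispatcher and store:

package main

import "fmt"

// store is a toy stand-in for clusterStore: digest -> check name.
type store struct{ dangling map[string]string }

// rescheduleAll mimics reschedule: it returns only the digests that were
// actually placed, so the caller deletes just those from the store.
func rescheduleAll(s *store, place func(digest string) bool) []string {
    scheduled := []string{}
    for digest := range s.dangling {
        if place(digest) {
            scheduled = append(scheduled, digest)
        }
    }
    return scheduled
}

func main() {
    s := &store{dangling: map[string]string{"a": "check-a", "b": "check-b"}}
    // Pretend only "a" fits on a node this round.
    ids := rescheduleAll(s, func(digest string) bool { return digest == "a" })
    for _, id := range ids {
        delete(s.dangling, id) // the deleteDangling step
    }
    fmt.Println(len(s.dangling)) // prints 1: "b" stays dangling instead of being silently cleared
}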
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_main.go b/pkg/clusteragent/clusterchecks/dispatcher_main.go
index 6f1a894f4e354..1e82de9a4fc03 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_main.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_main.go
@@ -20,20 +20,22 @@ import (
     "github.com/DataDog/datadog-agent/pkg/util/clusteragent"
     "github.com/DataDog/datadog-agent/pkg/util/containers"
     "github.com/DataDog/datadog-agent/pkg/util/hostname"
+    le "github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection/metrics"
     "github.com/DataDog/datadog-agent/pkg/util/kubernetes/clustername"
     "github.com/DataDog/datadog-agent/pkg/util/log"
 )
 
 // dispatcher holds the management logic for cluster-checks
 type dispatcher struct {
-    store                         *clusterStore
-    nodeExpirationSeconds         int64
-    extraTags                     []string
-    clcRunnersClient              clusteragent.CLCRunnerClientInterface
-    advancedDispatching           bool
-    excludedChecks                map[string]struct{}
-    excludedChecksFromDispatching map[string]struct{}
-    rebalancingPeriod             time.Duration
+    store                            *clusterStore
+    nodeExpirationSeconds            int64
+    unscheduledCheckThresholdSeconds int64
+    extraTags                        []string
+    clcRunnersClient                 clusteragent.CLCRunnerClientInterface
+    advancedDispatching              bool
+    excludedChecks                   map[string]struct{}
+    excludedChecksFromDispatching    map[string]struct{}
+    rebalancingPeriod                time.Duration
 }
 
 func newDispatcher(tagger tagger.Component) *dispatcher {
@@ -41,6 +43,12 @@ func newDispatcher(tagger tagger.Component) *dispatcher {
         store: newClusterStore(),
     }
     d.nodeExpirationSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.node_expiration_timeout")
+    d.unscheduledCheckThresholdSeconds = pkgconfigsetup.Datadog().GetInt64("cluster_checks.unscheduled_check_threshold")
+
+    if d.unscheduledCheckThresholdSeconds < d.nodeExpirationSeconds {
+        log.Warnf("The unscheduled_check_threshold value should be larger than node_expiration_timeout, setting it to the same value")
+        d.unscheduledCheckThresholdSeconds = d.nodeExpirationSeconds
+    }
 
     // Attach the cluster agent's global tags to all dispatched checks
     // as defined in the tagger's workloadmeta collector
@@ -162,15 +170,19 @@ func (d *dispatcher) Unschedule(configs []integration.Config) {
 }
 
 // reschedule sends configurations to dispatching without checking or patching them as Schedule does.
-func (d *dispatcher) reschedule(configs []integration.Config) {
+func (d *dispatcher) reschedule(configs []integration.Config) []string {
+    addedConfigIDs := make([]string, 0, len(configs))
     for _, c := range configs {
         log.Debugf("Rescheduling the check %s:%s", c.Name, c.Digest())
-        d.add(c)
+        if d.add(c) {
+            addedConfigIDs = append(addedConfigIDs, c.Digest())
+        }
     }
+    return addedConfigIDs
 }
 
 // add stores and delegates a given configuration
-func (d *dispatcher) add(config integration.Config) {
+func (d *dispatcher) add(config integration.Config) bool {
     target := d.getNodeToScheduleCheck()
     if target == "" {
         // If no node is found, store it in the danglingConfigs map for retrying later.
@@ -179,7 +191,7 @@ func (d *dispatcher) add(config integration.Config) {
         log.Infof("Dispatching configuration %s:%s to node %s", config.Name, config.Digest(), target)
     }
 
-    d.addConfig(config, target)
+    return d.addConfig(config, target)
 }
 
 // remove deletes a given configuration
@@ -196,6 +208,21 @@ func (d *dispatcher) reset() {
     d.store.reset()
 }
 
+// scanUnscheduledChecks scans the store for configs that have been
+// unscheduled for longer than the unscheduledCheckThresholdSeconds
+func (d *dispatcher) scanUnscheduledChecks() {
+    d.store.Lock()
+    defer d.store.Unlock()
+
+    for _, c := range d.store.danglingConfigs {
+        if !c.unscheduledCheck && c.isStuckScheduling(d.unscheduledCheckThresholdSeconds) {
+            log.Warnf("Detected unscheduled check config. Name:%s, Source:%s", c.config.Name, c.config.Source)
+            c.unscheduledCheck = true
+            unscheduledCheck.Inc(le.JoinLeaderValue, c.config.Name, c.config.Source)
+        }
+    }
+}
+
 // run is the main management goroutine for the dispatcher
 func (d *dispatcher) run(ctx context.Context) {
     d.store.Lock()
@@ -211,6 +238,9 @@ func (d *dispatcher) run(ctx context.Context) {
     rebalanceTicker := time.NewTicker(d.rebalancingPeriod)
     defer rebalanceTicker.Stop()
 
+    unscheduledCheckTicker := time.NewTicker(time.Duration(d.unscheduledCheckThresholdSeconds) * time.Second)
+    defer unscheduledCheckTicker.Stop()
+
     for {
         select {
         case <-ctx.Done():
@@ -223,9 +253,15 @@ func (d *dispatcher) run(ctx context.Context) {
 
             // Re-dispatch dangling configs
             if d.shouldDispatchDangling() {
-                danglingConfs := d.retrieveAndClearDangling()
-                d.reschedule(danglingConfs)
+                danglingConfigs := d.retrieveDangling()
+                scheduledConfigIDs := d.reschedule(danglingConfigs)
+                d.store.Lock()
+                d.deleteDangling(scheduledConfigIDs)
+                d.store.Unlock()
             }
+        case <-unscheduledCheckTicker.C:
+            // Check for configs that have been dangling longer than expected
+            d.scanUnscheduledChecks()
         case <-rebalanceTicker.C:
             if d.advancedDispatching {
                 d.rebalance(false)
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
index 2e7dd891ab912..4e203fd9f048d 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_nodes.go
@@ -150,7 +150,7 @@ func (d *dispatcher) expireNodes() {
         for digest, config := range node.digestToConfig {
             delete(d.store.digestToNode, digest)
             log.Debugf("Adding %s:%s as a dangling Cluster Check config", config.Name, digest)
-            d.store.danglingConfigs[digest] = config
+            d.store.danglingConfigs[digest] = createDanglingConfig(config)
             danglingConfigs.Inc(le.JoinLeaderValue)
 
             // TODO: Use partial label matching when it becomes available:
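The run loop gains a dedicated ticker for the scan, firing at the threshold period, alongside the existing node-expiration and rebalancing tickers. A runnable sketch of the same select-loop pattern, with shortened durations and prints standing in for the dispatcher's methods so it terminates quickly:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // Stand-ins for the dispatcher's context and tickers.
    ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
    defer cancel()

    scanTicker := time.NewTicker(100 * time.Millisecond) // plays the role of unscheduledCheckTicker
    defer scanTicker.Stop()

    for {
        select {
        case <-ctx.Done():
            fmt.Println("dispatcher stopped")
            return
        case <-scanTicker.C:
            // In the real loop this is d.scanUnscheduledChecks(), run under the store lock.
            fmt.Println("scanning dangling configs for stuck checks")
        }
    }
}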
diff --git a/pkg/clusteragent/clusterchecks/dispatcher_test.go b/pkg/clusteragent/clusterchecks/dispatcher_test.go
index ff348a025ed58..f00141c8ea063 100644
--- a/pkg/clusteragent/clusterchecks/dispatcher_test.go
+++ b/pkg/clusteragent/clusterchecks/dispatcher_test.go
@@ -10,6 +10,7 @@ package clusterchecks
 import (
     "sort"
     "testing"
+    "time"
 
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
@@ -348,9 +349,10 @@ func TestRescheduleDanglingFromExpiredNodes(t *testing.T) {
 
     // Ensure we have 1 dangling to schedule, as new available node is registered
    assert.True(t, dispatcher.shouldDispatchDangling())
-    configs := dispatcher.retrieveAndClearDangling()
+    configs := dispatcher.retrieveDangling()
     // Assert the check is scheduled
-    dispatcher.reschedule(configs)
+    scheduledIDs := dispatcher.reschedule(configs)
+    dispatcher.deleteDangling(scheduledIDs)
     danglingConfig, err := dispatcher.getAllConfigs()
     assert.NoError(t, err)
     assert.Equal(t, 1, len(danglingConfig))
@@ -401,6 +403,9 @@ func TestDispatchFourConfigsTwoNodes(t *testing.T) {
 }
 
 func TestDanglingConfig(t *testing.T) {
+    mockConfig := configmock.New(t)
+    mockConfig.SetWithoutSource("cluster_checks.unscheduled_check_threshold", 1)
+    mockConfig.SetWithoutSource("cluster_checks.node_expiration_timeout", 1)
     fakeTagger := mock.SetupFakeTagger(t)
     dispatcher := newDispatcher(fakeTagger)
     config := integration.Config{
@@ -418,12 +423,20 @@ func TestDanglingConfig(t *testing.T) {
     // shouldDispatchDangling is still false because no node is available
     assert.False(t, dispatcher.shouldDispatchDangling())
 
+    // force config to dangle long enough to be classified as unscheduled check
+    assert.False(t, dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck)
+    require.Eventually(t, func() bool {
+        dispatcher.scanUnscheduledChecks()
+        return dispatcher.store.danglingConfigs[config.Digest()].unscheduledCheck
+    }, 2*time.Second, 250*time.Millisecond)
+
     // register a node, shouldDispatchDangling will become true
     dispatcher.processNodeStatus("nodeA", "10.0.0.1", types.NodeStatus{})
     assert.True(t, dispatcher.shouldDispatchDangling())
 
     // get the danglings and make sure they are removed from the store
-    configs := dispatcher.retrieveAndClearDangling()
+    configs := dispatcher.retrieveDangling()
+    dispatcher.deleteDangling([]string{config.Digest()})
     assert.Len(t, configs, 1)
     assert.Equal(t, 0, len(dispatcher.store.danglingConfigs))
 }
diff --git a/pkg/clusteragent/clusterchecks/helpers.go b/pkg/clusteragent/clusterchecks/helpers.go
index 8ddddc7187215..fca74d44f6374 100644
--- a/pkg/clusteragent/clusterchecks/helpers.go
+++ b/pkg/clusteragent/clusterchecks/helpers.go
@@ -33,6 +33,17 @@ func makeConfigArray(configMap map[string]integration.Conf
     return configSlice
 }
 
+// makeConfigArrayFromDangling flattens a map of configs into a slice. Creating a new slice
+// allows for thread-safe usage by other external consumers, as long as the field values in
+// the config objects are not modified.
+func makeConfigArrayFromDangling(configMap map[string]*danglingConfigWrapper) []integration.Config {
+    configSlice := make([]integration.Config, 0, len(configMap))
+    for _, c := range configMap {
+        configSlice = append(configSlice, c.config)
+    }
+    return configSlice
+}
+
 func timestampNowNano() int64 {
     return time.Now().UnixNano()
 }
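The comment on makeConfigArrayFromDangling describes snapshot semantics: the returned slice holds copies of the config values, which is why retrieveDangling can release its read lock before callers iterate over the result. A small sketch of why the copy matters, again with toy stand-in types:

package main

import "fmt"

type config struct{ Name string }
type wrapper struct{ config config }

// snapshot copies the config values out of the map, like
// makeConfigArrayFromDangling does; the slice stays valid even if the
// map is mutated after the lock is released.
func snapshot(m map[string]*wrapper) []config {
    out := make([]config, 0, len(m))
    for _, w := range m {
        out = append(out, w.config)
    }
    return out
}

func main() {
    m := map[string]*wrapper{"digest1": {config: config{Name: "http_check"}}}
    snap := snapshot(m)
    delete(m, "digest1")              // mutating the store...
    fmt.Println(snap[0].Name, len(m)) // ...leaves the snapshot intact: http_check 0
}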
diff --git a/pkg/clusteragent/clusterchecks/metrics.go b/pkg/clusteragent/clusterchecks/metrics.go
index 92540e44fabad..ac44e5290257e 100644
--- a/pkg/clusteragent/clusterchecks/metrics.go
+++ b/pkg/clusteragent/clusterchecks/metrics.go
@@ -19,6 +19,9 @@ var (
     danglingConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dangling",
         []string{le.JoinLeaderLabel}, "Number of check configurations not dispatched.",
         telemetry.Options{NoDoubleUnderscoreSep: true})
+    unscheduledCheck = telemetry.NewGaugeWithOpts("cluster_checks", "unscheduled_check",
+        []string{le.JoinLeaderLabel, "config_name", "config_source"}, "Number of check configurations not scheduled.",
+        telemetry.Options{NoDoubleUnderscoreSep: true})
     dispatchedConfigs = telemetry.NewGaugeWithOpts("cluster_checks", "configs_dispatched",
         []string{"node", le.JoinLeaderLabel}, "Number of check configurations dispatched, by node.",
         telemetry.Options{NoDoubleUnderscoreSep: true})
diff --git a/pkg/clusteragent/clusterchecks/stats.go b/pkg/clusteragent/clusterchecks/stats.go
index 52d57243f60ca..a2decaaedf8e6 100644
--- a/pkg/clusteragent/clusterchecks/stats.go
+++ b/pkg/clusteragent/clusterchecks/stats.go
@@ -60,12 +60,19 @@ func (d *dispatcher) getStats() *types.Stats {
     for _, m := range d.store.digestToConfig {
         checkNames[m.Name] = struct{}{}
     }
+    unscheduledChecks := 0
+    for _, c := range d.store.danglingConfigs {
+        if c.unscheduledCheck {
+            unscheduledChecks++
+        }
+    }
     return &types.Stats{
-        Active:          d.store.active,
-        NodeCount:       len(d.store.nodes),
-        ActiveConfigs:   len(d.store.digestToNode),
-        DanglingConfigs: len(d.store.danglingConfigs),
-        TotalConfigs:    len(d.store.digestToConfig),
-        CheckNames:      checkNames,
+        Active:            d.store.active,
+        NodeCount:         len(d.store.nodes),
+        ActiveConfigs:     len(d.store.digestToNode),
+        DanglingConfigs:   len(d.store.danglingConfigs),
+        UnscheduledChecks: unscheduledChecks,
+        TotalConfigs:      len(d.store.digestToConfig),
+        CheckNames:        checkNames,
     }
 }
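The new unscheduled_check gauge carries config_name and config_source labels, so operators can see which check is stuck rather than just a count. The agent's telemetry package wraps the Prometheus client; a standalone sketch of the same labeled-gauge pattern written against prometheus/client_golang directly (the label names and values below are illustrative, not the agent's exact wiring):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    unscheduled := prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Namespace: "cluster_checks",
            Name:      "unscheduled_check",
            Help:      "Number of check configurations not scheduled.",
        },
        []string{"join_leader", "config_name", "config_source"},
    )
    prometheus.MustRegister(unscheduled)

    // Inc when a dangling config crosses the threshold (scanUnscheduledChecks)...
    unscheduled.WithLabelValues("true", "http_check", "file:/conf.d/http_check.yaml").Inc()
    // ...and Dec when it is finally scheduled or removed (deleteDangling).
    unscheduled.WithLabelValues("true", "http_check", "file:/conf.d/http_check.yaml").Dec()

    fmt.Println("gauge registered and updated")
}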
diff --git a/pkg/clusteragent/clusterchecks/stores.go b/pkg/clusteragent/clusterchecks/stores.go
index 285e024c0e074..da73ea7534cfa 100644
--- a/pkg/clusteragent/clusterchecks/stores.go
+++ b/pkg/clusteragent/clusterchecks/stores.go
@@ -27,7 +27,7 @@ type clusterStore struct {
     digestToConfig   map[string]integration.Config            // All configurations to dispatch
     digestToNode     map[string]string                        // Node running a config
     nodes            map[string]*nodeStore                    // All nodes known to the cluster-agent
-    danglingConfigs  map[string]integration.Config            // Configs we could not dispatch to any node
+    danglingConfigs  map[string]*danglingConfigWrapper        // Configs we could not dispatch to any node
     endpointsConfigs map[string]map[string]integration.Config // Endpoints configs to be consumed by node agents
     idToDigest       map[checkid.ID]string                    // link check IDs to check configs
 }
@@ -44,7 +44,7 @@ func (s *clusterStore) reset() {
     s.digestToConfig = make(map[string]integration.Config)
     s.digestToNode = make(map[string]string)
     s.nodes = make(map[string]*nodeStore)
-    s.danglingConfigs = make(map[string]integration.Config)
+    s.danglingConfigs = make(map[string]*danglingConfigWrapper)
     s.endpointsConfigs = make(map[string]map[string]integration.Config)
     s.idToDigest = make(map[checkid.ID]string)
 }
@@ -74,7 +74,7 @@ func (s *clusterStore) getOrCreateNodeStore(nodeName, clientIP string) *nodeStor
 
 // clearDangling resets the danglingConfigs map to a new empty one
 func (s *clusterStore) clearDangling() {
-    s.danglingConfigs = make(map[string]integration.Config)
+    s.danglingConfigs = make(map[string]*danglingConfigWrapper)
 }
 
 // nodeStore holds the state store for one node.
diff --git a/pkg/clusteragent/clusterchecks/types/types.go b/pkg/clusteragent/clusterchecks/types/types.go
index 352709a556e41..55782768e3cda 100644
--- a/pkg/clusteragent/clusterchecks/types/types.go
+++ b/pkg/clusteragent/clusterchecks/types/types.go
@@ -74,13 +74,14 @@ type Stats struct {
     LeaderIP string
 
     // Leading
-    Leader          bool
-    Active          bool
-    NodeCount       int
-    ActiveConfigs   int
-    DanglingConfigs int
-    TotalConfigs    int
-    CheckNames      map[string]struct{}
+    Leader            bool
+    Active            bool
+    NodeCount         int
+    ActiveConfigs     int
+    DanglingConfigs   int
+    UnscheduledChecks int
+    TotalConfigs      int
+    CheckNames        map[string]struct{}
 }
 
 // LeaderIPCallback describes the leader-election method we
diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go
index 53ef845aa8fac..6d8308fa3d828 100644
--- a/pkg/config/setup/config.go
+++ b/pkg/config/setup/config.go
@@ -708,8 +708,9 @@ func InitConfig(config pkgconfigmodel.Setup) {
     // Cluster check Autodiscovery
     config.BindEnvAndSetDefault("cluster_checks.support_hybrid_ignore_ad_tags", false) // TODO(CINT)(Agent 7.53+) Remove this flag when hybrid ignore_ad_tags is fully deprecated
     config.BindEnvAndSetDefault("cluster_checks.enabled", false)
-    config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30) // value in seconds
-    config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)         // value in seconds
+    config.BindEnvAndSetDefault("cluster_checks.node_expiration_timeout", 30)     // value in seconds
+    config.BindEnvAndSetDefault("cluster_checks.warmup_duration", 30)             // value in seconds
+    config.BindEnvAndSetDefault("cluster_checks.unscheduled_check_threshold", 60) // value in seconds
     config.BindEnvAndSetDefault("cluster_checks.cluster_tag_name", "cluster_name")
     config.BindEnvAndSetDefault("cluster_checks.extra_tags", []string{})
     config.BindEnvAndSetDefault("cluster_checks.advanced_dispatching_enabled", false)
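The 60-second default is twice the node expiration timeout, which lines up with the guard in newDispatcher above: the threshold is raised to node_expiration_timeout whenever it is configured lower, so a config orphaned by an expiring node gets at least one re-dispatch window before being flagged. A small sketch of that effective-threshold rule:

package main

import "fmt"

// effectiveThreshold mirrors the guard in newDispatcher: the unscheduled-check
// threshold can never be shorter than the node expiration timeout.
func effectiveThreshold(unscheduledThresholdSec, nodeExpirationSec int64) int64 {
    if unscheduledThresholdSec < nodeExpirationSec {
        return nodeExpirationSec
    }
    return unscheduledThresholdSec
}

func main() {
    fmt.Println(effectiveThreshold(60, 30)) // defaults: 60 is kept
    fmt.Println(effectiveThreshold(10, 30)) // configured too low: clamped up to 30
}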