From b710e3deececa4b6c3d0d7b6021f7d214b04c028 Mon Sep 17 00:00:00 2001 From: Jennifer Chen <32009013+jennchenn@users.noreply.github.com> Date: Wed, 29 Nov 2023 10:51:08 -0500 Subject: [PATCH] [clusteragent/clusterchecks] Add isolate check command to datadog cluster agent (#21049) * Add isolate check command to dca * Clean up isolate command return object * Separate isolate logic into new file * Add changelog * Remove unused log param * fixup! Remove unused log param * Update release note * Add container integrations as codeowner for cli/subcommands/clusterchecks * Simplify for loop logic return early * Remove error return from IsolateCheck call * Rename IsolateResponse.Result to IsIsolated for consistency * Remove reference to checkID in command use text * Add tests for isolation error states * Add more documentation for IsolateCheck logic * fixup! Add more documentation for IsolateCheck logic * fixup! Add container integrations as codeowner for cli/subcommands/clusterchecks * Trigger gitlab sync --- .github/CODEOWNERS | 1 + cmd/cluster-agent/api/v1/clusterchecks.go | 21 ++ pkg/cli/subcommands/clusterchecks/command.go | 61 +++++ .../subcommands/clusterchecks/command_test.go | 18 ++ .../clusterchecks/checks_distribution.go | 14 +- .../clusterchecks/checks_distribution_test.go | 2 +- .../clusterchecks/dispatcher_isolate.go | 62 +++++ .../clusterchecks/dispatcher_isolate_test.go | 225 ++++++++++++++++++ .../clusterchecks/dispatcher_rebalance.go | 1 + pkg/clusteragent/clusterchecks/handler_api.go | 7 + pkg/clusteragent/clusterchecks/types/types.go | 8 + ...and-to-clusterchecks-53ece6c3fbf2720c.yaml | 7 + 12 files changed, 422 insertions(+), 5 deletions(-) create mode 100644 pkg/clusteragent/clusterchecks/dispatcher_isolate.go create mode 100644 pkg/clusteragent/clusterchecks/dispatcher_isolate_test.go create mode 100644 releasenotes-dca/notes/add-isolate-command-to-clusterchecks-53ece6c3fbf2720c.yaml diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 
1156c14c0b4ab..b1a54b4d1567d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -234,6 +234,7 @@ /pkg/aggregator/ @DataDog/agent-metrics-logs /pkg/collector/ @DataDog/agent-metrics-logs /pkg/cli/ @DataDog/agent-shared-components +/pkg/cli/subcommands/clusterchecks @DataDog/container-integrations /pkg/dogstatsd/ @DataDog/agent-metrics-logs /pkg/errors/ @DataDog/agent-shared-components /pkg/forwarder/ @DataDog/agent-metrics-logs @DataDog/agent-shared-components diff --git a/cmd/cluster-agent/api/v1/clusterchecks.go b/cmd/cluster-agent/api/v1/clusterchecks.go index 9dae02174badd..b26a1387cf119 100644 --- a/cmd/cluster-agent/api/v1/clusterchecks.go +++ b/cmd/cluster-agent/api/v1/clusterchecks.go @@ -29,6 +29,7 @@ func installClusterCheckEndpoints(r *mux.Router, sc clusteragent.ServerContext) r.HandleFunc("/clusterchecks/configs/{identifier}", api.WithTelemetryWrapper("getCheckConfigs", getCheckConfigs(sc))).Methods("GET") r.HandleFunc("/clusterchecks/rebalance", api.WithTelemetryWrapper("postRebalanceChecks", postRebalanceChecks(sc))).Methods("POST") r.HandleFunc("/clusterchecks", api.WithTelemetryWrapper("getState", getState(sc))).Methods("GET") + r.HandleFunc("/clusterchecks/isolate/check/{identifier}", api.WithTelemetryWrapper("postIsolateCheck", postIsolateCheck(sc))).Methods("POST") } // RebalancePostPayload struct is for the JSON messages received from a client POST request @@ -121,6 +122,26 @@ func postRebalanceChecks(sc clusteragent.ServerContext) func(w http.ResponseWrit } } +// postIsolateCheck requests that a specified check be isolated in a runner +func postIsolateCheck(sc clusteragent.ServerContext) func(w http.ResponseWriter, r *http.Request) { + if sc.ClusterCheckHandler == nil { + return clusterChecksDisabledHandler + } + + return func(w http.ResponseWriter, r *http.Request) { + if sc.ClusterCheckHandler.RejectOrForwardLeaderQuery(w, r) { + return + } + + vars := mux.Vars(r) + isolateCheckID := vars["identifier"] + + response := 
sc.ClusterCheckHandler.IsolateCheck(isolateCheckID) + + writeJSONResponse(w, response) + } +} + // getState is used by the clustercheck config func getState(sc clusteragent.ServerContext) func(w http.ResponseWriter, r *http.Request) { if sc.ClusterCheckHandler == nil { diff --git a/pkg/cli/subcommands/clusterchecks/command.go b/pkg/cli/subcommands/clusterchecks/command.go index 21bedb0188c3b..aeebed8057196 100644 --- a/pkg/cli/subcommands/clusterchecks/command.go +++ b/pkg/cli/subcommands/clusterchecks/command.go @@ -40,8 +40,11 @@ type GlobalParams struct { } type cliParams struct { + GlobalParams + checkName string force bool + checkID string } // MakeCommand returns a `clusterchecks` command to be used by cluster-agent @@ -85,6 +88,23 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { cmd.AddCommand(rebalanceCmd) + isolateCmd := &cobra.Command{ + Use: "isolate", + Short: "Isolates a single check in the cluster runner", + Long: ``, + RunE: func(cmd *cobra.Command, args []string) error { + globalParams := globalParamsGetter() + + return fxutil.OneShot(isolate, + fx.Supply(cliParams), + fx.Supply(bundleParams(globalParams)), + core.Bundle, + ) + }, + } + isolateCmd.Flags().StringVarP(&cliParams.checkID, "checkID", "", "", "the check ID to isolate") + cmd.AddCommand(isolateCmd) + return cmd } @@ -155,3 +175,44 @@ func rebalance(_ log.Component, _ config.Component, cliParams *cliParams) error return nil } + +func isolate(_ log.Component, _ config.Component, cliParams *cliParams) error { + c := util.GetClient(false) // FIX: get certificates right then make this true + if cliParams.checkID == "" { + return fmt.Errorf("checkID must be specified") + } + urlstr := fmt.Sprintf("https://localhost:%v/api/v1/clusterchecks/isolate/check/%s", pkgconfig.Datadog.GetInt("cluster_agent.cmd_port"), cliParams.checkID) + + // Set session token + err := util.SetAuthToken() + if err != nil { + return err + } + + r, err := util.DoPost(c, urlstr, 
"application/json", bytes.NewBuffer([]byte{})) + if err != nil { + var errMap = make(map[string]string) + json.Unmarshal(r, &errMap) //nolint:errcheck + // If the error has been marshalled into a json object, check it and return it properly + if e, found := errMap["error"]; found { + err = fmt.Errorf(e) + } + + fmt.Printf(` + Could not reach agent: %v + Make sure the agent is running before requesting to isolate a cluster check. + Contact support if you continue having issues.`, err) + + return err + } + + var response types.IsolateResponse + + json.Unmarshal(r, &response) //nolint:errcheck + if response.IsIsolated { + fmt.Printf("Check %s isolated successfully on node %s\n", response.CheckID, response.CheckNode) + } else { + fmt.Printf("Check %s could not be isolated: %s\n", response.CheckID, response.Reason) + } + return nil +} diff --git a/pkg/cli/subcommands/clusterchecks/command_test.go b/pkg/cli/subcommands/clusterchecks/command_test.go index 61736ac285ade..388005f7bcd7f 100644 --- a/pkg/cli/subcommands/clusterchecks/command_test.go +++ b/pkg/cli/subcommands/clusterchecks/command_test.go @@ -10,8 +10,10 @@ package clusterchecks import ( "testing" + "github.com/DataDog/datadog-agent/comp/core" "github.com/DataDog/datadog-agent/pkg/util/fxutil" "github.com/spf13/cobra" + "github.com/stretchr/testify/require" ) func TestCommand(t *testing.T) { @@ -41,3 +43,19 @@ func TestRebalance(t *testing.T) { rebalance, func() {}) } + +func TestIsolate(t *testing.T) { + commands := []*cobra.Command{ + MakeCommand(func() GlobalParams { + return GlobalParams{} + }), + } + + fxutil.TestOneShotSubcommand(t, + commands, + []string{"clusterchecks", "isolate", "--checkID", "checkID"}, + isolate, + func(cliParams *cliParams, coreParams core.BundleParams) { + require.Equal(t, "checkID", cliParams.checkID) + }) +} diff --git a/pkg/clusteragent/clusterchecks/checks_distribution.go b/pkg/clusteragent/clusterchecks/checks_distribution.go index cba283511a89c..989b4e7e67a97 100644 --- 
a/pkg/clusteragent/clusterchecks/checks_distribution.go +++ b/pkg/clusteragent/clusterchecks/checks_distribution.go @@ -59,13 +59,19 @@ func newChecksDistribution(workersPerRunner map[string]int) checksDistribution { // leastBusyRunner returns the runner with the lowest utilization. If there are // several options, it gives preference to preferredRunner. If preferredRunner // is not among the runners with the lowest utilization, it gives precedence to -// the runner with the lowest number of checks deployed. -func (distribution *checksDistribution) leastBusyRunner(preferredRunner string) string { +// the runner with the lowest number of checks deployed. excludeRunner can be set +// to avoid assigning a check to a specific runner. +func (distribution *checksDistribution) leastBusyRunner(preferredRunner string, excludeRunner string) string { leastBusyRunner := "" minUtilization := 0.0 numChecksLeastBusyRunner := 0 for runnerName, runnerStatus := range distribution.Runners { + // Allow exclusion of a runner from selection + if runnerName == excludeRunner { + continue + } + runnerUtilization := runnerStatus.utilization() runnerNumChecks := runnerStatus.NumChecks @@ -84,8 +90,8 @@ func (distribution *checksDistribution) leastBusyRunner(preferredRunner string) return leastBusyRunner } -func (distribution *checksDistribution) addToLeastBusy(checkID string, workersNeeded float64, preferredRunner string) { - leastBusy := distribution.leastBusyRunner(preferredRunner) +func (distribution *checksDistribution) addToLeastBusy(checkID string, workersNeeded float64, preferredRunner string, excludeRunner string) { + leastBusy := distribution.leastBusyRunner(preferredRunner, excludeRunner) if leastBusy == "" { return } diff --git a/pkg/clusteragent/clusterchecks/checks_distribution_test.go b/pkg/clusteragent/clusterchecks/checks_distribution_test.go index 9c2d0f58c3e24..0972ff99fbe05 100644 --- a/pkg/clusteragent/clusterchecks/checks_distribution_test.go +++ 
b/pkg/clusteragent/clusterchecks/checks_distribution_test.go @@ -137,7 +137,7 @@ func TestAddToLeastBusy(t *testing.T) { distribution.addCheck(checkID, checkStatus.WorkersNeeded, checkStatus.Runner) } - distribution.addToLeastBusy("newCheck", 10, test.preferredRunner) + distribution.addToLeastBusy("newCheck", 10, test.preferredRunner, "") assert.Equal(t, test.expectedPlacement, distribution.runnerForCheck("newCheck")) }) diff --git a/pkg/clusteragent/clusterchecks/dispatcher_isolate.go b/pkg/clusteragent/clusterchecks/dispatcher_isolate.go new file mode 100644 index 0000000000000..ad54bc5a27489 --- /dev/null +++ b/pkg/clusteragent/clusterchecks/dispatcher_isolate.go @@ -0,0 +1,62 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build clusterchecks + +package clusterchecks + +import "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types" + +func (d *dispatcher) isolateCheck(isolateCheckID string) types.IsolateResponse { + // Update stats prior to starting isolate to ensure all checks are accounted for + d.updateRunnersStats() + currentDistribution := d.currentDistribution() + + // If there is only one runner, we cannot isolate the check + if len(currentDistribution.runnerWorkers()) == 1 { + return types.IsolateResponse{ + CheckID: isolateCheckID, + CheckNode: "", + IsIsolated: false, + Reason: "No other runners available", + } + } + + isolateNode := currentDistribution.runnerForCheck(isolateCheckID) + if isolateNode == "" { + return types.IsolateResponse{ + CheckID: isolateCheckID, + CheckNode: "", + IsIsolated: false, + Reason: "Unable to find check", + } + } + + proposedDistribution := newChecksDistribution(currentDistribution.runnerWorkers()) + + for _, checkID := range currentDistribution.checksSortedByWorkersNeeded() { + if checkID == 
isolateCheckID { + // Keep the check to be isolated on its current runner + continue + } + + workersNeededForCheck := currentDistribution.workersNeededForCheck(checkID) + runnerForCheck := currentDistribution.runnerForCheck(checkID) + + proposedDistribution.addToLeastBusy( + checkID, + workersNeededForCheck, + runnerForCheck, + isolateNode, + ) + } + + d.applyDistribution(proposedDistribution, currentDistribution) + return types.IsolateResponse{ + CheckID: isolateCheckID, + CheckNode: isolateNode, + IsIsolated: true, + } +} diff --git a/pkg/clusteragent/clusterchecks/dispatcher_isolate_test.go b/pkg/clusteragent/clusterchecks/dispatcher_isolate_test.go new file mode 100644 index 0000000000000..b12e70c7edb46 --- /dev/null +++ b/pkg/clusteragent/clusterchecks/dispatcher_isolate_test.go @@ -0,0 +1,225 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +//go:build clusterchecks + +package clusterchecks + +import ( + "testing" + + "github.com/DataDog/datadog-agent/pkg/autodiscovery/integration" + "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types" + checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id" + "github.com/DataDog/datadog-agent/pkg/config" + "github.com/stretchr/testify/assert" +) + +func TestIsolateCheckSuccessful(t *testing.T) { + testDispatcher := newDispatcher() + testDispatcher.store.nodes["A"] = newNodeStore("A", "") + testDispatcher.store.nodes["A"].workers = config.DefaultNumWorkers + testDispatcher.store.nodes["B"] = newNodeStore("B", "") + testDispatcher.store.nodes["B"].workers = config.DefaultNumWorkers + + testDispatcher.store.nodes["A"].clcRunnerStats = map[string]types.CLCRunnerStats{ + "checkA0": { + AverageExecutionTime: 50, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA1": { + AverageExecutionTime: 20, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA2": { + AverageExecutionTime: 100, + MetricSamples: 10, + IsClusterCheck: true, + }, + } + + testDispatcher.store.nodes["B"].clcRunnerStats = map[string]types.CLCRunnerStats{ + "checkB0": { + AverageExecutionTime: 50, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkB1": { + AverageExecutionTime: 20, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkB2": { + AverageExecutionTime: 100, + MetricSamples: 10, + IsClusterCheck: true, + }, + } + testDispatcher.store.idToDigest = map[checkid.ID]string{ + "checkA0": "digestA0", + "checkA1": "digestA1", + "checkA2": "digestA2", + "checkB0": "digestB0", + "checkB1": "digestB1", + "checkB2": "digestB2", + } + testDispatcher.store.digestToConfig = map[string]integration.Config{ + "digestA0": {}, + "digestA1": {}, + "digestA2": {}, + "digestB0": {}, + "digestB1": {}, + "digestB2": {}, + } + testDispatcher.store.digestToNode = map[string]string{ + "digestA0": "A", + "digestA1": "A", + "digestA2": "A", + "digestB0": "B", + 
"digestB1": "B", + "digestB2": "B", + } + + response := testDispatcher.isolateCheck("checkA2") + assert.EqualValues(t, response.CheckID, "checkA2") + assert.EqualValues(t, response.CheckNode, "A") + assert.True(t, response.IsIsolated) + assert.EqualValues(t, response.Reason, "") + assert.EqualValues(t, len(testDispatcher.store.nodes["A"].clcRunnerStats), 1) + assert.EqualValues(t, len(testDispatcher.store.nodes["B"].clcRunnerStats), 5) + _, containsIsolatedCheck := testDispatcher.store.nodes["A"].clcRunnerStats["checkA2"] + assert.True(t, containsIsolatedCheck) + + requireNotLocked(t, testDispatcher.store) +} + +func TestIsolateNonExistentCheckFails(t *testing.T) { + testDispatcher := newDispatcher() + testDispatcher.store.nodes["A"] = newNodeStore("A", "") + testDispatcher.store.nodes["A"].workers = config.DefaultNumWorkers + testDispatcher.store.nodes["B"] = newNodeStore("B", "") + testDispatcher.store.nodes["B"].workers = config.DefaultNumWorkers + + testDispatcher.store.nodes["A"].clcRunnerStats = map[string]types.CLCRunnerStats{ + "checkA0": { + AverageExecutionTime: 50, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA1": { + AverageExecutionTime: 20, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA2": { + AverageExecutionTime: 100, + MetricSamples: 10, + IsClusterCheck: true, + }, + } + + testDispatcher.store.nodes["B"].clcRunnerStats = map[string]types.CLCRunnerStats{ + "checkB0": { + AverageExecutionTime: 50, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkB1": { + AverageExecutionTime: 20, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkB2": { + AverageExecutionTime: 100, + MetricSamples: 10, + IsClusterCheck: true, + }, + } + testDispatcher.store.idToDigest = map[checkid.ID]string{ + "checkA0": "digestA0", + "checkA1": "digestA1", + "checkA2": "digestA2", + "checkB0": "digestB0", + "checkB1": "digestB1", + "checkB2": "digestB2", + } + testDispatcher.store.digestToConfig = map[string]integration.Config{ + 
"digestA0": {}, + "digestA1": {}, + "digestA2": {}, + "digestB0": {}, + "digestB1": {}, + "digestB2": {}, + } + testDispatcher.store.digestToNode = map[string]string{ + "digestA0": "A", + "digestA1": "A", + "digestA2": "A", + "digestB0": "B", + "digestB1": "B", + "digestB2": "B", + } + + response := testDispatcher.isolateCheck("checkA5") + assert.EqualValues(t, response.CheckID, "checkA5") + assert.EqualValues(t, response.CheckNode, "") + assert.False(t, response.IsIsolated) + assert.EqualValues(t, response.Reason, "Unable to find check") + assert.EqualValues(t, len(testDispatcher.store.nodes["A"].clcRunnerStats), 3) + assert.EqualValues(t, len(testDispatcher.store.nodes["B"].clcRunnerStats), 3) + + requireNotLocked(t, testDispatcher.store) +} + +func TestIsolateCheckOnlyOneRunnerFails(t *testing.T) { + testDispatcher := newDispatcher() + testDispatcher.store.nodes["A"] = newNodeStore("A", "") + testDispatcher.store.nodes["A"].workers = config.DefaultNumWorkers + + testDispatcher.store.nodes["A"].clcRunnerStats = map[string]types.CLCRunnerStats{ + "checkA0": { + AverageExecutionTime: 50, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA1": { + AverageExecutionTime: 20, + MetricSamples: 10, + IsClusterCheck: true, + }, + "checkA2": { + AverageExecutionTime: 100, + MetricSamples: 10, + IsClusterCheck: true, + }, + } + + testDispatcher.store.idToDigest = map[checkid.ID]string{ + "checkA0": "digestA0", + "checkA1": "digestA1", + "checkA2": "digestA2", + } + testDispatcher.store.digestToConfig = map[string]integration.Config{ + "digestA0": {}, + "digestA1": {}, + "digestA2": {}, + } + testDispatcher.store.digestToNode = map[string]string{ + "digestA0": "A", + "digestA1": "A", + "digestA2": "A", + } + + response := testDispatcher.isolateCheck("checkA1") + assert.EqualValues(t, response.CheckID, "checkA1") + assert.EqualValues(t, response.CheckNode, "") + assert.False(t, response.IsIsolated) + assert.EqualValues(t, response.Reason, "No other runners available") 
+ assert.EqualValues(t, len(testDispatcher.store.nodes["A"].clcRunnerStats), 3) + + requireNotLocked(t, testDispatcher.store) +} diff --git a/pkg/clusteragent/clusterchecks/dispatcher_rebalance.go b/pkg/clusteragent/clusterchecks/dispatcher_rebalance.go index c044efdfa31d4..d1e28bce5fe83 100644 --- a/pkg/clusteragent/clusterchecks/dispatcher_rebalance.go +++ b/pkg/clusteragent/clusterchecks/dispatcher_rebalance.go @@ -315,6 +315,7 @@ func (d *dispatcher) rebalanceUsingUtilization(force bool) []types.RebalanceResp checkID, currentChecksDistribution.workersNeededForCheck(checkID), currentChecksDistribution.runnerForCheck(checkID), + "", ) } diff --git a/pkg/clusteragent/clusterchecks/handler_api.go b/pkg/clusteragent/clusterchecks/handler_api.go index c13ddd0d69807..71aee7c5838c8 100644 --- a/pkg/clusteragent/clusterchecks/handler_api.go +++ b/pkg/clusteragent/clusterchecks/handler_api.go @@ -119,3 +119,10 @@ func (h *Handler) RebalanceClusterChecks(force bool) ([]types.RebalanceResponse, return response, nil } + +// IsolateCheck triggers an attempt to isolate a check in a runner. Other checks +// will be redistributed to other runners using the existing rebalancing logic. 
+func (h *Handler) IsolateCheck(isolateCheckID string) types.IsolateResponse { + response := h.dispatcher.isolateCheck(isolateCheckID) + return response +} diff --git a/pkg/clusteragent/clusterchecks/types/types.go b/pkg/clusteragent/clusterchecks/types/types.go index 818597bb1541c..2ce00e8937118 100644 --- a/pkg/clusteragent/clusterchecks/types/types.go +++ b/pkg/clusteragent/clusterchecks/types/types.go @@ -39,6 +39,14 @@ type RebalanceResponse struct { DestDiff int `json:"dest_diff"` } +// IsolateResponse holds the DCA response for an isolate request +type IsolateResponse struct { + CheckID string `json:"check_id"` + CheckNode string `json:"check_node"` + IsIsolated bool `json:"is_isolated"` + Reason string `json:"reason"` +} + // ConfigResponse holds the DCA response for a config query type ConfigResponse struct { LastChange int64 `json:"last_change"` diff --git a/releasenotes-dca/notes/add-isolate-command-to-clusterchecks-53ece6c3fbf2720c.yaml b/releasenotes-dca/notes/add-isolate-command-to-clusterchecks-53ece6c3fbf2720c.yaml new file mode 100644 index 0000000000000..cc22686bdd20d --- /dev/null +++ b/releasenotes-dca/notes/add-isolate-command-to-clusterchecks-53ece6c3fbf2720c.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Add isolate command to clusterchecks to make it easier to pinpoint + a check that is causing high CPU/memory usage. Command can be + run in the cluster agent with: + `datadog-cluster-agent clusterchecks isolate --checkID=`