Skip to content

Commit

Permalink
[clusteragent/clusterchecks] Add isolate check command to datadog clu…
Browse files Browse the repository at this point in the history
…ster agent (#21049)

* Add isolate check command to dca

* Clean up isolate command return object

* Separate isolate logic into new file

* Add changelog

* Remove unused log param

* fixup! Remove unused log param

* Update release note

* Add container integrations as codeowner for cli/subcommands/clusterchecks

* Simplify for loop logic return early

* Remove error return from IsolateCheck call

* Rename IsolateResponse.Result to IsIsolated for consistency

* Remove reference to checkID in command use text

* Add tests for isolation error states

* Add more documentation for IsolateCheck logic

* fixup! Add more documentation for IsolateCheck logic

* fixup! Add container integrations as codeowner for cli/subcommands/clusterchecks

* Trigger gitlab sync
  • Loading branch information
jennchenn authored Nov 29, 2023
1 parent c8cfd6e commit b710e3d
Show file tree
Hide file tree
Showing 12 changed files with 422 additions and 5 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
/pkg/aggregator/ @DataDog/agent-metrics-logs
/pkg/collector/ @DataDog/agent-metrics-logs
/pkg/cli/ @DataDog/agent-shared-components
/pkg/cli/subcommands/clusterchecks @DataDog/container-integrations
/pkg/dogstatsd/ @DataDog/agent-metrics-logs
/pkg/errors/ @DataDog/agent-shared-components
/pkg/forwarder/ @DataDog/agent-metrics-logs @DataDog/agent-shared-components
Expand Down
21 changes: 21 additions & 0 deletions cmd/cluster-agent/api/v1/clusterchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func installClusterCheckEndpoints(r *mux.Router, sc clusteragent.ServerContext)
r.HandleFunc("/clusterchecks/configs/{identifier}", api.WithTelemetryWrapper("getCheckConfigs", getCheckConfigs(sc))).Methods("GET")
r.HandleFunc("/clusterchecks/rebalance", api.WithTelemetryWrapper("postRebalanceChecks", postRebalanceChecks(sc))).Methods("POST")
r.HandleFunc("/clusterchecks", api.WithTelemetryWrapper("getState", getState(sc))).Methods("GET")
r.HandleFunc("/clusterchecks/isolate/check/{identifier}", api.WithTelemetryWrapper("postIsolateCheck", postIsolateCheck(sc))).Methods("POST")
}

// RebalancePostPayload struct is for the JSON messages received from a client POST request
Expand Down Expand Up @@ -121,6 +122,26 @@ func postRebalanceChecks(sc clusteragent.ServerContext) func(w http.ResponseWrit
}
}

// postIsolateCheck builds the handler for POST requests asking that a single
// check, named by the {identifier} path variable, be isolated in a runner.
//
// When sc carries no ClusterCheckHandler, clusterChecksDisabledHandler is
// returned instead. Otherwise the returned handler writes the result of
// IsolateCheck back to the client as JSON.
func postIsolateCheck(sc clusteragent.ServerContext) func(w http.ResponseWriter, r *http.Request) {
	checkHandler := sc.ClusterCheckHandler
	if checkHandler == nil {
		return clusterChecksDisabledHandler
	}

	return func(w http.ResponseWriter, r *http.Request) {
		// Nothing more to do here when the query was already dealt with
		// (rejected or forwarded, going by the helper's name).
		if handled := checkHandler.RejectOrForwardLeaderQuery(w, r); handled {
			return
		}

		checkID := mux.Vars(r)["identifier"]
		writeJSONResponse(w, checkHandler.IsolateCheck(checkID))
	}
}

// getState is used by the clustercheck config
func getState(sc clusteragent.ServerContext) func(w http.ResponseWriter, r *http.Request) {
if sc.ClusterCheckHandler == nil {
Expand Down
61 changes: 61 additions & 0 deletions pkg/cli/subcommands/clusterchecks/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ type GlobalParams struct {
}

// cliParams holds the command-line parameters handed to the clusterchecks
// subcommand functions (rebalance and isolate both receive a *cliParams).
type cliParams struct {
	GlobalParams

	// NOTE(review): checkName is not populated in the code visible here;
	// presumably a flag of another subcommand — confirm against MakeCommand.
	checkName string
	// NOTE(review): force is not populated in the code visible here;
	// presumably a flag of the rebalance subcommand — confirm.
	force bool
	// checkID is bound to the isolate subcommand's --checkID flag and names
	// the check to isolate; isolate rejects an empty value.
	checkID string
}

// MakeCommand returns a `clusterchecks` command to be used by cluster-agent
Expand Down Expand Up @@ -85,6 +88,23 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {

cmd.AddCommand(rebalanceCmd)

isolateCmd := &cobra.Command{
Use: "isolate",
Short: "Isolates a single check in the cluster runner",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
globalParams := globalParamsGetter()

return fxutil.OneShot(isolate,
fx.Supply(cliParams),
fx.Supply(bundleParams(globalParams)),
core.Bundle,
)
},
}
isolateCmd.Flags().StringVarP(&cliParams.checkID, "checkID", "", "", "the check ID to isolate")
cmd.AddCommand(isolateCmd)

return cmd
}

Expand Down Expand Up @@ -155,3 +175,44 @@ func rebalance(_ log.Component, _ config.Component, cliParams *cliParams) error

return nil
}

// isolate asks the cluster agent listening on localhost (port taken from the
// cluster_agent.cmd_port setting) to isolate the check named by
// cliParams.checkID, then prints the outcome to stdout.
//
// It returns an error when:
//   - no checkID was supplied,
//   - the session auth token cannot be set,
//   - the POST request fails (the server-provided "error" field is used as the
//     error message when the body carries one),
//   - the response body cannot be decoded as a types.IsolateResponse.
//
// A well-formed response reporting that the check was NOT isolated is printed
// with its reason and is not treated as an error.
func isolate(_ log.Component, _ config.Component, cliParams *cliParams) error {
	// Validate the input first so no client or token setup happens for a
	// request that can never be sent.
	if cliParams.checkID == "" {
		return fmt.Errorf("checkID must be specified")
	}

	c := util.GetClient(false) // FIX: get certificates right then make this true
	urlstr := fmt.Sprintf("https://localhost:%v/api/v1/clusterchecks/isolate/check/%s", pkgconfig.Datadog.GetInt("cluster_agent.cmd_port"), cliParams.checkID)

	// Set session token
	if err := util.SetAuthToken(); err != nil {
		return err
	}

	r, err := util.DoPost(c, urlstr, "application/json", bytes.NewBuffer([]byte{}))
	if err != nil {
		// Best effort: the body may or may not be a JSON object with an
		// "error" field, so a decode failure here is deliberately ignored.
		errMap := make(map[string]string)
		json.Unmarshal(r, &errMap) //nolint:errcheck
		// If the error has been marshalled into a json object, check it and return it properly
		if e, found := errMap["error"]; found {
			// The message comes from the server: never use it as a format
			// string, or any '%' it contains would be misinterpreted.
			err = fmt.Errorf("%s", e)
		}

		fmt.Printf(`
Could not reach agent: %v
Make sure the agent is running before requesting to isolate a cluster check.
Contact support if you continue having issues.`, err)

		return err
	}

	var response types.IsolateResponse
	// An undecodable body would otherwise be reported as "could not be
	// isolated" with empty fields and a nil error; surface it instead.
	if err := json.Unmarshal(r, &response); err != nil {
		return fmt.Errorf("decoding isolate response: %w", err)
	}

	if response.IsIsolated {
		fmt.Printf("Check %s isolated successfully on node %s\n", response.CheckID, response.CheckNode)
	} else {
		fmt.Printf("Check %s could not be isolated: %s\n", response.CheckID, response.Reason)
	}
	return nil
}
18 changes: 18 additions & 0 deletions pkg/cli/subcommands/clusterchecks/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ package clusterchecks
import (
"testing"

"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

func TestCommand(t *testing.T) {
Expand Down Expand Up @@ -41,3 +43,19 @@ func TestRebalance(t *testing.T) {
rebalance,
func() {})
}

func TestIsolate(t *testing.T) {
commands := []*cobra.Command{
MakeCommand(func() GlobalParams {
return GlobalParams{}
}),
}

fxutil.TestOneShotSubcommand(t,
commands,
[]string{"clusterchecks", "isolate", "--checkID", "checkID"},
isolate,
func(cliParams *cliParams, coreParams core.BundleParams) {
require.Equal(t, "checkID", cliParams.checkID)
})
}
14 changes: 10 additions & 4 deletions pkg/clusteragent/clusterchecks/checks_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,19 @@ func newChecksDistribution(workersPerRunner map[string]int) checksDistribution {
// leastBusyRunner returns the runner with the lowest utilization. If there are
// several options, it gives preference to preferredRunner. If preferredRunner
// is not among the runners with the lowest utilization, it gives precedence to
// the runner with the lowest number of checks deployed.
func (distribution *checksDistribution) leastBusyRunner(preferredRunner string) string {
// the runner with the lowest number of checks deployed. excludeRunner can be set
// to avoid assigning a check to a specific runner.
func (distribution *checksDistribution) leastBusyRunner(preferredRunner string, excludeRunner string) string {
leastBusyRunner := ""
minUtilization := 0.0
numChecksLeastBusyRunner := 0

for runnerName, runnerStatus := range distribution.Runners {
// Allow exclusion of a runner from selection
if runnerName == excludeRunner {
continue
}

runnerUtilization := runnerStatus.utilization()
runnerNumChecks := runnerStatus.NumChecks

Expand All @@ -84,8 +90,8 @@ func (distribution *checksDistribution) leastBusyRunner(preferredRunner string)
return leastBusyRunner
}

func (distribution *checksDistribution) addToLeastBusy(checkID string, workersNeeded float64, preferredRunner string) {
leastBusy := distribution.leastBusyRunner(preferredRunner)
func (distribution *checksDistribution) addToLeastBusy(checkID string, workersNeeded float64, preferredRunner string, excludeRunner string) {
leastBusy := distribution.leastBusyRunner(preferredRunner, excludeRunner)
if leastBusy == "" {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/clusterchecks/checks_distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestAddToLeastBusy(t *testing.T) {
distribution.addCheck(checkID, checkStatus.WorkersNeeded, checkStatus.Runner)
}

distribution.addToLeastBusy("newCheck", 10, test.preferredRunner)
distribution.addToLeastBusy("newCheck", 10, test.preferredRunner, "")

assert.Equal(t, test.expectedPlacement, distribution.runnerForCheck("newCheck"))
})
Expand Down
62 changes: 62 additions & 0 deletions pkg/clusteragent/clusterchecks/dispatcher_isolate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build clusterchecks

package clusterchecks

import "github.com/DataDog/datadog-agent/pkg/clusteragent/clusterchecks/types"

// isolateCheck tries to isolate the check identified by isolateCheckID on the
// runner it is currently assigned to. It builds a new distribution in which
// every other check is placed with that runner excluded, then applies it.
//
// The response has IsIsolated set to false, with a Reason and an empty
// CheckNode, when the current distribution has exactly one runner or when no
// runner is found for the check. Otherwise IsIsolated is true and CheckNode is
// the runner the check was found on.
func (d *dispatcher) isolateCheck(isolateCheckID string) types.IsolateResponse {
	// Refresh runner stats before reading the distribution so that all checks
	// are accounted for.
	d.updateRunnersStats()
	current := d.currentDistribution()

	// notIsolated builds the response shared by every failure path.
	notIsolated := func(reason string) types.IsolateResponse {
		return types.IsolateResponse{
			CheckID:    isolateCheckID,
			CheckNode:  "",
			IsIsolated: false,
			Reason:     reason,
		}
	}

	// With a single runner there is nowhere else to send the other checks.
	if len(current.runnerWorkers()) == 1 {
		return notIsolated("No other runners available")
	}

	isolateNode := current.runnerForCheck(isolateCheckID)
	if isolateNode == "" {
		return notIsolated("Unable to find check")
	}

	proposed := newChecksDistribution(current.runnerWorkers())
	for _, id := range current.checksSortedByWorkersNeeded() {
		// The check being isolated is not re-placed: it is meant to stay on
		// its current runner. Every other check is placed with its current
		// runner as the preferred one and isolateNode excluded.
		if id != isolateCheckID {
			proposed.addToLeastBusy(
				id,
				current.workersNeededForCheck(id),
				current.runnerForCheck(id),
				isolateNode,
			)
		}
	}

	d.applyDistribution(proposed, current)

	return types.IsolateResponse{
		CheckID:    isolateCheckID,
		CheckNode:  isolateNode,
		IsIsolated: true,
	}
}
Loading

0 comments on commit b710e3d

Please sign in to comment.