From 144d511d1b639831aa3172500f9c8567a93dba9c Mon Sep 17 00:00:00 2001 From: Alexandre Yang Date: Fri, 13 Jan 2023 16:51:50 +0100 Subject: [PATCH] [check command] Add `--instance-filter` option (#15034) --- cmd/agent/common/autodiscovery.go | 49 +++++++++++++++++-- cmd/agent/subcommands/jmx/command.go | 9 +++- cmd/cluster-agent/commands/check/check.go | 7 ++- pkg/cli/subcommands/check/command.go | 8 ++- pkg/util/jsonquery/yaml.go | 17 +++++++ pkg/util/jsonquery/yaml_test.go | 32 ++++++++++++ ..._agent_check_command-c166bec4ff8ebf96.yaml | 11 +++++ 7 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 pkg/util/jsonquery/yaml_test.go create mode 100644 releasenotes/notes/add_instance_filter_to_agent_check_command-c166bec4ff8ebf96.yaml diff --git a/cmd/agent/common/autodiscovery.go b/cmd/agent/common/autodiscovery.go index e3d1e7f14cdafb..8077cbaaa7c0dd 100644 --- a/cmd/agent/common/autodiscovery.go +++ b/cmd/agent/common/autodiscovery.go @@ -7,9 +7,11 @@ package common import ( "context" + "fmt" "time" "go.uber.org/atomic" + utilserror "k8s.io/apimachinery/pkg/util/errors" "github.com/DataDog/datadog-agent/pkg/autodiscovery" "github.com/DataDog/datadog-agent/pkg/autodiscovery/integration" @@ -18,6 +20,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/autodiscovery/scheduler" "github.com/DataDog/datadog-agent/pkg/config" confad "github.com/DataDog/datadog-agent/pkg/config/autodiscovery" + "github.com/DataDog/datadog-agent/pkg/util/jsonquery" "github.com/DataDog/datadog-agent/pkg/util/log" ) @@ -211,14 +214,14 @@ func (sf schedulerFunc) Stop() { // // If the context is cancelled, then any accumulated, matching changes are // returned, even if that is fewer than discoveryMinInstances. 
-func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int) (configs []integration.Config) { - return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances) +func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, lastError error) { + return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances, instanceFilter) } // WaitForAllConfigsFromAD waits until its context expires, and then returns // the full set of checks scheduled by AD. -func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config) { - return waitForConfigsFromAD(ctx, true, []string{}, 0) +func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config, lastError error) { + return waitForConfigsFromAD(ctx, true, []string{}, 0, "") } // waitForConfigsFromAD waits for configs from the AD scheduler and returns them. @@ -234,7 +237,7 @@ func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config) // If wildcard is true, this gathers all configs scheduled before the context // is cancelled, and then returns. It will not return before the context is // cancelled. 
-func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int) (configs []integration.Config) { +func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, returnErr error) { configChan := make(chan integration.Config) // signal to the scheduler when we are no longer waiting, so we do not continue @@ -265,23 +268,59 @@ func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []strin } } + stopChan := make(chan struct{}) // add the scheduler in a goroutine, since it will schedule any "catch-up" immediately, // placing items in configChan go AC.AddScheduler("check-cmd", schedulerFunc(func(configs []integration.Config) { + var errors []error for _, cfg := range configs { + if instanceFilter != "" { + instances, filterErrors := filterInstances(cfg.Instances, instanceFilter) + if len(filterErrors) > 0 { + errors = append(errors, filterErrors...) 
+ continue + } + if len(instances) == 0 { + continue + } + cfg.Instances = instances + } + if match(cfg) && waiting.Load() { configChan <- cfg } } + if len(errors) > 0 { + returnErr = utilserror.NewAggregate(errors) + stopChan <- struct{}{} + } }), true) for wildcard || len(configs) < discoveryMinInstances { select { case cfg := <-configChan: configs = append(configs, cfg) + case <-stopChan: + return case <-ctx.Done(): return } } return } + +func filterInstances(instances []integration.Data, instanceFilter string) ([]integration.Data, []error) { + var newInstances []integration.Data + var errors []error + for _, instance := range instances { + exist, err := jsonquery.YAMLCheckExist(instance, instanceFilter) + if err != nil { + errors = append(errors, fmt.Errorf("instance filter error: %v", err)) + continue + } + if exist { + newInstances = append(newInstances, instance) + } + } + return newInstances, errors +} diff --git a/cmd/agent/subcommands/jmx/command.go b/cmd/agent/subcommands/jmx/command.go index 7db96d29643077..1720a08e874da8 100644 --- a/cmd/agent/subcommands/jmx/command.go +++ b/cmd/agent/subcommands/jmx/command.go @@ -45,6 +45,7 @@ type cliParams struct { saveFlare bool discoveryTimeout uint discoveryMinInstances uint + instanceFilter string } // Commands returns a slice of subcommands for the 'agent' command. 
@@ -63,6 +64,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryTimeout, "discovery-timeout", "", 5, "max retry duration until Autodiscovery resolves the check template (in seconds)") jmxCmd.PersistentFlags().UintVarP(&discoveryRetryInterval, "discovery-retry-interval", "", 1, "(unused)") jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryMinInstances, "discovery-min-instances", "", 1, "minimum number of config instances to be discovered before running the check(s)") + jmxCmd.PersistentFlags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'") // All subcommands use the same provided components, with a different // oneShot callback, and with some complex derivation of the @@ -241,11 +243,14 @@ func runJmxCommandConsole(log log.Component, config config.Component, cliParams context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second) var allConfigs []integration.Config if len(cliParams.cliSelectedChecks) == 0 { - allConfigs = common.WaitForAllConfigsFromAD(waitCtx) + allConfigs, err = common.WaitForAllConfigsFromAD(waitCtx) } else { - allConfigs = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances)) + allConfigs, err = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances), cliParams.instanceFilter) } cancelTimeout() + if err != nil { + return err + } err = standalone.ExecJMXCommandConsole(cliParams.command, cliParams.cliSelectedChecks, cliParams.jmxLogLevel, allConfigs) diff --git a/cmd/cluster-agent/commands/check/check.go b/cmd/cluster-agent/commands/check/check.go index 166cfced98778e..b53c0594ba4fbf 100644 --- a/cmd/cluster-agent/commands/check/check.go +++ b/cmd/cluster-agent/commands/check/check.go @@ -43,6 +43,7 @@ var ( checkPause int checkName string 
checkDelay int + instanceFilter string logLevel string formatJSON bool formatTable bool @@ -72,6 +73,7 @@ func setupCmd(cmd *cobra.Command) { cmd.Flags().IntVar(&checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds") cmd.Flags().StringVarP(&logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)") cmd.Flags().IntVarP(&checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds") + cmd.Flags().StringVarP(&instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'") cmd.Flags().BoolVarP(&formatJSON, "json", "", false, "format aggregator and check runner output as json") cmd.Flags().BoolVarP(&formatTable, "table", "", false, "format aggregator and check runner output as an ascii table") cmd.Flags().StringVarP(&breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)") @@ -164,8 +166,11 @@ func Check(loggerName config.LoggerName, confFilePath *string, flagNoColor *bool waitCtx, cancelTimeout := context.WithTimeout( context.Background(), time.Duration(discoveryTimeout)*time.Second) - allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances)) + allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances), instanceFilter) cancelTimeout() + if err != nil { + return err + } // make sure the checks in cs are not JMX checks for idx := range allConfigs { diff --git a/pkg/cli/subcommands/check/command.go b/pkg/cli/subcommands/check/command.go index ecdae2924c40dc..3343a7379afc29 100644 --- a/pkg/cli/subcommands/check/command.go +++ b/pkg/cli/subcommands/check/command.go @@ -57,6 +57,7 @@ type cliParams struct { checkPause int checkName string checkDelay int + instanceFilter string logLevel string formatJSON bool formatTable bool @@ 
-118,6 +119,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command { cmd.Flags().IntVar(&cliParams.checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds") cmd.Flags().StringVarP(&cliParams.logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)") cmd.Flags().IntVarP(&cliParams.checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds") + cmd.Flags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'") cmd.Flags().BoolVarP(&cliParams.formatJSON, "json", "", false, "format aggregator and check runner output as json") cmd.Flags().BoolVarP(&cliParams.formatTable, "table", "", false, "format aggregator and check runner output as an ascii table") cmd.Flags().StringVarP(&cliParams.breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)") @@ -190,8 +192,12 @@ func run(log log.Component, config config.Component, cliParams *cliParams) error waitCtx, cancelTimeout := context.WithTimeout( context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second) - allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances)) + + allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances), cliParams.instanceFilter) cancelTimeout() + if err != nil { + return err + } // make sure the checks in cs are not JMX checks for idx := range allConfigs { diff --git a/pkg/util/jsonquery/yaml.go b/pkg/util/jsonquery/yaml.go index 58cfba9742efe2..a980b6a59f8c1f 100644 --- a/pkg/util/jsonquery/yaml.go +++ b/pkg/util/jsonquery/yaml.go @@ -8,6 +8,8 @@ package jsonquery import ( "fmt" "time" + + "gopkg.in/yaml.v3" ) // Copy from 
https://github.com/itchyny/gojq/blob/main/cli/yaml.go @@ -46,3 +48,18 @@ func NormalizeYAMLForGoJQ(v interface{}) interface{} { return v } } + +// YAMLCheckExist reports whether the jq style query evaluates to true for the given YAML data; the query must return a boolean. +func YAMLCheckExist(yamlData []byte, query string) (bool, error) { + var yamlContent interface{} + if err := yaml.Unmarshal(yamlData, &yamlContent); err != nil { + return false, err + } + yamlContent = NormalizeYAMLForGoJQ(yamlContent) + output, _, err := RunSingleOutput(query, yamlContent) + var exist bool + if err := yaml.Unmarshal([]byte(output), &exist); err != nil { + return false, fmt.Errorf("filter query must return a boolean: %s", err) + } + return exist, err +} diff --git a/pkg/util/jsonquery/yaml_test.go b/pkg/util/jsonquery/yaml_test.go new file mode 100644 index 00000000000000..231141e78d6be2 --- /dev/null +++ b/pkg/util/jsonquery/yaml_test.go @@ -0,0 +1,32 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2023-present Datadog, Inc. 
+ +package jsonquery + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/DataDog/datadog-agent/pkg/autodiscovery/integration" +) + +func TestYAMLExistQuery(t *testing.T) { + exist, err := YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.50\"") + assert.NoError(t, err) + assert.True(t, exist) + + exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.99\"") + assert.NoError(t, err) + assert.False(t, exist) + + exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address") + assert.EqualError(t, err, "filter query must return a boolean: yaml: unmarshal errors:\n line 1: cannot unmarshal !!str `127.0.0.50` into bool") + assert.False(t, exist) + + exist, err = YAMLCheckExist(integration.Data("{}"), ".ip_address == \"127.0.0.99\"") + assert.NoError(t, err) + assert.False(t, exist) +} diff --git a/releasenotes/notes/add_instance_filter_to_agent_check_command-c166bec4ff8ebf96.yaml b/releasenotes/notes/add_instance_filter_to_agent_check_command-c166bec4ff8ebf96.yaml new file mode 100644 index 00000000000000..7d25f7775d6a55 --- /dev/null +++ b/releasenotes/notes/add_instance_filter_to_agent_check_command-c166bec4ff8ebf96.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + Add an ``--instance-filter`` option to the Agent check command.