Skip to content

Commit

Permalink
[check command] Add --instance-filter option (#15034)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandreYang authored and val06 committed Jan 16, 2023
1 parent 4647959 commit 144d511
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 9 deletions.
49 changes: 44 additions & 5 deletions cmd/agent/common/autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ package common

import (
"context"
"fmt"
"time"

"go.uber.org/atomic"
utilserror "k8s.io/apimachinery/pkg/util/errors"

"github.com/DataDog/datadog-agent/pkg/autodiscovery"
"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/autodiscovery/scheduler"
"github.com/DataDog/datadog-agent/pkg/config"
confad "github.com/DataDog/datadog-agent/pkg/config/autodiscovery"
"github.com/DataDog/datadog-agent/pkg/util/jsonquery"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

Expand Down Expand Up @@ -211,14 +214,14 @@ func (sf schedulerFunc) Stop() {
//
// If the context is cancelled, then any accumulated, matching changes are
// returned, even if that is fewer than discoveryMinInstances.
func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int) (configs []integration.Config) {
return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances)
func WaitForConfigsFromAD(ctx context.Context, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, lastError error) {
return waitForConfigsFromAD(ctx, false, checkNames, discoveryMinInstances, instanceFilter)
}

// WaitForAllConfigsFromAD waits until its context expires, and then returns
// the full set of checks scheduled by AD.
func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config) {
return waitForConfigsFromAD(ctx, true, []string{}, 0)
func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config, lastError error) {
return waitForConfigsFromAD(ctx, true, []string{}, 0, "")
}

// waitForConfigsFromAD waits for configs from the AD scheduler and returns them.
Expand All @@ -234,7 +237,7 @@ func WaitForAllConfigsFromAD(ctx context.Context) (configs []integration.Config)
// If wildcard is true, this gathers all configs scheduled before the context
// is cancelled, and then returns. It will not return before the context is
// cancelled.
func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int) (configs []integration.Config) {
func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []string, discoveryMinInstances int, instanceFilter string) (configs []integration.Config, returnErr error) {
configChan := make(chan integration.Config)

// signal to the scheduler when we are no longer waiting, so we do not continue
Expand Down Expand Up @@ -265,23 +268,59 @@ func waitForConfigsFromAD(ctx context.Context, wildcard bool, checkNames []strin
}
}

stopChan := make(chan struct{})
// add the scheduler in a goroutine, since it will schedule any "catch-up" immediately,
// placing items in configChan
go AC.AddScheduler("check-cmd", schedulerFunc(func(configs []integration.Config) {
var errors []error
for _, cfg := range configs {
if instanceFilter != "" {
instances, filterErrors := filterInstances(cfg.Instances, instanceFilter)
if len(filterErrors) > 0 {
errors = append(errors, filterErrors...)
continue
}
if len(instances) == 0 {
continue
}
cfg.Instances = instances
}

if match(cfg) && waiting.Load() {
configChan <- cfg
}
}
if len(errors) > 0 {
returnErr = utilserror.NewAggregate(errors)
stopChan <- struct{}{}
}
}), true)

for wildcard || len(configs) < discoveryMinInstances {
select {
case cfg := <-configChan:
configs = append(configs, cfg)
case <-stopChan:
return
case <-ctx.Done():
return
}
}
return
}

func filterInstances(instances []integration.Data, instanceFilter string) ([]integration.Data, []error) {
var newInstances []integration.Data
var errors []error
for _, instance := range instances {
exist, err := jsonquery.YAMLCheckExist(instance, instanceFilter)
if err != nil {
errors = append(errors, fmt.Errorf("instance filter error: %v", err))
continue
}
if exist {
newInstances = append(newInstances, instance)
}
}
return newInstances, errors
}
9 changes: 7 additions & 2 deletions cmd/agent/subcommands/jmx/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type cliParams struct {
saveFlare bool
discoveryTimeout uint
discoveryMinInstances uint
instanceFilter string
}

// Commands returns a slice of subcommands for the 'agent' command.
Expand All @@ -63,6 +64,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryTimeout, "discovery-timeout", "", 5, "max retry duration until Autodiscovery resolves the check template (in seconds)")
jmxCmd.PersistentFlags().UintVarP(&discoveryRetryInterval, "discovery-retry-interval", "", 1, "(unused)")
jmxCmd.PersistentFlags().UintVarP(&cliParams.discoveryMinInstances, "discovery-min-instances", "", 1, "minimum number of config instances to be discovered before running the check(s)")
jmxCmd.PersistentFlags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")

// All subcommands use the same provided components, with a different
// oneShot callback, and with some complex derivation of the
Expand Down Expand Up @@ -241,11 +243,14 @@ func runJmxCommandConsole(log log.Component, config config.Component, cliParams
context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second)
var allConfigs []integration.Config
if len(cliParams.cliSelectedChecks) == 0 {
allConfigs = common.WaitForAllConfigsFromAD(waitCtx)
allConfigs, err = common.WaitForAllConfigsFromAD(waitCtx)
} else {
allConfigs = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances))
allConfigs, err = common.WaitForConfigsFromAD(waitCtx, cliParams.cliSelectedChecks, int(cliParams.discoveryMinInstances), cliParams.instanceFilter)
}
cancelTimeout()
if err != nil {
return err
}

err = standalone.ExecJMXCommandConsole(cliParams.command, cliParams.cliSelectedChecks, cliParams.jmxLogLevel, allConfigs)

Expand Down
7 changes: 6 additions & 1 deletion cmd/cluster-agent/commands/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
checkPause int
checkName string
checkDelay int
instanceFilter string
logLevel string
formatJSON bool
formatTable bool
Expand Down Expand Up @@ -72,6 +73,7 @@ func setupCmd(cmd *cobra.Command) {
cmd.Flags().IntVar(&checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds")
cmd.Flags().StringVarP(&logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)")
cmd.Flags().IntVarP(&checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds")
cmd.Flags().StringVarP(&instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")
cmd.Flags().BoolVarP(&formatJSON, "json", "", false, "format aggregator and check runner output as json")
cmd.Flags().BoolVarP(&formatTable, "table", "", false, "format aggregator and check runner output as an ascii table")
cmd.Flags().StringVarP(&breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)")
Expand Down Expand Up @@ -164,8 +166,11 @@ func Check(loggerName config.LoggerName, confFilePath *string, flagNoColor *bool

waitCtx, cancelTimeout := context.WithTimeout(
context.Background(), time.Duration(discoveryTimeout)*time.Second)
allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances))
allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{checkName}, int(discoveryMinInstances), instanceFilter)
cancelTimeout()
if err != nil {
return err
}

// make sure the checks in cs are not JMX checks
for idx := range allConfigs {
Expand Down
8 changes: 7 additions & 1 deletion pkg/cli/subcommands/check/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type cliParams struct {
checkPause int
checkName string
checkDelay int
instanceFilter string
logLevel string
formatJSON bool
formatTable bool
Expand Down Expand Up @@ -118,6 +119,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {
cmd.Flags().IntVar(&cliParams.checkPause, "pause", 0, "pause between multiple runs of the check, in milliseconds")
cmd.Flags().StringVarP(&cliParams.logLevel, "log-level", "l", "", "set the log level (default 'off') (deprecated, use the env var DD_LOG_LEVEL instead)")
cmd.Flags().IntVarP(&cliParams.checkDelay, "delay", "d", 100, "delay between running the check and grabbing the metrics in milliseconds")
cmd.Flags().StringVarP(&cliParams.instanceFilter, "instance-filter", "", "", "filter instances using jq style syntax, example: --instance-filter '.ip_address == \"127.0.0.51\"'")
cmd.Flags().BoolVarP(&cliParams.formatJSON, "json", "", false, "format aggregator and check runner output as json")
cmd.Flags().BoolVarP(&cliParams.formatTable, "table", "", false, "format aggregator and check runner output as an ascii table")
cmd.Flags().StringVarP(&cliParams.breakPoint, "breakpoint", "b", "", "set a breakpoint at a particular line number (Python checks only)")
Expand Down Expand Up @@ -190,8 +192,12 @@ func run(log log.Component, config config.Component, cliParams *cliParams) error

waitCtx, cancelTimeout := context.WithTimeout(
context.Background(), time.Duration(cliParams.discoveryTimeout)*time.Second)
allConfigs := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances))

allConfigs, err := common.WaitForConfigsFromAD(waitCtx, []string{cliParams.checkName}, int(cliParams.discoveryMinInstances), cliParams.instanceFilter)
cancelTimeout()
if err != nil {
return err
}

// make sure the checks in cs are not JMX checks
for idx := range allConfigs {
Expand Down
17 changes: 17 additions & 0 deletions pkg/util/jsonquery/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package jsonquery
import (
"fmt"
"time"

"gopkg.in/yaml.v3"
)

// Copy from https://github.com/itchyny/gojq/blob/main/cli/yaml.go
Expand Down Expand Up @@ -46,3 +48,18 @@ func NormalizeYAMLForGoJQ(v interface{}) interface{} {
return v
}
}

// YAMLCheckExist check a property/value from a YAML exist (jq style syntax)
func YAMLCheckExist(yamlData []byte, query string) (bool, error) {
var yamlContent interface{}
if err := yaml.Unmarshal(yamlData, &yamlContent); err != nil {
return false, err
}
yamlContent = NormalizeYAMLForGoJQ(yamlContent)
output, _, err := RunSingleOutput(query, yamlContent)
var exist bool
if err := yaml.Unmarshal([]byte(output), &exist); err != nil {
return false, fmt.Errorf("filter query must return a boolean: %s", err)
}
return exist, err
}
32 changes: 32 additions & 0 deletions pkg/util/jsonquery/yaml_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2023-present Datadog, Inc.

package jsonquery

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-agent/pkg/autodiscovery/integration"
)

func TestYAMLExistQuery(t *testing.T) {
exist, err := YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.50\"")
assert.NoError(t, err)
assert.True(t, exist)

exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address == \"127.0.0.99\"")
assert.NoError(t, err)
assert.False(t, exist)

exist, err = YAMLCheckExist(integration.Data("{\"ip_address\": \"127.0.0.50\"}"), ".ip_address")
assert.EqualError(t, err, "filter query must return a boolean: yaml: unmarshal errors:\n line 1: cannot unmarshal !!str `127.0.0.50` into bool")
assert.False(t, exist)

exist, err = YAMLCheckExist(integration.Data("{}"), ".ip_address == \"127.0.0.99\"")
assert.NoError(t, err)
assert.False(t, exist)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
Add an ``--instance-filter`` option to the Agent check command.

0 comments on commit 144d511

Please sign in to comment.