diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6aa58f757e5..d9522335487 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -104,6 +104,7 @@ field. You can revert this change by configuring tags for the module and omittin - [Autodiscover] Check if runner is already running before starting again. {pull}18564[18564] - Fix `keystore add` hanging under Windows. {issue}18649[18649] {pull}18654[18654] - Fix an issue where error messages are not accurate in mapstriface. {issue}18662[18662] {pull}18663[18663] +- Fix regression in `add_kubernetes_metadata`, so configured `indexers` and `matchers` are used if defaults are not disabled. {issue}18481[18481] {pull}18818[18818] *Auditbeat* diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index 9b7bc5653f8..94bc3739145 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -90,24 +90,12 @@ func isKubernetesAvailableWithRetry(client k8sclient.Interface) bool { // New constructs a new add_kubernetes_metadata processor. func New(cfg *common.Config) (processors.Processor, error) { - config := defaultKubernetesAnnotatorConfig() - log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata") - - err := cfg.Unpack(&config) + config, err := newProcessorConfig(cfg, Indexing) if err != nil { - return nil, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) - } - - //Load default indexer configs - if config.DefaultIndexers.Enabled == true { - config.Indexers = Indexing.GetDefaultIndexerConfigs() - } - - //Load default matcher configs - if config.DefaultMatchers.Enabled == true { - config.Matchers = Indexing.GetDefaultMatcherConfigs() + return nil, err } + log := logp.NewLogger(selector).With("libbeat.processor", "add_kubernetes_metadata") processor := &kubernetesAnnotator{ log: log, cache: newCache(config.CleanupTimeout), @@ -121,6 +109,27 @@ func New(cfg *common.Config) (processors.Processor, error) { return processor, nil } +func newProcessorConfig(cfg *common.Config, register *Register) (kubeAnnotatorConfig, error) { + config := defaultKubernetesAnnotatorConfig() + + err := cfg.Unpack(&config) + if err != nil { + return config, fmt.Errorf("fail to unpack the kubernetes configuration: %s", err) + } + + //Load and append default indexer configs + if config.DefaultIndexers.Enabled { + config.Indexers = append(config.Indexers, register.GetDefaultIndexerConfigs()...) + } + + //Load and append default matcher configs + if config.DefaultMatchers.Enabled { + config.Matchers = append(config.Matchers, register.GetDefaultMatcherConfigs()...) + } + + return config, nil +} + func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *common.Config) { k.initOnce.Do(func() { client, err := kubernetes.GetKubernetesClient(config.KubeConfig) diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go index dec8f446fdc..a687fe6f8e3 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" @@ -128,3 +129,103 @@ func TestAnnotatorWithNoKubernetesAvailable(t *testing.T) { assert.Equal(t, intialEventMap, event.Fields) } + +// TestNewProcessorConfigDefaultIndexers validates the behaviour of default indexers and +// matchers settings +func TestNewProcessorConfigDefaultIndexers(t *testing.T) { + emptyRegister := NewRegister() + registerWithDefaults := NewRegister() + registerWithDefaults.AddDefaultIndexerConfig("ip_port", *common.NewConfig()) + registerWithDefaults.AddDefaultMatcherConfig("field_format", *common.MustNewConfigFrom(map[string]interface{}{ + "format": "%{[destination.ip]}:%{[destination.port]}", + })) + + configWithIndexersAndMatchers := common.MustNewConfigFrom(map[string]interface{}{ + "indexers": []map[string]interface{}{ + { + "container": map[string]interface{}{}, + }, + }, + "matchers": []map[string]interface{}{ + { + "fields": map[string]interface{}{ + "lookup_fields": []string{"container.id"}, + }, + }, + }, + }) + configOverrideDefaults := common.MustNewConfigFrom(map[string]interface{}{ + "default_indexers.enabled": "false", + "default_matchers.enabled": "false", + }) + require.NoError(t, configOverrideDefaults.Merge(configWithIndexersAndMatchers)) + + cases := map[string]struct { + register *Register + config *common.Config + expectedMatchers []string + expectedIndexers []string + }{ + "no matchers": { + register: emptyRegister, + config: common.NewConfig(), + }, + "one configured indexer and matcher": { + register: emptyRegister, + config: configWithIndexersAndMatchers, + expectedIndexers: []string{"container"}, + expectedMatchers: []string{"fields"}, + }, + "default indexers and matchers": { + register: registerWithDefaults, + config: common.NewConfig(), + expectedIndexers: []string{"ip_port"}, + expectedMatchers: []string{"field_format"}, + }, + "default indexers and matchers, don't use indexers": { + register: registerWithDefaults, + config: common.MustNewConfigFrom(map[string]interface{}{ + "default_indexers.enabled": "false", + }), + expectedMatchers: []string{"field_format"}, + }, + "default indexers and matchers, don't use matchers": { + register: registerWithDefaults, + config: common.MustNewConfigFrom(map[string]interface{}{ + "default_matchers.enabled": "false", + }), + expectedIndexers: []string{"ip_port"}, + }, + "one configured indexer and matcher and defaults, configured should come first": { + register: registerWithDefaults, + config: configWithIndexersAndMatchers, + expectedIndexers: []string{"container", "ip_port"}, + expectedMatchers: []string{"fields", "field_format"}, + }, + "override defaults": { + register: registerWithDefaults, + config: configOverrideDefaults, + expectedIndexers: []string{"container"}, + expectedMatchers: []string{"fields"}, + }, + } + + names := func(plugins PluginConfig) []string { + var ns []string + for _, plugin := range plugins { + for name := range plugin { + ns = append(ns, name) + } + } + return ns + } + + for title, c := range cases { + t.Run(title, func(t *testing.T) { + config, err := newProcessorConfig(c.config, c.register) + require.NoError(t, err) + assert.Equal(t, c.expectedMatchers, names(config.Matchers), "expected matchers") + assert.Equal(t, c.expectedIndexers, names(config.Indexers), "expected indexers") + }) + } +}