diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ce36b86222c..70439279c21 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -256,6 +256,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix long registry migration times. {pull}20717[20717] {issue}20705[20705] - Fix event types and categories in auditd module to comply with ECS {pull}20652[20652] - Update documentation in the azure module filebeat. {pull}20815[20815] +- Provide backwards compatibility for the `set` processor when Elasticsearch is less than 7.9.0. {pull}20908[20908] *Heartbeat* diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index db129305463..3d89e607ec6 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -127,6 +127,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) } + err = modifySetProcessor(esClient.GetVersion(), pipelineID, content) + if err != nil { + return fmt.Errorf("failed to modify set processor in pipeline: %v", err) + } + body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) @@ -232,3 +237,57 @@ func interpretError(initialErr error, body []byte) error { return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body) } + +// modifySetProcessor replaces ignore_empty_value option with an if statement +// so ES less than 7.9 will still work +func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + flagVersion := common.MustNewVersion("7.9.0") + if !esVersion.LessThan(flagVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + for _, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["set"].(map[string]interface{}); ok { + _, ok := options["ignore_empty_value"].(bool) + if !ok { + // don't have ignore_empty_value nothing to do + continue + } + + logp.Debug("modules", "In pipeline %q removing unsupported 'ignore_empty_value' in set processor", pipelineID) + delete(options, "ignore_empty_value") + + _, ok = options["if"].(string) + if ok { + // assume if check is sufficient + continue + } + val, ok := options["value"].(string) + if !ok { + continue + } + + newIf := strings.TrimLeft(val, "{ ") + newIf = strings.TrimRight(newIf, "} ") + newIf = strings.ReplaceAll(newIf, ".", "?.") + newIf = "ctx?." + newIf + " != null" + + logp.Debug("modules", "In pipeline %q adding if %s to replace 'ignore_empty_value' in set processor", pipelineID, newIf) + options["if"] = newIf + } + } + return nil +} diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 648e82a1c2e..65a10212b6b 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -215,3 +215,180 @@ func TestSetEcsProcessors(t *testing.T) { }) } } + +func TestModifySetProcessor(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 7.9.0", + esVersion: common.MustNewVersion("7.8.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES == 7.9.0", + esVersion: common.MustNewVersion("7.9.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES > 7.9.0", + esVersion: common.MustNewVersion("8.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "existing if", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": true, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "ignore_empty_value is false", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "ignore_empty_value": false, + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "value": "{{panw.panos.ruleset}}", + "if": "ctx?.panw?.panos?.ruleset != null", + }, + }, + }}, + isErrExpected: false, + }, + { + name: "no value", + esVersion: common.MustNewVersion("7.7.7"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + "ignore_empty_value": false, + }, + }, + }}, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "set": map[string]interface{}{ + "field": "rule.name", + }, + }, + }}, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content, test.name) + } + }) + } +}