diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 59b2424a752d..6bacd956f740 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -46,6 +46,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* +- Set `ecs: true` in user_agent processors when loading pipelines with Filebeat 7.0.x into Elasticsearch 6.7.x. {issue}10655[10655] {pull}10875[10875] + *Heartbeat* - Remove monitor generator script that was rarely used. {pull}9648[9648] diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index 0c1983ef3fc9..0f21dc4a026d 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -121,6 +121,12 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } } + + err := setECSProcessors(esClient.GetVersion(), pipelineID, content) + if err != nil { + return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err) + } + body, err := esClient.LoadJSON(path, content) if err != nil { return interpretError(err, body) @@ -129,6 +135,40 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } +// setECSProcessors sets required ECS options in processors when filebeat version is >= 7.0.0 +// and ES is 6.7.X to ease migration to ECS. +func setECSProcessors(esVersion common.Version, pipelineID string, content map[string]interface{}) error { + ecsVersion := common.MustNewVersion("7.0.0") + if !esVersion.LessThan(ecsVersion) { + return nil + } + + p, ok := content["processors"] + if !ok { + return nil + } + processors, ok := p.([]interface{}) + if !ok { + return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p) + } + + minUserAgentVersion := common.MustNewVersion("6.7.0") + for _, p := range processors { + processor, ok := p.(map[string]interface{}) + if !ok { + continue + } + if options, ok := processor["user_agent"].(map[string]interface{}); ok { + if esVersion.LessThan(minUserAgentVersion) { + return fmt.Errorf("user_agent processor requires option 'ecs: true', but Elasticsearch %v does not support this option (Elasticsearch %v or newer is required)", esVersion, minUserAgentVersion) + } + logp.Debug("modules", "Setting 'ecs: true' option in user_agent processor for field '%v' in pipeline '%s'", options["field"], pipelineID) + options["ecs"] = true + } + } + return nil +} + func deletePipeline(esClient PipelineLoader, pipelineID string) error { path := makeIngestPipelinePath(pipelineID) _, _, err := esClient.Request("DELETE", path, "", nil, nil) diff --git a/filebeat/fileset/pipelines_test.go b/filebeat/fileset/pipelines_test.go index 194df5e9f149..a9758df894a2 100644 --- a/filebeat/fileset/pipelines_test.go +++ b/filebeat/fileset/pipelines_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/outputs/elasticsearch" ) @@ -103,3 +104,112 @@ func TestLoadPipelinesWithMultiPipelineFileset(t *testing.T) { }) } } + +func TestSetEcsProcessors(t *testing.T) { + cases := []struct { + name string + esVersion *common.Version + content map[string]interface{} + expected map[string]interface{} + isErrExpected bool + }{ + { + name: "ES < 6.7.0", + esVersion: common.MustNewVersion("6.6.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }}, + isErrExpected: true, + }, + { + name: "ES == 6.7.0", + esVersion: common.MustNewVersion("6.7.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + "ecs": true, + }, + }, + }, + }, + isErrExpected: false, + }, + { + name: "ES >= 7.0.0", + esVersion: common.MustNewVersion("7.0.0"), + content: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + expected: map[string]interface{}{ + "processors": []interface{}{ + map[string]interface{}{ + "rename": map[string]interface{}{ + "field": "foo.src_ip", + "target_field": "source.ip", + }, + }, + map[string]interface{}{ + "user_agent": map[string]interface{}{ + "field": "foo.http_user_agent", + }, + }, + }, + }, + isErrExpected: false, + }, + } + + for _, test := range cases { + test := test + t.Run(test.name, func(t *testing.T) { + t.Parallel() + err := setECSProcessors(*test.esVersion, "foo-pipeline", test.content) + if test.isErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expected, test.content) + } + }) + } +}