diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index 00018bbb092..d46584f689e 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -62,3 +62,4 @@ The list below covers the major changes between 6.3.0 and HEAD only. - Allow to disable config resolver using the `Settings.DisableConfigResolver` field when initializing libbeat. {pull}8769[8769] - Add `mage.AddPlatforms` to allow to specify dependent platforms when building a beat. {pull}8889[8889] - Add docker image building to `mage.Package`. {pull}8898[8898] +- Filesets can now define multiple ingest pipelines, with the first one considered as the entry point pipeline. {pull}8914[8914] diff --git a/docs/devguide/modules-dev-guide.asciidoc b/docs/devguide/modules-dev-guide.asciidoc index 7fc1e5e10b1..0d8c9d7883d 100644 --- a/docs/devguide/modules-dev-guide.asciidoc +++ b/docs/devguide/modules-dev-guide.asciidoc @@ -229,6 +229,24 @@ This example selects the ingest pipeline file based on the value of the resolve to `ingest/with_plugins.json` (assuming the variable value isn't overridden at runtime.) +In 6.6 and later, you can specify multiple ingest pipelines. + +[source,yaml] +---- +ingest_pipeline: + - ingest/main.json + - ingest/plain_logs.json + - ingest/json_logs.json +---- + +When multiple ingest pipelines are specified the first one in the list is +considered to be the entry point pipeline. + +One reason for using multiple pipelines might be to send all logs harvested +by this fileset to the entry point pipeline and have it delegate different parts of +the processing to other pipelines. You can read details about setting +this up in <>. + [float] ==== config/*.yml @@ -336,6 +354,63 @@ Note that you should follow the convention of naming of fields prefixed with the module and fileset name: `{module}.{fileset}.field`, e.g. `nginx.access.remote_ip`. Also, please review our <>. +[[ingest-json-entry-point-pipeline]] +In 6.6 and later, ingest pipelines can use the +{ref}/conditionals-with-multiple-pipelines.html[`pipeline` processor] to delegate +parts of the processings to other pipelines. + +This can be useful if you want a fileset to ingest the same _logical_ information +presented in different formats, e.g. csv vs. json versions of the same log files. +Imagine an entry point ingest pipeline that detects the format of a log entry and then conditionally +delegates further processing of that log entry, depending on the format, to another +pipeline. + +["source","json",subs="callouts"] +---- +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." + } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json-log-processing-pipeline" >}" <1> + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain-log-processing-pipeline" >}" + } + } + ] +} +---- +<1> Use the `IngestPipeline` template function to resolve the name. This function converts the +specified name into the fully qualified pipeline ID that is stored in Elasticsearch. + +In order for the above pipeline to work, Filebeat must load the entry point pipeline +as well as any sub-pipelines into Elasticsearch. You can tell Filebeat to do +so by specifying all the necessary pipelines for the fileset in its `manifest.yml` +file. The first pipeline in the list is considered to be the entry point pipeline. + +[source,yaml] +---- +ingest_pipeline: + - ingest/main.json + - ingest/plain_logs.json + - ingest/json_logs.json +---- + While developing the pipeline definition, we recommend making use of the {elasticsearch}/simulate-pipeline-api.html[Simulate Pipeline API] for testing and quick iteration. diff --git a/filebeat/_meta/test/module/foo/_meta/config.yml b/filebeat/_meta/test/module/foo/_meta/config.yml new file mode 100644 index 00000000000..309b856eb83 --- /dev/null +++ b/filebeat/_meta/test/module/foo/_meta/config.yml @@ -0,0 +1,8 @@ +- module: foo + # Fileset with multiple pipelines + multi: + enabled: true + + # Fileset with multiple pipelines with the last one being bad + multibad: + enabled: true diff --git a/filebeat/_meta/test/module/foo/multi/config/multi.yml b/filebeat/_meta/test/module/foo/multi/config/multi.yml new file mode 100644 index 00000000000..00ffdbc7204 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/config/multi.yml @@ -0,0 +1,8 @@ +type: log +paths: + - /tmp +exclude_files: [".gz$"] + +fields: + service.name: "foo" +fields_under_root: true diff --git a/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json b/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json new file mode 100644 index 00000000000..183f11b881c --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/json_logs.json @@ -0,0 +1,10 @@ +{ + "processors": [ + { + "rename": { + "field": "json", + "target_field": "log.meta" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json b/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json new file mode 100644 index 00000000000..3197d1d1731 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/pipeline.json @@ -0,0 +1,27 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." + } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json_logs" >}" + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain_logs" >}" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json b/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json new file mode 100644 index 00000000000..c6547f342d9 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json @@ -0,0 +1,12 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{DATA:some_data}" + ] + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multi/manifest.yml b/filebeat/_meta/test/module/foo/multi/manifest.yml new file mode 100644 index 00000000000..aa961b73579 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multi/manifest.yml @@ -0,0 +1,8 @@ +module_version: 1.0 + +ingest_pipeline: + - ingest/pipeline.json + - ingest/json_logs.json + - ingest/plain_logs.json + +input: config/multi.yml diff --git a/filebeat/_meta/test/module/foo/multibad/config/multi.yml b/filebeat/_meta/test/module/foo/multibad/config/multi.yml new file mode 100644 index 00000000000..00ffdbc7204 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/config/multi.yml @@ -0,0 +1,8 @@ +type: log +paths: + - /tmp +exclude_files: [".gz$"] + +fields: + service.name: "foo" +fields_under_root: true diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json b/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json new file mode 100644 index 00000000000..183f11b881c --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json @@ -0,0 +1,10 @@ +{ + "processors": [ + { + "rename": { + "field": "json", + "target_field": "log.meta" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json b/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json new file mode 100644 index 00000000000..3197d1d1731 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json @@ -0,0 +1,27 @@ +{ + "processors": [ + { + "grok": { + "field": "message", + "patterns": [ + "^%{CHAR:first_char}" + ], + "pattern_definitions": { + "CHAR": "." + } + } + }, + { + "pipeline": { + "if": "ctx.first_char == '{'", + "name": "{< IngestPipeline "json_logs" >}" + } + }, + { + "pipeline": { + "if": "ctx.first_char != '{'", + "name": "{< IngestPipeline "plain_logs" >}" + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json b/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json new file mode 100644 index 00000000000..bc3ea8d3fa5 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json @@ -0,0 +1,12 @@ +{ + "processors": [ + { + "invalid_processor": { + "field": "message", + "patterns": [ + "^%{DATA:some_data}" + ] + } + } + ] +} diff --git a/filebeat/_meta/test/module/foo/multibad/manifest.yml b/filebeat/_meta/test/module/foo/multibad/manifest.yml new file mode 100644 index 00000000000..c3ae496cba4 --- /dev/null +++ b/filebeat/_meta/test/module/foo/multibad/manifest.yml @@ -0,0 +1,8 @@ +module_version: 1.0 + +ingest_pipeline: + - ingest/pipeline.json + - ingest/json_logs.json + - ingest/plain_logs_bad.json + +input: config/multi.yml diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index a57a40f6752..07e3321793f 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -34,6 +34,8 @@ import ( "strings" "text/template" + errw "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" mlimporter "github.com/elastic/beats/libbeat/ml-importer" @@ -41,13 +43,18 @@ import ( // Fileset struct is the representation of a fileset. type Fileset struct { - name string - mcfg *ModuleConfig - fcfg *FilesetConfig - modulePath string - manifest *manifest - vars map[string]interface{} - pipelineID string + name string + mcfg *ModuleConfig + fcfg *FilesetConfig + modulePath string + manifest *manifest + vars map[string]interface{} + pipelineIDs []string +} + +type pipeline struct { + id string + contents map[string]interface{} } // New allocates a new Fileset object with the given configuration. @@ -83,12 +90,12 @@ func (fs *Fileset) Read(beatVersion string) error { return err } - fs.vars, err = fs.evaluateVars() + fs.vars, err = fs.evaluateVars(beatVersion) if err != nil { return err } - fs.pipelineID, err = fs.getPipelineID(beatVersion) + fs.pipelineIDs, err = fs.getPipelineIDs(beatVersion) if err != nil { return err } @@ -101,7 +108,7 @@ func (fs *Fileset) Read(beatVersion string) error { type manifest struct { ModuleVersion string `config:"module_version"` Vars []map[string]interface{} `config:"var"` - IngestPipeline string `config:"ingest_pipeline"` + IngestPipeline []string `config:"ingest_pipeline"` Input string `config:"input"` Prospector string `config:"prospector"` MachineLearning []struct { @@ -148,10 +155,10 @@ func (fs *Fileset) readManifest() (*manifest, error) { } // evaluateVars resolves the fileset variables. -func (fs *Fileset) evaluateVars() (map[string]interface{}, error) { +func (fs *Fileset) evaluateVars(beatVersion string) (map[string]interface{}, error) { var err error vars := map[string]interface{}{} - vars["builtin"], err = fs.getBuiltinVars() + vars["builtin"], err = fs.getBuiltinVars(beatVersion) if err != nil { return nil, err } @@ -262,7 +269,14 @@ func applyTemplate(vars map[string]interface{}, templateString string, specialDe if specialDelims { tpl = tpl.Delims("{<", ">}") } - tpl, err := tpl.Parse(templateString) + + tplFunctions, err := getTemplateFunctions(vars) + if err != nil { + return "", errw.Wrap(err, "error fetching template functions") + } + tpl = tpl.Funcs(tplFunctions) + + tpl, err = tpl.Parse(templateString) if err != nil { return "", fmt.Errorf("Error parsing template %s: %v", templateString, err) } @@ -274,9 +288,27 @@ func applyTemplate(vars map[string]interface{}, templateString string, specialDe return buf.String(), nil } +func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error) { + builtinVars, ok := vars["builtin"].(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("error fetching built-in vars as a dictionary") + } + + return template.FuncMap{ + "IngestPipeline": func(shortID string) string { + return formatPipelineID( + builtinVars["module"].(string), + builtinVars["fileset"].(string), + shortID, + builtinVars["beatVersion"].(string), + ) + }, + }, nil +} + // getBuiltinVars computes the supported built in variables and groups them // in a dictionary -func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) { +func (fs *Fileset) getBuiltinVars(beatVersion string) (map[string]interface{}, error) { host, err := os.Hostname() if err != nil || len(host) == 0 { return nil, fmt.Errorf("Error getting the hostname: %v", err) @@ -289,8 +321,11 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) { } return map[string]interface{}{ - "hostname": hostname, - "domain": domain, + "hostname": hostname, + "domain": domain, + "module": fs.mcfg.Module, + "fileset": fs.name, + "beatVersion": beatVersion, }, nil } @@ -327,7 +362,11 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { } // force our pipeline ID - err = cfg.SetString("pipeline", -1, fs.pipelineID) + rootPipelineID := "" + if len(fs.pipelineIDs) > 0 { + rootPipelineID = fs.pipelineIDs[0] + } + err = cfg.SetString("pipeline", -1, rootPipelineID) if err != nil { return nil, fmt.Errorf("Error setting the pipeline ID in the input config: %v", err) } @@ -347,43 +386,60 @@ func (fs *Fileset) getInputConfig() (*common.Config, error) { return cfg, nil } -// getPipelineID returns the Ingest Node pipeline ID -func (fs *Fileset) getPipelineID(beatVersion string) (string, error) { - path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) - if err != nil { - return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) +// getPipelineIDs returns the Ingest Node pipeline IDs +func (fs *Fileset) getPipelineIDs(beatVersion string) ([]string, error) { + var pipelineIDs []string + for _, ingestPipeline := range fs.manifest.IngestPipeline { + path, err := applyTemplate(fs.vars, ingestPipeline, false) + if err != nil { + return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + } + + pipelineIDs = append(pipelineIDs, formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion)) } - return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil + return pipelineIDs, nil } -// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs. -func (fs *Fileset) GetPipeline(esVersion common.Version) (pipelineID string, content map[string]interface{}, err error) { - path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false) +// GetPipelines returns the JSON content of the Ingest Node pipeline that parses the logs. +func (fs *Fileset) GetPipelines(esVersion common.Version) (pipelines []pipeline, err error) { + vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion) if err != nil { - return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + return nil, err } - strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path)) - if err != nil { - return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err) - } + for idx, ingestPipeline := range fs.manifest.IngestPipeline { + path, err := applyTemplate(fs.vars, ingestPipeline, false) + if err != nil { + return nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err) + } - vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion) - if err != nil { - return "", nil, err - } + strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path)) + if err != nil { + return nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err) + } - jsonString, err := applyTemplate(vars, string(strContents), true) - if err != nil { - return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err) - } + jsonString, err := applyTemplate(vars, string(strContents), true) + if err != nil { + return nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err) + } - err = json.Unmarshal([]byte(jsonString), &content) - if err != nil { - return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) + var content map[string]interface{} + err = json.Unmarshal([]byte(jsonString), &content) + if err != nil { + return nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err) + } + + pipelineID := fs.pipelineIDs[idx] + + p := pipeline{ + pipelineID, + content, + } + pipelines = append(pipelines, p) } - return fs.pipelineID, content, nil + + return pipelines, nil } // formatPipelineID generates the ID to be used for the pipeline ID in Elasticsearch diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 8435532efa7..8e12dc14461 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -48,7 +48,7 @@ func TestLoadManifestNginx(t *testing.T) { manifest, err := fs.readManifest() assert.NoError(t, err) assert.Equal(t, manifest.ModuleVersion, "1.0") - assert.Equal(t, manifest.IngestPipeline, "ingest/default.json") + assert.Equal(t, manifest.IngestPipeline, []string{"ingest/default.json"}) assert.Equal(t, manifest.Input, "config/nginx-access.yml") vars := manifest.Vars @@ -60,11 +60,14 @@ func TestLoadManifestNginx(t *testing.T) { func TestGetBuiltinVars(t *testing.T) { fs := getModuleForTesting(t, "nginx", "access") - vars, err := fs.getBuiltinVars() + vars, err := fs.getBuiltinVars("6.6.0") assert.NoError(t, err) assert.IsType(t, vars["hostname"], "a-mac-with-esc-key") assert.IsType(t, vars["domain"], "local") + assert.Equal(t, "nginx", vars["module"]) + assert.Equal(t, "access", vars["fileset"]) + assert.Equal(t, "6.6.0", vars["beatVersion"]) } func TestEvaluateVarsNginx(t *testing.T) { @@ -74,7 +77,7 @@ func TestEvaluateVarsNginx(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) @@ -97,7 +100,7 @@ func TestEvaluateVarsNginxOverride(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) assert.Equal(t, "no_plugins", vars["pipeline"]) @@ -110,7 +113,7 @@ func TestEvaluateVarsMySQL(t *testing.T) { fs.manifest, err = fs.readManifest() assert.NoError(t, err) - vars, err := fs.evaluateVars() + vars, err := fs.evaluateVars("6.6.0") assert.NoError(t, err) builtin := vars["builtin"].(map[string]interface{}) @@ -144,14 +147,16 @@ func TestResolveVariable(t *testing.T) { { Value: "test-{{.value}}", Vars: map[string]interface{}{ - "value": 2, + "value": 2, + "builtin": map[string]interface{}{}, }, Expected: "test-2", }, { Value: []interface{}{"test-{{.value}}", "test1-{{.value}}"}, Vars: map[string]interface{}{ - "value": 2, + "value": 2, + "builtin": map[string]interface{}{}, }, Expected: []interface{}{"test-2", "test1-2"}, }, @@ -216,11 +221,14 @@ func TestGetPipelineNginx(t *testing.T) { assert.NoError(t, fs.Read("5.2.0")) version := common.MustNewVersion("5.2.0") - pipelineID, content, err := fs.GetPipeline(*version) + pipelines, err := fs.GetPipelines(*version) assert.NoError(t, err) - assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID) - assert.Contains(t, content, "description") - assert.Contains(t, content, "processors") + assert.Len(t, pipelines, 1) + + pipeline := pipelines[0] + assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipeline.id) + assert.Contains(t, pipeline.contents, "description") + assert.Contains(t, pipeline.contents, "processors") } func TestGetPipelineConvertTS(t *testing.T) { @@ -251,11 +259,13 @@ func TestGetPipelineConvertTS(t *testing.T) { t.Run(fmt.Sprintf("es=%v", esVersion), func(t *testing.T) { ver := common.MustNewVersion(esVersion) - pipelineID, content, err := fs.GetPipeline(*ver) + pipelines, err := fs.GetPipelines(*ver) require.NoError(t, err) - assert.Equal(t, pipelineName, pipelineID) - marshaled, err := json.Marshal(content) + pipeline := pipelines[0] + assert.Equal(t, pipelineName, pipeline.id) + + marshaled, err := json.Marshal(pipeline.contents) require.NoError(t, err) if cfg.Timezone { assert.Contains(t, string(marshaled), "beat.timezone") diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 45baabdc354..c806ec2c7b7 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -143,3 +143,97 @@ func hasIngest(client *elasticsearch.Client) bool { v := client.GetVersion() return v.Major >= 5 } + +func hasIngestPipelineProcessor(client *elasticsearch.Client) bool { + v := client.GetVersion() + return v.Major > 6 || (v.Major == 6 && v.Minor >= 5) +} + +func TestLoadMultiplePipelines(t *testing.T) { + client := estest.GetTestingElasticsearch(t) + if !hasIngest(client) { + t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion()) + } + + if !hasIngestPipelineProcessor(client) { + t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion()) + } + + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) + + modulesPath, err := filepath.Abs("../_meta/test/module") + assert.NoError(t, err) + + enabled := true + disabled := false + filesetConfigs := map[string]*FilesetConfig{ + "multi": &FilesetConfig{Enabled: &enabled}, + "multibad": &FilesetConfig{Enabled: &disabled}, + } + configs := []*ModuleConfig{ + &ModuleConfig{"foo", &enabled, filesetConfigs}, + } + + reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0") + if err != nil { + t.Fatal(err) + } + + err = reg.LoadPipelines(client, false) + if err != nil { + t.Fatal(err) + } + + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil) + assert.Equal(t, 200, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil) + assert.Equal(t, 200, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil) + assert.Equal(t, 200, status) +} + +func TestLoadMultiplePipelinesWithRollback(t *testing.T) { + client := estest.GetTestingElasticsearch(t) + if !hasIngest(client) { + t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion()) + } + + if !hasIngestPipelineProcessor(client) { + t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion()) + } + + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil) + client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) + + modulesPath, err := filepath.Abs("../_meta/test/module") + assert.NoError(t, err) + + enabled := true + disabled := false + filesetConfigs := map[string]*FilesetConfig{ + "multi": &FilesetConfig{Enabled: &disabled}, + "multibad": &FilesetConfig{Enabled: &enabled}, + } + configs := []*ModuleConfig{ + &ModuleConfig{"foo", &enabled, filesetConfigs}, + } + + reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0") + if err != nil { + t.Fatal(err) + } + + err = reg.LoadPipelines(client, false) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "invalid_processor") + + status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil) + assert.Equal(t, 404, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil) + assert.Equal(t, 404, status) + status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil) + assert.Equal(t, 404, status) +} diff --git a/filebeat/fileset/pipelines.go b/filebeat/fileset/pipelines.go index d7f63bdabad..0b6e853ce22 100644 --- a/filebeat/fileset/pipelines.go +++ b/filebeat/fileset/pipelines.go @@ -22,6 +22,8 @@ import ( "fmt" "strings" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -51,13 +53,33 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool } } - pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion()) + pipelines, err := fileset.GetPipelines(esClient.GetVersion()) if err != nil { return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err) } - err = loadPipeline(esClient, pipelineID, content, overwrite) + + var pipelineIDsLoaded []string + for _, pipeline := range pipelines { + err = loadPipeline(esClient, pipeline.id, pipeline.contents, overwrite) + if err != nil { + err = fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + break + } + pipelineIDsLoaded = append(pipelineIDsLoaded, pipeline.id) + } + if err != nil { - return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err) + // Rollback pipelines and return errors + // TODO: Instead of attempting to load all pipelines and then rolling back loaded ones when there's an + // error, validate all pipelines before loading any of them. This requires https://github.com/elastic/elasticsearch/issues/35495. + errs := multierror.Errors{err} + for _, pipelineID := range pipelineIDsLoaded { + err = deletePipeline(esClient, pipelineID) + if err != nil { + errs = append(errs, err) + } + } + return errs.Err() } } } @@ -65,7 +87,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool } func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error { - path := "/_ingest/pipeline/" + pipelineID + path := makeIngestPipelinePath(pipelineID) if !overwrite { status, _, _ := esClient.Request("GET", path, "", nil, nil) if status == 200 { @@ -81,6 +103,16 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string return nil } +func deletePipeline(esClient PipelineLoader, pipelineID string) error { + path := makeIngestPipelinePath(pipelineID) + _, _, err := esClient.Request("DELETE", path, "", nil, nil) + return err +} + +func makeIngestPipelinePath(pipelineID string) string { + return "/_ingest/pipeline/" + pipelineID +} + func interpretError(initialErr error, body []byte) error { var response struct { Error struct {