Skip to content

Commit

Permalink
backwards compatibility for set processor
Browse files Browse the repository at this point in the history
- "ignore_empty_value" option for the set processor only works on
elasticsearch >= 7.9.0.  This change removes that option and replaces
it with an if statement if pipeline is loaded on an earlier version of
elasticsearch.
  • Loading branch information
leehinman committed Sep 1, 2020
1 parent 0712468 commit 84cc638
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix long registry migration times. {pull}20717[20717] {issue}20705[20705]
- Fix event types and categories in auditd module to comply with ECS {pull}20652[20652]
- Update documentation in the azure module filebeat. {pull}20815[20815]
- provide backwards compatibility for set processor and elasticsearch less than 7.9.0 {pull}20908[20908]

*Heartbeat*

Expand Down
49 changes: 49 additions & 0 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string
return fmt.Errorf("failed to adapt pipeline for ECS compatibility: %v", err)
}

err = modifySetProcessor(esClient.GetVersion(), pipelineID, content)
if err != nil {
return fmt.Errorf("failed to modify set processor in pipeline: %v", err)
}

body, err := esClient.LoadJSON(path, content)
if err != nil {
return interpretError(err, body)
Expand Down Expand Up @@ -232,3 +237,47 @@ func interpretError(initialErr error, body []byte) error {

return fmt.Errorf("couldn't load pipeline: %v. Response body: %s", initialErr, body)
}

// modifySetProcessor replaces ignore_empty_value option with an if statement
// so ES less than 7.9 will still work
func modifySetProcessor(esVersion common.Version, pipelineID string, content map[string]interface{}) error {
flagVersion := common.MustNewVersion("7.9.0")
if !esVersion.LessThan(flagVersion) {
return nil
}

p, ok := content["processors"]
if !ok {
return nil
}
processors, ok := p.([]interface{})
if !ok {
return fmt.Errorf("'processors' in pipeline '%s' expected to be a list, found %T", pipelineID, p)
}

for _, p := range processors {
processor, ok := p.(map[string]interface{})
if !ok {
continue
}
if options, ok := processor["set"].(map[string]interface{}); ok {
iev, ok := options["ignore_empty_value"].(bool)
if !ok || !iev {
continue
}
val, ok := options["value"].(string)
if !ok {
continue
}
newIf := strings.ReplaceAll(val, "{", "")
newIf = strings.ReplaceAll(newIf, "}", "")
newIf = strings.TrimSpace(newIf)
newIf = strings.ReplaceAll(newIf, ".", "?.")
newIf = "ctx?." + newIf + " != null"
logp.Debug("modules", "in pipeline %s replacing unsupported 'ignore_empty_value' with if %s in set processor", pipelineID, newIf)
delete(options, "ignore_empty_value")
options["if"] = newIf
}
}
return nil
}
103 changes: 103 additions & 0 deletions filebeat/fileset/pipelines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,106 @@ func TestSetEcsProcessors(t *testing.T) {
})
}
}

func TestModifySetProcessor(t *testing.T) {
cases := []struct {
name string
esVersion *common.Version
content map[string]interface{}
expected map[string]interface{}
isErrExpected bool
}{
{
name: "ES < 7.9.0",
esVersion: common.MustNewVersion("7.8.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"if": "ctx?.panw?.panos?.ruleset != null",
},
},
},
},
isErrExpected: false,
},
{
name: "ES == 7.9.0",
esVersion: common.MustNewVersion("7.9.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
},
},
isErrExpected: false,
},
{
name: "ES > 7.9.0",
esVersion: common.MustNewVersion("8.0.0"),
content: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
}},
expected: map[string]interface{}{
"processors": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"field": "rule.name",
"value": "{{panw.panos.ruleset}}",
"ignore_empty_value": true,
},
},
},
},
isErrExpected: false,
},
}

for _, test := range cases {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
err := modifySetProcessor(*test.esVersion, "foo-pipeline", test.content)
if test.isErrExpected {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, test.expected, test.content)
}
})
}
}

0 comments on commit 84cc638

Please sign in to comment.