Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make selector string casing configurable #18854

Merged
merged 11 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix potential race condition in fingerprint processor. {pull}18738[18738]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]

*Auditbeat*

Expand Down
8 changes: 5 additions & 3 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package idxmgmt
import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
Expand Down Expand Up @@ -198,6 +199,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: mode != ilm.ModeEnabled,
Case: outil.SelectorLowerCase,
}

indexSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings)
Expand Down Expand Up @@ -354,21 +356,21 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
}

if alias, err := events.GetMetaStringValue(*evt, events.FieldMetaAlias); err == nil {
return alias
return strings.ToLower(alias)
}

if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaIndex); err == nil {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
strings.ToLower(idx), ts.Year(), ts.Month(), ts.Day())
}

// This is functionally identical to Meta["alias"], returning the overriding
// metadata as the index name if present. It is currently used by Filebeat
// to send the index for particular inputs to formatted string templates,
// which are then expanded by a processor to the "raw_index" field.
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaRawIndex); err == nil {
return idx
return strings.ToLower(idx)
}

return ""
Expand Down
44 changes: 44 additions & 0 deletions libbeat/idxmgmt/std_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "TeSt-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -147,6 +152,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "test",
},
},
"event alias without ilm must be lowercae": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test"),
meta: common.MapStr{
"alias": "Test",
},
},
"event index without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -155,11 +168,24 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"index": "test",
},
},
"event index without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: dateIdx("test"),
meta: common.MapStr{
"index": "Test",
},
},
"with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"with ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("Test-9.9.9", "Test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias wit ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -168,6 +194,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "event-alias",
},
},
"event alias wit ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("event-alias"),
meta: common.MapStr{
"alias": "Event-alias",
},
},
"event index with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -186,6 +220,16 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
},
want: stable("myindex"),
},
"use indices settings must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{
"index": "test-%{[agent.version]}",
"indices": []map[string]interface{}{
{"index": "MyIndex"},
},
},
want: stable("myindex"),
},
}
for name, test := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"go.elastic.co/apm"
Expand Down Expand Up @@ -352,7 +353,7 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
return "", errors.New("pipeline metadata is no string")
}

return pipeline, nil
return strings.ToLower(pipeline), nil
}

if pipelineSel != nil {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func doClientPing(t *testing.T) {
Headers: map[string]string{headerTestField: headerTestValue},
ProxyDisable: proxyDisable != "",
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestClientWithHeaders(t *testing.T) {
"X-Test": "testing value",
},
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}, nil)
assert.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ func buildSelectors(
return index, pipeline, err
}

pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
})
pipelineSel, err := buildPipelineSelector(cfg)
if err != nil {
return index, pipeline, err
}
Expand All @@ -149,3 +144,13 @@ func buildSelectors(

return index, pipeline, err
}

func buildPipelineSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
Case: outil.SelectorLowerCase,
})
}
58 changes: 58 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

Expand Down Expand Up @@ -73,3 +75,59 @@ func TestGlobalConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestPipelineSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"no pipline configured": {},
"pipeline configured": {
cfg: map[string]interface{}{"pipeline": "test"},
want: "test",
},
"pipeline must be lowercase": {
cfg: map[string]interface{}{"pipeline": "Test"},
want: "test",
},
"pipeline via event meta": {
event: beat.Event{Meta: common.MapStr{"pipeline": "test"}},
want: "test",
},
"pipeline via event meta must be lowercase": {
event: beat.Event{Meta: common.MapStr{"pipeline": "Test"}},
want: "test",
},
"pipelines setting": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "test"}},
},
want: "test",
},
"pipelines setting must be lowercase": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "Test"}},
},
want: "test",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := getPipeline(&test.event, &selector)
if err != nil {
t.Fatalf("Failed to create pipeline name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
62 changes: 62 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -131,3 +132,64 @@ func TestBackoffFunc(t *testing.T) {
})
}
}

func TestTopicSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"topic configured": {
cfg: map[string]interface{}{"topic": "test"},
want: "test",
},
"topic must keep case": {
cfg: map[string]interface{}{"topic": "Test"},
want: "Test",
},
"topics setting": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "test"}},
},
want: "test",
},
"topics setting must keep case": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "Test"}},
},
want: "Test",
},
"use event field": {
cfg: map[string]interface{}{"topic": "test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "from-event"},
},
want: "test-from-event",
},
"use event field must keep case": {
cfg: map[string]interface{}{"topic": "Test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "From-Event"},
},
want: "Test-From-Event",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := selector.Select(&test.event)
if err != nil {
t.Fatalf("Failed to create topic name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
17 changes: 11 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ func makeKafka(
return outputs.Fail(err)
}

topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
topic, err := buildTopicSelector(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -102,3 +97,13 @@ func makeKafka(
}
return outputs.Success(config.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
}
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func esConnect(t *testing.T, index string) *esConnection {

host := getElasticsearchHost()
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "", outil.SelectorLowerCase)
indexSel := outil.MakeSelector(indexFmtExpr)
index, _ = indexSel.Select(&beat.Event{
Timestamp: ts,
Expand Down
Loading