From 373a9780140456ffac77b72b5099d295b3d19972 Mon Sep 17 00:00:00 2001 From: urso Date: Thu, 28 Jul 2016 14:56:25 +0200 Subject: [PATCH] Elasticsearch index configuration Update elasticsearch index configuration to support format strings to fully configure the index names to be used. The default index is `-%{+yyyy.MM.dd}` basically mimicing the old behavior of `output.elasticsearch.index`. More complex index selection rules can be configured via `output.elasticsearch.indices`. Implementation is based on outil.Select support. --- CHANGELOG.asciidoc | 1 + filebeat/filebeat.full.yml | 2 +- libbeat/_meta/config.full.yml | 2 +- libbeat/outputs/elasticsearch/api_test.go | 4 ++- libbeat/outputs/elasticsearch/client.go | 31 ++++++++++--------- libbeat/outputs/elasticsearch/client_test.go | 15 +++++++-- libbeat/outputs/elasticsearch/config.go | 1 - libbeat/outputs/elasticsearch/output.go | 20 +++++++++--- libbeat/outputs/elasticsearch/output_test.go | 16 +++++----- .../logstash/logstash_integration_test.go | 13 +++++--- metricbeat/metricbeat.full.yml | 2 +- packetbeat/packetbeat.full.yml | 2 +- winlogbeat/winlogbeat.full.yml | 2 +- 13 files changed, 72 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 95fe482d9ad..469e4ffc930 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d ==== Breaking changes *Affecting all Beats* +- Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119] *Metricbeat* diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index e95b8faa359..c93739bae2e 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -440,7 +440,7 @@ output.elasticsearch: # Optional index name. The default index name is set to name of the beat # in all lowercase. - #index: filebeat + #index: 'filebeat-%{+yyyy.MM.dd}' # SOCKS5 proxy server URL #proxy_url: socks5://user:password@socks5-server:2233 diff --git a/libbeat/_meta/config.full.yml b/libbeat/_meta/config.full.yml index 08b81e3b46f..2b0b388e3c5 100644 --- a/libbeat/_meta/config.full.yml +++ b/libbeat/_meta/config.full.yml @@ -214,7 +214,7 @@ output.elasticsearch: # Optional index name. The default index name is set to name of the beat # in all lowercase. - #index: beatname + #index: 'beatname-%{+yyyy.MM.dd}' # SOCKS5 proxy server URL #proxy_url: socks5://user:password@socks5-server:2233 diff --git a/libbeat/outputs/elasticsearch/api_test.go b/libbeat/outputs/elasticsearch/api_test.go index 32391379081..c2611028db4 100644 --- a/libbeat/outputs/elasticsearch/api_test.go +++ b/libbeat/outputs/elasticsearch/api_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) @@ -169,7 +170,8 @@ func newTestClient(url string) *Client { } func newTestClientAuth(url, user, pass string) *Client { - client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil) + index := outil.MakeSelector() + client, err := NewClient(url, index, nil, nil, user, pass, nil, 60*time.Second, 3, nil) if err != nil { panic(err) } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 207a886eb7b..860a2ddc85b 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -16,12 +16,13 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/mode" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" ) type Client struct { Connection - index string + index outil.Selector params map[string]string // buffered bulk requests @@ -75,7 +76,10 @@ var ( ) func NewClient( - esURL, index string, proxyURL *url.URL, tls *tls.Config, + esURL string, + index outil.Selector, + proxyURL *url.URL, + tls *tls.Config, username, password string, params map[string]string, timeout time.Duration, @@ -228,7 +232,7 @@ func (client *Client) PublishEvents( // successfully added to bulk request. func bulkEncodePublishRequest( body bulkWriter, - index string, + index outil.Selector, events []common.MapStr, ) []common.MapStr { okEvents := events[:0] @@ -245,11 +249,10 @@ func bulkEncodePublishRequest( return okEvents } -func eventBulkMeta(index string, event common.MapStr) bulkMeta { - index = getIndex(event, index) +func eventBulkMeta(index outil.Selector, event common.MapStr) bulkMeta { meta := bulkMeta{ Index: bulkMetaIndex{ - Index: index, + Index: getIndex(event, index), DocType: event["type"].(string), }, } @@ -259,29 +262,27 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta { // getIndex returns the full index name // Index is either defined in the config as part of the output // or can be overload by the event through setting index -func getIndex(event common.MapStr, index string) string { +func getIndex(event common.MapStr, index outil.Selector) string { ts := time.Time(event["@timestamp"].(common.Time)).UTC() // Check for dynamic index + // XXX: is this used/needed? if _, ok := event["beat"]; ok { beatMeta, ok := event["beat"].(common.MapStr) if ok { // Check if index is set dynamically if dynamicIndex, ok := beatMeta["index"]; ok { - dynamicIndexValue, ok := dynamicIndex.(string) - if ok { - index = dynamicIndexValue + if dynamicIndexValue, ok := dynamicIndex.(string); ok { + return fmt.Sprintf("%s-%d.%02d.%02d", + dynamicIndexValue, ts.Year(), ts.Month(), ts.Day()) } } } } - // Append timestamp to index - index = fmt.Sprintf("%s-%d.%02d.%02d", index, - ts.Year(), ts.Month(), ts.Day()) - - return index + str, _ := index.Select(event) + return str } // bulkCollectPublishFails checks per item errors returning all events diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 3e6a776c655..a88af0cd8ee 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -9,6 +9,8 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) @@ -128,7 +130,11 @@ func TestGetIndexStandard(t *testing.T) { "field": 1, } - index := getIndex(event, "beatname") + pattern := "beatname-%{+yyyy.MM.dd}" + fmtstr := fmtstr.MustCompileEvent(pattern) + indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, "")) + + index := getIndex(event, indexSel) assert.Equal(t, index, "beatname-"+extension) } @@ -145,7 +151,12 @@ func TestGetIndexOverwrite(t *testing.T) { "index": "dynamicindex", }, } - index := getIndex(event, "beatname") + + pattern := "beatname-%%{+yyyy.MM.dd}" + fmtstr := fmtstr.MustCompileEvent(pattern) + indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, "")) + + index := getIndex(event, indexSel) assert.Equal(t, index, "dynamicindex-"+extension) } diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 7cf18de3fcc..060a507e577 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -13,7 +13,6 @@ type elasticsearchConfig struct { Username string `config:"username"` Password string `config:"password"` ProxyURL string `config:"proxy_url"` - Index string `config:"index"` LoadBalance bool `config:"loadbalance"` CompressionLevel int `config:"compression_level" validate:"min=0, max=9"` TLS *outputs.TLSConfig `config:"tls"` diff --git a/libbeat/outputs/elasticsearch/output.go b/libbeat/outputs/elasticsearch/output.go index 3f98fd02102..f7302912597 100644 --- a/libbeat/outputs/elasticsearch/output.go +++ b/libbeat/outputs/elasticsearch/output.go @@ -17,11 +17,12 @@ import ( "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/mode" "github.com/elastic/beats/libbeat/outputs/mode/modeutil" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/paths" ) type elasticsearchOutput struct { - index string + index outil.Selector beatName string mode mode.ConnectionMode topology @@ -57,7 +58,8 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu } if !cfg.HasField("index") { - cfg.SetString("index", -1, beatName) + pattern := fmt.Sprintf("%v-%%{+yyyy.MM.dd}", beatName) + cfg.SetString("index", -1, pattern) } output := &elasticsearchOutput{beatName: beatName} @@ -77,6 +79,16 @@ func (out *elasticsearchOutput) init( return err } + index, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{ + Key: "index", + MultiKey: "indices", + EnableSingleOnly: true, + FailEmpty: true, + }) + if err != nil { + return err + } + tlsConfig, err := outputs.LoadTLSConfig(config.TLS) if err != nil { return err @@ -87,6 +99,7 @@ func (out *elasticsearchOutput) init( return err } + out.index = index clients, err := modeutil.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out)) if err != nil { return err @@ -110,7 +123,6 @@ func (out *elasticsearchOutput) init( } out.mode = m - out.index = config.Index return nil } @@ -241,7 +253,7 @@ func makeClientFactory( } return NewClient( - esURL, config.Index, proxyURL, tls, + esURL, out.index, proxyURL, tls, config.Username, config.Password, params, config.Timeout, config.CompressionLevel, diff --git a/libbeat/outputs/elasticsearch/output_test.go b/libbeat/outputs/elasticsearch/output_test.go index 3eb32418b68..088df463a72 100644 --- a/libbeat/outputs/elasticsearch/output_test.go +++ b/libbeat/outputs/elasticsearch/output_test.go @@ -17,7 +17,7 @@ import ( var testOptions = outputs.Options{} func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput { - index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid()) + index := fmt.Sprintf("packetbeat-int-test-%d", os.Getpid()) esPort, err := strconv.Atoi(GetEsPort()) @@ -32,7 +32,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc "username": os.Getenv("ES_USER"), "password": os.Getenv("ES_PASS"), "path": "", - "index": index, + "index": fmt.Sprintf("%v-%%{+yyyy.MM.dd}", index), "protocol": "http", "flush_interval": flushInterval, "bulk_max_size": bulkSize, @@ -95,7 +95,7 @@ func TestOneEvent(t *testing.T) { output := createElasticsearchConnection(0, 0) event := common.MapStr{} - event["@timestamp"] = common.Time(time.Now()) + event["@timestamp"] = common.Time(ts) event["type"] = "redis" event["status"] = "OK" event["responsetime"] = 34 @@ -108,7 +108,7 @@ func TestOneEvent(t *testing.T) { r["request"] = "MGET key1" r["response"] = "value1" - index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day()) + index, _ := output.index.Select(event) debugf("index = %s", index) client := output.randomClient() @@ -167,7 +167,7 @@ func TestEvents(t *testing.T) { output := createElasticsearchConnection(0, 0) event := common.MapStr{} - event["@timestamp"] = common.Time(time.Now()) + event["@timestamp"] = common.Time(ts) event["type"] = "redis" event["status"] = "OK" event["responsetime"] = 34 @@ -181,7 +181,7 @@ func TestEvents(t *testing.T) { r["response"] = "value1" event["redis"] = r - index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day()) + index, _ := output.index.Select(event) output.randomClient().CreateIndex(index, common.MapStr{ "settings": common.MapStr{ "number_of_shards": 1, @@ -233,7 +233,9 @@ func TestEvents(t *testing.T) { func testBulkWithParams(t *testing.T, output elasticsearchOutput) { ts := time.Now() - index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day()) + index, _ := output.index.Select(common.MapStr{ + "@timestamp": common.Time(ts), + }) output.randomClient().CreateIndex(index, common.MapStr{ "settings": common.MapStr{ diff --git a/libbeat/outputs/logstash/logstash_integration_test.go b/libbeat/outputs/logstash/logstash_integration_test.go index 30825d39549..9a6ac648143 100644 --- a/libbeat/outputs/logstash/logstash_integration_test.go +++ b/libbeat/outputs/logstash/logstash_integration_test.go @@ -10,9 +10,11 @@ import ( "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/fmtstr" "github.com/elastic/beats/libbeat/common/op" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/libbeat/outputs/outil" "github.com/stretchr/testify/assert" ) @@ -68,12 +70,15 @@ func esConnect(t *testing.T, index string) *esConnection { ts := time.Now().UTC() host := getElasticsearchHost() - index = fmt.Sprintf("%s-%02d.%02d.%02d", - index, ts.Year(), ts.Month(), ts.Day()) + indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index)) + indexSel := outil.MakeSelector(outil.FmtSelectorExpr(indexFmt, "")) + index, _ = indexSel.Select(common.MapStr{ + "@timestamp": common.Time(ts), + }) username := os.Getenv("ES_USER") password := os.Getenv("ES_PASS") - client, err := elasticsearch.NewClient(host, "", nil, nil, username, password, + client, err := elasticsearch.NewClient(host, indexSel, nil, nil, username, password, nil, 60*time.Second, 0, nil) if err != nil { t.Fatal(err) @@ -142,7 +147,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer { bulkSize := 0 config, _ := common.NewConfigFrom(map[string]interface{}{ "hosts": []string{getElasticsearchHost()}, - "index": index, + "index": connection.index, "flush_interval": &flushInterval, "bulk_max_size": &bulkSize, "username": os.Getenv("ES_USER"), diff --git a/metricbeat/metricbeat.full.yml b/metricbeat/metricbeat.full.yml index bccd7564124..14199d1a1b1 100644 --- a/metricbeat/metricbeat.full.yml +++ b/metricbeat/metricbeat.full.yml @@ -362,7 +362,7 @@ output.elasticsearch: # Optional index name. The default index name is set to name of the beat # in all lowercase. - #index: metricbeat + #index: 'metricbeat-%{+yyyy.MM.dd}' # SOCKS5 proxy server URL #proxy_url: socks5://user:password@socks5-server:2233 diff --git a/packetbeat/packetbeat.full.yml b/packetbeat/packetbeat.full.yml index 22c0ce8207a..eb18c8e2c5f 100644 --- a/packetbeat/packetbeat.full.yml +++ b/packetbeat/packetbeat.full.yml @@ -632,7 +632,7 @@ output.elasticsearch: # Optional index name. The default index name is set to name of the beat # in all lowercase. - #index: packetbeat + #index: 'packetbeat-%{+yyyy.MM.dd}' # SOCKS5 proxy server URL #proxy_url: socks5://user:password@socks5-server:2233 diff --git a/winlogbeat/winlogbeat.full.yml b/winlogbeat/winlogbeat.full.yml index 96ad4592aff..c17c364ceeb 100644 --- a/winlogbeat/winlogbeat.full.yml +++ b/winlogbeat/winlogbeat.full.yml @@ -249,7 +249,7 @@ output.elasticsearch: # Optional index name. The default index name is set to name of the beat # in all lowercase. - #index: winlogbeat + #index: 'winlogbeat-%{+yyyy.MM.dd}' # SOCKS5 proxy server URL #proxy_url: socks5://user:password@socks5-server:2233