Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable Elasticsearch index pattern #2119

Merged
merged 1 commit into from
Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
==== Breaking changes

*Affecting all Beats*
- Change Elasticsearch output index configuration to be based on format strings. If index has been configured, no date will be appended anymore to the index name. {pull}2119[2119]

*Metricbeat*

Expand Down
2 changes: 1 addition & 1 deletion filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: filebeat
#index: 'filebeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
Expand Down
2 changes: 1 addition & 1 deletion libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: beatname
#index: 'beatname-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
Expand Down
4 changes: 3 additions & 1 deletion libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -169,7 +170,8 @@ func newTestClient(url string) *Client {
}

func newTestClientAuth(url, user, pass string) *Client {
client, err := NewClient(url, "", nil, nil, user, pass, nil, 60*time.Second, 3, nil)
index := outil.MakeSelector()
client, err := NewClient(url, index, nil, nil, user, pass, nil, 60*time.Second, 3, nil)
if err != nil {
panic(err)
}
Expand Down
31 changes: 16 additions & 15 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
)

type Client struct {
Connection
index string
index outil.Selector
params map[string]string

// buffered bulk requests
Expand Down Expand Up @@ -75,7 +76,10 @@ var (
)

func NewClient(
esURL, index string, proxyURL *url.URL, tls *tls.Config,
esURL string,
index outil.Selector,
proxyURL *url.URL,
tls *tls.Config,
username, password string,
params map[string]string,
timeout time.Duration,
Expand Down Expand Up @@ -228,7 +232,7 @@ func (client *Client) PublishEvents(
// successfully added to bulk request.
func bulkEncodePublishRequest(
body bulkWriter,
index string,
index outil.Selector,
events []common.MapStr,
) []common.MapStr {
okEvents := events[:0]
Expand All @@ -245,11 +249,10 @@ func bulkEncodePublishRequest(
return okEvents
}

func eventBulkMeta(index string, event common.MapStr) bulkMeta {
index = getIndex(event, index)
func eventBulkMeta(index outil.Selector, event common.MapStr) bulkMeta {
meta := bulkMeta{
Index: bulkMetaIndex{
Index: index,
Index: getIndex(event, index),
DocType: event["type"].(string),
},
}
Expand All @@ -259,29 +262,27 @@ func eventBulkMeta(index string, event common.MapStr) bulkMeta {
// getIndex returns the full index name
// Index is either defined in the config as part of the output
// or can be overload by the event through setting index
func getIndex(event common.MapStr, index string) string {
func getIndex(event common.MapStr, index outil.Selector) string {

ts := time.Time(event["@timestamp"].(common.Time)).UTC()

// Check for dynamic index
// XXX: is this used/needed?
if _, ok := event["beat"]; ok {
beatMeta, ok := event["beat"].(common.MapStr)
if ok {
// Check if index is set dynamically
if dynamicIndex, ok := beatMeta["index"]; ok {
dynamicIndexValue, ok := dynamicIndex.(string)
if ok {
index = dynamicIndexValue
if dynamicIndexValue, ok := dynamicIndex.(string); ok {
return fmt.Sprintf("%s-%d.%02d.%02d",
dynamicIndexValue, ts.Year(), ts.Month(), ts.Day())
}
}
}
}

// Append timestamp to index
index = fmt.Sprintf("%s-%d.%02d.%02d", index,
ts.Year(), ts.Month(), ts.Day())

return index
str, _ := index.Select(event)
return str
}

// bulkCollectPublishFails checks per item errors returning all events
Expand Down
15 changes: 13 additions & 2 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -128,7 +130,11 @@ func TestGetIndexStandard(t *testing.T) {
"field": 1,
}

index := getIndex(event, "beatname")
pattern := "beatname-%{+yyyy.MM.dd}"
fmtstr := fmtstr.MustCompileEvent(pattern)
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, ""))

index := getIndex(event, indexSel)
assert.Equal(t, index, "beatname-"+extension)
}

Expand All @@ -145,7 +151,12 @@ func TestGetIndexOverwrite(t *testing.T) {
"index": "dynamicindex",
},
}
index := getIndex(event, "beatname")

pattern := "beatname-%%{+yyyy.MM.dd}"
fmtstr := fmtstr.MustCompileEvent(pattern)
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(fmtstr, ""))

index := getIndex(event, indexSel)
assert.Equal(t, index, "dynamicindex-"+extension)
}

Expand Down
1 change: 0 additions & 1 deletion libbeat/outputs/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type elasticsearchConfig struct {
Username string `config:"username"`
Password string `config:"password"`
ProxyURL string `config:"proxy_url"`
Index string `config:"index"`
LoadBalance bool `config:"loadbalance"`
CompressionLevel int `config:"compression_level" validate:"min=0, max=9"`
TLS *outputs.TLSConfig `config:"tls"`
Expand Down
20 changes: 16 additions & 4 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"github.com/elastic/beats/libbeat/outputs/mode/modeutil"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/paths"
)

type elasticsearchOutput struct {
index string
index outil.Selector
beatName string
mode mode.ConnectionMode
topology
Expand Down Expand Up @@ -57,7 +58,8 @@ func New(beatName string, cfg *common.Config, topologyExpire int) (outputs.Outpu
}

if !cfg.HasField("index") {
cfg.SetString("index", -1, beatName)
pattern := fmt.Sprintf("%v-%%{+yyyy.MM.dd}", beatName)
cfg.SetString("index", -1, pattern)
}

output := &elasticsearchOutput{beatName: beatName}
Expand All @@ -77,6 +79,16 @@ func (out *elasticsearchOutput) init(
return err
}

index, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "index",
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: true,
})
if err != nil {
return err
}

tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return err
Expand All @@ -87,6 +99,7 @@ func (out *elasticsearchOutput) init(
return err
}

out.index = index
clients, err := modeutil.MakeClients(cfg, makeClientFactory(tlsConfig, &config, out))
if err != nil {
return err
Expand All @@ -110,7 +123,6 @@ func (out *elasticsearchOutput) init(
}

out.mode = m
out.index = config.Index

return nil
}
Expand Down Expand Up @@ -241,7 +253,7 @@ func makeClientFactory(
}

return NewClient(
esURL, config.Index, proxyURL, tls,
esURL, out.index, proxyURL, tls,
config.Username, config.Password,
params, config.Timeout,
config.CompressionLevel,
Expand Down
16 changes: 9 additions & 7 deletions libbeat/outputs/elasticsearch/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var testOptions = outputs.Options{}

func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearchOutput {
index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
index := fmt.Sprintf("packetbeat-int-test-%d", os.Getpid())

esPort, err := strconv.Atoi(GetEsPort())

Expand All @@ -32,7 +32,7 @@ func createElasticsearchConnection(flushInterval int, bulkSize int) elasticsearc
"username": os.Getenv("ES_USER"),
"password": os.Getenv("ES_PASS"),
"path": "",
"index": index,
"index": fmt.Sprintf("%v-%%{+yyyy.MM.dd}", index),
"protocol": "http",
"flush_interval": flushInterval,
"bulk_max_size": bulkSize,
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestOneEvent(t *testing.T) {
output := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["@timestamp"] = common.Time(time.Now())
event["@timestamp"] = common.Time(ts)
event["type"] = "redis"
event["status"] = "OK"
event["responsetime"] = 34
Expand All @@ -108,7 +108,7 @@ func TestOneEvent(t *testing.T) {
r["request"] = "MGET key1"
r["response"] = "value1"

index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(event)
debugf("index = %s", index)

client := output.randomClient()
Expand Down Expand Up @@ -167,7 +167,7 @@ func TestEvents(t *testing.T) {
output := createElasticsearchConnection(0, 0)

event := common.MapStr{}
event["@timestamp"] = common.Time(time.Now())
event["@timestamp"] = common.Time(ts)
event["type"] = "redis"
event["status"] = "OK"
event["responsetime"] = 34
Expand All @@ -181,7 +181,7 @@ func TestEvents(t *testing.T) {
r["response"] = "value1"
event["redis"] = r

index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(event)
output.randomClient().CreateIndex(index, common.MapStr{
"settings": common.MapStr{
"number_of_shards": 1,
Expand Down Expand Up @@ -233,7 +233,9 @@ func TestEvents(t *testing.T) {

func testBulkWithParams(t *testing.T, output elasticsearchOutput) {
ts := time.Now()
index := fmt.Sprintf("%s-%d.%02d.%02d", output.index, ts.Year(), ts.Month(), ts.Day())
index, _ := output.index.Select(common.MapStr{
"@timestamp": common.Time(ts),
})

output.randomClient().CreateIndex(index, common.MapStr{
"settings": common.MapStr{
Expand Down
13 changes: 9 additions & 4 deletions libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import (
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/common/op"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -68,12 +70,15 @@ func esConnect(t *testing.T, index string) *esConnection {
ts := time.Now().UTC()

host := getElasticsearchHost()
index = fmt.Sprintf("%s-%02d.%02d.%02d",
index, ts.Year(), ts.Month(), ts.Day())
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(indexFmt, ""))
index, _ = indexSel.Select(common.MapStr{
"@timestamp": common.Time(ts),
})

username := os.Getenv("ES_USER")
password := os.Getenv("ES_PASS")
client, err := elasticsearch.NewClient(host, "", nil, nil, username, password,
client, err := elasticsearch.NewClient(host, indexSel, nil, nil, username, password,
nil, 60*time.Second, 0, nil)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -142,7 +147,7 @@ func newTestElasticsearchOutput(t *testing.T, test string) *testOutputer {
bulkSize := 0
config, _ := common.NewConfigFrom(map[string]interface{}{
"hosts": []string{getElasticsearchHost()},
"index": index,
"index": connection.index,
"flush_interval": &flushInterval,
"bulk_max_size": &bulkSize,
"username": os.Getenv("ES_USER"),
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: metricbeat
#index: 'metricbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: packetbeat
#index: 'packetbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
Expand Down
2 changes: 1 addition & 1 deletion winlogbeat/winlogbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ output.elasticsearch:

# Optional index name. The default index name is set to name of the beat
# in all lowercase.
#index: winlogbeat
#index: 'winlogbeat-%{+yyyy.MM.dd}'

# SOCKS5 proxy server URL
#proxy_url: socks5://user:password@socks5-server:2233
Expand Down