Skip to content

Commit

Permalink
Merge pull request elastic#231 from urso/fix/logstash-no-index-but-beat
Browse files Browse the repository at this point in the history
send @metadata.beat to Logstash
  • Loading branch information
ruflin committed Oct 29, 2015
2 parents b78f6ad + aaace57 commit fe153be
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ All notable changes to this project will be documented in this file based on the
- Add Console output plugin. #218

### Improvements
- Send @metadata.beat to Logstash instead of @metadata.index to prevent
possible name clashes and give user full control over index name used for
Elasticsearch

### Deprecated

Expand Down
22 changes: 12 additions & 10 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -383,17 +383,18 @@ Every event send to logstash contains additional meta data for indexing and filt
{
...
"@metadata": {
"index": "<beat>-<date>",
"beat": "<beat>",
"type": "<event type>"
}
}
------------------------------------------------------------------------------

In Logstash the elasticsearch output plugin can be configured to use the
In Logstash the Elasticsearch Output Plugin can be configured to use the
metadata and event type for indexing.

This logstash configuration file for logstash 1.5 will use the index and
document type reported by beats for indexing events right into elasticsearch.
This Logstash configuration file for logstash 1.5 will use the beat name and
document type reported by beats for indexing events right into Elasticsearch.
The index used will depend on `@timestamp` field as identified by Logstash.
[source,logstash]
------------------------------------------------------------------------------
Expand All @@ -408,13 +409,13 @@ output {
host => "localhost"
port => "9200"
protocol => "http"
index => "%{[@metadata][index]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
------------------------------------------------------------------------------

See the same configuration for logstash 2.x releases:
See the same configuration for Logstash 2.x releases:
[source,logstash]
------------------------------------------------------------------------------
Expand All @@ -427,14 +428,14 @@ input {
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][index]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
------------------------------------------------------------------------------

Events indexed into elasticsearch with shown logstash configuration will be
similar to events directly indexed by beats into elasticsearch.
Events indexed into Elasticsearch with shown Logstash configuration will be
similar to events directly indexed by beats into Elasticsearch.


Example beat configuration:
Expand All @@ -446,7 +447,8 @@ output:
hosts: ["localhost:5044"]
# configure index prefix name
# index configures '@metadata.beat' field to be used by Logstash for
# indexing. By Default the beat name is used (e.g. filebeat, topbeat, packetbeat)
index: mybeat
tls:
Expand Down
2 changes: 1 addition & 1 deletion docs/gettingstarted.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ output {
port => "9200"
sniffing => true
manage_template => false
index => "%{[@metadata][index]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}
}
Expand Down
8 changes: 3 additions & 5 deletions outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,9 @@ func (lj *logstash) BulkPublish(
// decode/rename the "line" field into "message".
func (lj *logstash) addMeta(event common.MapStr) {
// add metadata for indexing
ts := time.Time(event["timestamp"].(common.Time)).UTC()
index := fmt.Sprintf("%s-%02d.%02d.%02d", lj.index,
ts.Year(), ts.Month(), ts.Day())
event["@metadata"] = common.MapStr{
"index": index,
"type": event["type"].(string),
"beat": lj.index,
"type": event["type"].(string),
}
fmt.Printf("meta data: %v\n", event["@metadata"])
}
13 changes: 13 additions & 0 deletions outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,17 @@ type testOutputer struct {
*esConnection
}

type esSoure interface {
RefreshIndex()
}

type esValueReader interface {
esSoure
Read() ([]map[string]interface{}, error)
}

type esCountReader interface {
esSoure
Count() (int, error)
}

Expand Down Expand Up @@ -171,6 +177,8 @@ func (es *esConnection) Cleanup() {
}

func (es *esConnection) Read() ([]map[string]interface{}, error) {
fmt.Printf("try to read from index: %v\n", es.index)

_, err := es.Refresh(es.index)
if err != nil {
es.t.Errorf("Failed to refresh: %s", err)
Expand All @@ -191,6 +199,10 @@ func (es *esConnection) Read() ([]map[string]interface{}, error) {
return hits, err
}

func (es *esConnection) RefreshIndex() {
es.Refresh(es.index)
}

func (es *esConnection) Count() (int, error) {
_, err := es.Refresh(es.index)
if err != nil {
Expand Down Expand Up @@ -220,6 +232,7 @@ func waitUntilTrue(duration time.Duration, fn func() bool) bool {

func checkIndex(reader esCountReader, minValues int) func() bool {
return func() bool {
reader.RefreshIndex()
resp, err := reader.Count()
return err != nil || resp >= minValues
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/docker/logstash/logstash.conf.1.5.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ output {
protocol => "http"
host => ""
port => "9200"
index => "%{[@metadata][index]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}

Expand Down
2 changes: 1 addition & 1 deletion scripts/docker/logstash/logstash.conf.2.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ input {
output {
elasticsearch {
hosts => []
index => "%{[@metadata][index]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
document_type => "%{[@metadata][type]}"
}

Expand Down

0 comments on commit fe153be

Please sign in to comment.