Skip to content

Commit

Permalink
Index cluster_uuid in elasticsearch/node metricset (#8771)
Browse files Browse the repository at this point in the history
This PR teaches the `elasticsearch/node` metricset to index the Elasticsearch `cluster_uuid` as the module-level `cluster.id` field.
  • Loading branch information
ycombinator authored Nov 22, 2018
1 parent 19fb6e5 commit 9bd2499
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d

*Metricbeat*

- The `elasticsearch/node` metricset now reports the Elasticsearch cluster UUID. {pull}8771[8771]

*Packetbeat*

*Functionbeat*
Expand Down
11 changes: 6 additions & 5 deletions metricbeat/module/elasticsearch/node/_meta/data.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
},
"elasticsearch": {
"cluster": {
"id": "91RpCx2xSQ21pVPTZfDK0Q",
"name": "elasticsearch"
},
"node": {
Expand All @@ -16,25 +17,25 @@
"bytes": 1073741824
},
"max": {
"bytes": 1038876672
"bytes": 1037959168
}
},
"nonheap": {
"init": {
"bytes": 2555904
"bytes": 7667712
},
"max": {
"bytes": 0
}
}
},
"version": "1.8.0_144"
"version": "11.0.1"
},
"name": "523zXyT6TRWiqXcQItnkyQ",
"name": "DSiWcTyeThWtUXLB9J0BMw",
"process": {
"mlockall": false
},
"version": "6.3.0"
"version": "7.0.0-alpha1"
}
},
"metricset": {
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/elasticsearch/node/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
}
)

func eventsMapping(r mb.ReporterV2, content []byte) error {
func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error {
nodesStruct := struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]map[string]interface{} `json:"nodes"`
Expand All @@ -83,6 +83,7 @@ func eventsMapping(r mb.ReporterV2, content []byte) error {

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", nodesStruct.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

event.MetricSetFields, err = schema.Apply(node)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions metricbeat/module/elasticsearch/node/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ import (
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var info = elasticsearch.Info{
ClusterID: "1234",
ClusterName: "helloworld",
}

func TestGetMappings(t *testing.T) {
elasticsearch.TestMapper(t, "./_meta/test/node.*.json", eventsMapping)
elasticsearch.TestMapperWithInfo(t, "./_meta/test/node.*.json", eventsMapping)
}

func TestInvalid(t *testing.T) {
Expand All @@ -41,6 +46,6 @@ func TestInvalid(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.Error(t, err)
}
11 changes: 10 additions & 1 deletion metricbeat/module/elasticsearch/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package node

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -72,7 +74,14 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

err = eventsMapping(r, content)
info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+nodeStatsPath)
if err != nil {
err = errors.Wrap(err, "failed to get info from Elasticsearch")
elastic.ReportAndLogError(err, r, m.Log)
return
}

err = eventsMapping(r, *info, content)
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)
return
Expand Down
20 changes: 16 additions & 4 deletions metricbeat/module/elasticsearch/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,22 @@ func TestFetch(t *testing.T) {
assert.NoError(t, err)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
assert.Equal(t, "/_nodes/_local", r.RequestURI)
switch r.RequestURI {
case "/_nodes/_local":
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))

case "/":
rootResponse := "{\"cluster_name\":\"es1\",\"cluster_uuid\":\"4heb1eiady103dxu71\",\"version\":{\"number\":\"7.0.0\"}}"
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(rootResponse))

default:
t.FailNow()
}

}))
defer server.Close()

Expand Down

0 comments on commit 9bd2499

Please sign in to comment.