Skip to content

Commit

Permalink
Index cluster.id, node.id, and node.name in elasticsearch/node_stats …
Browse files Browse the repository at this point in the history
…metricset (#9168)

This PR teaches the `elasticsearch/node_stats` metricset to index the Elasticsearch `cluster_uuid` as the module-level `cluster.id` field, as well as the node ID and node `name` fields as the metricset-level `node.id` and `node.name` fields, respectively.
  • Loading branch information
ycombinator authored Nov 27, 2018
1 parent 0edd07a commit 9410148
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 59 deletions.
30 changes: 20 additions & 10 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -5389,6 +5389,26 @@ type: keyword
Elasticsearch state id.
--
*`elasticsearch.node.id`*::
+
--
type: keyword
Node ID
--
*`elasticsearch.node.name`*::
+
--
type: keyword
Node name.
--
[float]
Expand Down Expand Up @@ -5928,16 +5948,6 @@ node
*`elasticsearch.node.name`*::
+
--
type: keyword
Node name.
--
*`elasticsearch.node.version`*::
+
--
Expand Down
10 changes: 10 additions & 0 deletions metricbeat/module/elasticsearch/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,13 @@
type: keyword
description: >
Elasticsearch state id.
- name: node.id
type: keyword
description: >
Node ID
- name: node.name
type: keyword
description: >
Node name.
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions metricbeat/module/elasticsearch/node/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
node
release: beta
fields:
- name: name
type: keyword
description: >
Node name.
- name: version
type: keyword
description: >
Expand Down
56 changes: 29 additions & 27 deletions metricbeat/module/elasticsearch/node_stats/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,43 +1,45 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"elasticsearch": {
"cluster": {
"name": "elasticsearch"
"id": "3LbUkLkURz--FR-YO0wLNA",
"name": "es1"
},
"node": {
"name": "523zXyT6TRWiqXcQItnkyQ",
"id": "FMRmkE3HTU6xxxoFK-06Ww",
"name": "es1_1",
"stats": {
"fs": {
"summary": {
"available": {
"bytes": 47081816064
"bytes": 350828584960
},
"free": {
"bytes": 52054323200
"bytes": 354770468864
},
"total": {
"bytes": 250790436864
"bytes": 499963170816
}
}
},
"indices": {
"docs": {
"count": 231,
"deleted": 56
"count": 30880,
"deleted": 124
},
"segments": {
"count": 16,
"count": 39,
"memory": {
"bytes": 105245
"bytes": 300797
}
},
"store": {
"size": {
"bytes": 444882
"bytes": 15205991
}
}
},
Expand All @@ -46,14 +48,14 @@
"collectors": {
"old": {
"collection": {
"count": 2,
"ms": 139
"count": 3,
"ms": 219
}
},
"young": {
"collection": {
"count": 56,
"ms": 1288
"count": 505,
"ms": 2439
}
}
}
Expand All @@ -62,44 +64,44 @@
"pools": {
"old": {
"max": {
"bytes": 724828160
"bytes": 715849728
},
"peak": {
"bytes": 225180248
"bytes": 543519960
},
"peak_max": {
"bytes": 724828160
"bytes": 715849728
},
"used": {
"bytes": 225180248
"bytes": 382281744
}
},
"survivor": {
"max": {
"bytes": 34865152
"bytes": 35782656
},
"peak": {
"bytes": 34865152
"bytes": 35782656
},
"peak_max": {
"bytes": 34865152
"bytes": 35782656
},
"used": {
"bytes": 5280456
"bytes": 6418816
}
},
"young": {
"max": {
"bytes": 279183360
"bytes": 286326784
},
"peak": {
"bytes": 279183360
"bytes": 286326784
},
"peak_max": {
"bytes": 279183360
"bytes": 286326784
},
"used": {
"bytes": 188134984
"bytes": 118870448
}
}
}
Expand Down
35 changes: 29 additions & 6 deletions metricbeat/module/elasticsearch/node_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package node_stats

import (
"encoding/json"
"fmt"

"github.com/elastic/beats/metricbeat/helper/elastic"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
Expand All @@ -32,6 +35,7 @@ import (

var (
schema = s.Schema{
"name": c.Str("name"),
"jvm": c.Dict("jvm", s.Schema{
"mem": c.Dict("mem", s.Schema{
"pools": c.Dict("pools", s.Schema{
Expand Down Expand Up @@ -103,11 +107,10 @@ var (
)

type nodesStruct struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]map[string]interface{} `json:"nodes"`
Nodes map[string]map[string]interface{} `json:"nodes"`
}

func eventsMapping(r mb.ReporterV2, content []byte) error {
func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error {

nodeData := &nodesStruct{}
err := json.Unmarshal(content, nodeData)
Expand All @@ -118,18 +121,19 @@ func eventsMapping(r mb.ReporterV2, content []byte) error {
}

var errs multierror.Errors
for name, node := range nodeData.Nodes {
for id, node := range nodeData.Nodes {
event := mb.Event{}

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{
"node": common.MapStr{
"name": name,
"id": id,
},
"cluster": common.MapStr{
"name": nodeData.ClusterName,
"name": info.ClusterName,
"id": info.ClusterID,
},
}

Expand All @@ -138,7 +142,26 @@ func eventsMapping(r mb.ReporterV2, content []byte) error {
event.Error = errors.Wrap(err, "failure to apply node schema")
r.Event(event)
errs = append(errs, event.Error)
continue
}

name, err := event.MetricSetFields.GetValue("name")
if err != nil {
event.Error = elastic.MakeErrorForMissingField("name", elastic.Elasticsearch)
r.Event(event)
errs = append(errs, event.Error)
continue
}

nameStr, ok := name.(string)
if !ok {
event.Error = fmt.Errorf("name is not a string")
r.Event(event)
errs = append(errs, event.Error)
continue
}
event.ModuleFields.Put("node.name", nameStr)
event.MetricSetFields.Delete("name")

r.Event(event)
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/elasticsearch/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ import (
)

func TestStats(t *testing.T) {
elasticsearch.TestMapper(t, "./_meta/test/node_stats.*.json", eventsMapping)
elasticsearch.TestMapperWithInfo(t, "./_meta/test/node_stats.*.json", eventsMapping)
}
10 changes: 2 additions & 8 deletions metricbeat/module/elasticsearch/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var (
}
)

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
nodesStruct := struct {
ClusterName string `json:"cluster_name"`
Nodes map[string]map[string]interface{} `json:"nodes"`
Expand All @@ -181,12 +181,6 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
// of ES and it's not know if the request will be routed to the same node as before.
var errs multierror.Errors
for nodeID, node := range nodesStruct.Nodes {
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HTTP.GetURI(), nodeID)
if err != nil {
errs = append(errs, errors.Wrap(err, "could not fetch cluster id"))
continue
}

isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HTTP.GetURI())
if err != nil {
errs = append(errs, errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
Expand All @@ -205,7 +199,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {

event.RootFields = common.MapStr{
"timestamp": time.Now(),
"cluster_uuid": clusterID,
"cluster_uuid": info.ClusterID,
"interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000,
"type": "node_stats",
"node_stats": nodeData,
Expand Down
14 changes: 12 additions & 2 deletions metricbeat/module/elasticsearch/node_stats/node_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,27 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

info, err := elasticsearch.GetInfo(m.HTTP, m.getServiceURI())
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)
return
}

if m.XPack {
err = eventsMappingXPack(r, m, content)
err = eventsMappingXPack(r, m, *info, content)
if err != nil {
m.Log.Error(err)
return
}
} else {
err = eventsMapping(r, content)
err = eventsMapping(r, *info, content)
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)
return
}
}
}

func (m *MetricSet) getServiceURI() string {
return m.HostData().SanitizedURI + nodeStatsPath
}

0 comments on commit 9410148

Please sign in to comment.