Skip to content

Commit

Permalink
Cherry-pick #9166 to 6.x: Index cluster.id and cluster.name in elasti…
Browse files Browse the repository at this point in the history
…csearch/pending_tasks metricset (#9174)

Cherry-pick of PR #9166 to 6.x branch. Original message: 

This PR teaches the `elasticsearch/pending_tasks` metricset to index the Elasticsearch `cluster_uuid` and `cluster_name` as the module-level `cluster.id` and `cluster.name` fields, respectively.
  • Loading branch information
ycombinator authored Nov 21, 2018
1 parent eec92fc commit 4fd6ff7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 32 deletions.
46 changes: 24 additions & 22 deletions metricbeat/module/elasticsearch/pending_tasks/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
{
"@timestamp": "2018-05-03T20:58:17.379Z",
"metricset": {
"rtt": 2487,
"namespace": "elasticsearch.cluster.pending_task",
"name": "pending_tasks",
"module": "elasticsearch",
"host": "localhost:9200"
},
"elasticsearch": {
"cluster": {
"pending_task": {
"insert_order": 47,
"priority": "HIGH",
"source": "put-mapping",
"time_in_queue.ms": 34
}
"@timestamp": "2018-05-03T20:58:17.379Z",
"metricset": {
"rtt": 2487,
"namespace": "elasticsearch.cluster.pending_task",
"name": "pending_tasks",
"module": "elasticsearch",
"host": "localhost:9200"
},
"elasticsearch": {
"cluster": {
"id": "3LbUkLkURz--FR-YO0wLNA",
"name": "es1",
"pending_task": {
"insert_order": 47,
"priority": "HIGH",
"source": "put-mapping",
"time_in_queue.ms": 34
}
}
},
"beat": {
"name": "host.example.com",
"hostname": "host.example.com"
}
},
"beat": {
"name": "host.example.com",
"hostname": "host.example.com"
}
}
}
6 changes: 5 additions & 1 deletion metricbeat/module/elasticsearch/pending_tasks/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
}
)

func eventsMapping(r mb.ReporterV2, content []byte) error {
func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error {
tasksStruct := struct {
Tasks []map[string]interface{} `json:"tasks"`
}{}
Expand All @@ -63,6 +63,10 @@ func eventsMapping(r mb.ReporterV2, content []byte) error {
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

event.MetricSetFields, err = schema.Apply(task)
if err != nil {
event.Error = errors.Wrap(err, "failure applying task schema")
Expand Down
44 changes: 37 additions & 7 deletions metricbeat/module/elasticsearch/pending_tasks/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var info = elasticsearch.Info{
ClusterID: "1234",
ClusterName: "helloworld",
}

//Events Mapping

func TestEmptyQueueShouldGiveNoError(t *testing.T) {
Expand All @@ -40,7 +46,7 @@ func TestEmptyQueueShouldGiveNoError(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.NoError(t, err)
}

Expand All @@ -50,7 +56,7 @@ func TestNotEmptyQueueShouldGiveNoError(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.NoError(t, err)
assert.True(t, len(reporter.GetEvents()) >= 1)
assert.Zero(t, len(reporter.GetErrors()))
Expand All @@ -62,7 +68,7 @@ func TestEmptyQueueShouldGiveZeroEvent(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.Zero(t, len(reporter.GetEvents()))
assert.Zero(t, len(reporter.GetErrors()))
}
Expand All @@ -73,7 +79,7 @@ func TestNotEmptyQueueShouldGiveSeveralEvents(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.Equal(t, 3, len(reporter.GetEvents()))
assert.Zero(t, len(reporter.GetErrors()))
}
Expand All @@ -84,7 +90,7 @@ func TestInvalidJsonForRequiredFieldShouldThrowError(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.Error(t, err)
}

Expand All @@ -94,7 +100,7 @@ func TestInvalidJsonForBadFormatShouldThrowError(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)
assert.Error(t, err)
}

Expand All @@ -111,6 +117,12 @@ func TestEventsMappedMatchToContentReceived(t *testing.T) {
"name": "elasticsearch",
},
},
ModuleFields: common.MapStr{
"cluster": common.MapStr{
"id": "1234",
"name": "helloworld",
},
},
MetricSetFields: common.MapStr{
"priority": "URGENT",
"source": "create-index [foo_9], cause [api]",
Expand All @@ -128,6 +140,12 @@ func TestEventsMappedMatchToContentReceived(t *testing.T) {
"name": "elasticsearch",
},
},
ModuleFields: common.MapStr{
"cluster": common.MapStr{
"id": "1234",
"name": "helloworld",
},
},
MetricSetFields: common.MapStr{
"priority": "URGENT",
"source": "create-index [foo_9], cause [api]",
Expand All @@ -143,6 +161,12 @@ func TestEventsMappedMatchToContentReceived(t *testing.T) {
"name": "elasticsearch",
},
},
ModuleFields: common.MapStr{
"cluster": common.MapStr{
"id": "1234",
"name": "helloworld",
},
},
MetricSetFields: common.MapStr{"priority": "HIGH",
"source": "shard-started ([foo_2][1], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from shard_store]",
"time_in_queue.ms": int64(842),
Expand All @@ -156,6 +180,12 @@ func TestEventsMappedMatchToContentReceived(t *testing.T) {
"name": "elasticsearch",
},
},
ModuleFields: common.MapStr{
"cluster": common.MapStr{
"id": "1234",
"name": "helloworld",
},
},
MetricSetFields: common.MapStr{
"priority": "HIGH",
"source": "shard-started ([foo_2][0], node[tMTocMvQQgGCkj7QDHl3OA], [P], s[INITIALIZING]), reason [after recovery from shard_store]",
Expand All @@ -172,7 +202,7 @@ func TestEventsMappedMatchToContentReceived(t *testing.T) {
assert.NoError(t, err)

reporter := &mbtest.CapturingReporterV2{}
err = eventsMapping(reporter, content)
err = eventsMapping(reporter, info, content)

events := reporter.GetEvents()
if !reflect.DeepEqual(testCase.expected, events) {
Expand Down
14 changes: 12 additions & 2 deletions metricbeat/module/elasticsearch/pending_tasks/pending_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+pendingTasksPath)
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.getServiceURI())
if err != nil {
err := errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Log)
Expand All @@ -76,15 +76,25 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

info, err := elasticsearch.GetInfo(m.HTTP, m.getServiceURI())
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)
return
}

content, err := m.HTTP.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Log)
return
}

err = eventsMapping(r, content)
err = eventsMapping(r, *info, content)
if err != nil {
m.Log.Error(err)
return
}
}

func (m *MetricSet) getServiceURI() string {
return m.HostData().SanitizedURI + pendingTasksPath
}

0 comments on commit 4fd6ff7

Please sign in to comment.