Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error messaging for Elastic stacks' metricsets #8551

Merged
merged 15 commits into from
Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ func (b *BaseMetricSet) Name() string {
return b.name
}

// FullyQualifiedName returns the complete name of the MetricSet, including the
// name of the module.
func (b *BaseMetricSet) FullyQualifiedName() string {
return b.Module().Name() + "/" + b.Name()
}

// Module returns the parent Module for the MetricSet.
func (b *BaseMetricSet) Module() Module {
return b.module
Expand Down
14 changes: 8 additions & 6 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ package ccr
import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "ccr", New,
mb.Registry.MustAddMetricSet(elasticsearch.ModuleName, "ccr", New,
mb.WithHostParser(elasticsearch.HostParser),
)
}
Expand All @@ -43,7 +45,7 @@ type MetricSet struct {

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch ccr metricset is beta")
cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta")

ms, err := elasticsearch.NewMetricSet(base, ccrStatsPath)
if err != nil {
Expand All @@ -56,13 +58,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(err)
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
return
}

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch ccr stats from a non master node.")
logp.Debug(elasticsearch.ModuleName, "trying to fetch ccr stats from a non master node.")
return
}

Expand All @@ -80,9 +82,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}

if !isCCRStatsAPIAvailable {
const errorMsg = "the elasticsearch ccr metricset is only supported with Elasticsearch >= %v. " +
const errorMsg = "the %v metricset is only supported with Elasticsearch >= %v. " +
"You are currently running Elasticsearch %v"
r.Error(fmt.Errorf(errorMsg, elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
r.Error(fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
return
}

Expand Down
4 changes: 3 additions & 1 deletion metricbeat/module/elasticsearch/ccr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
Expand Down Expand Up @@ -54,6 +55,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
}
Expand Down Expand Up @@ -83,7 +85,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
}

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/elasticsearch/ccr/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
Expand All @@ -34,6 +35,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package cluster_stats

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
Expand All @@ -44,7 +44,7 @@ type MetricSet struct {

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch cluster_stats metricset is beta")
cfgwarn.Beta("the " + base.FullyQualifiedName() + " metricset is beta")

ms, err := elasticsearch.NewMetricSet(base, clusterStatsPath)
if err != nil {
Expand All @@ -57,13 +57,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath)
if err != nil {
r.Error(fmt.Errorf("Error fetching master info: %s", err))
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
return
}

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
logp.Debug(elasticsearch.ModuleName, "trying to fetch cluster stats from a non master node.")
return
}

Expand Down
9 changes: 6 additions & 3 deletions metricbeat/module/elasticsearch/cluster_stats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package cluster_stats
import (
"encoding/json"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"

s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

var (
Expand Down Expand Up @@ -56,12 +57,14 @@ func eventMapping(r mb.ReporterV2, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Error(err)
return err
}

metricSetFields, err := schema.Apply(data)
if err != nil {
err = errors.Wrap(err, "failure applying cluster stats schema")
r.Error(err)
return err
}
Expand All @@ -73,7 +76,7 @@ func eventMapping(r mb.ReporterV2, content []byte) error {

var event mb.Event
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", clusterName)
Expand Down
23 changes: 12 additions & 11 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
"strings"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
Expand Down Expand Up @@ -145,7 +146,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
return errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
}

clusterStats := common.MapStr(data)
Expand All @@ -161,44 +162,44 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {

info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get info from Elasticsearch")
}

license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get license from Elasticsearch")
}

clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
return err
return errors.Wrap(err, "failed to get cluster state from Elasticsearch")
}

if err = elasticsearch.PassThruField("status", clusterStats, clusterState); err != nil {
return err
return errors.Wrap(err, "failed to pass through status field")
}

nodesHash, err := computeNodesHash(clusterState)
if err != nil {
return err
return errors.Wrap(err, "failed to compute nodes hash")
}
clusterState.Put("nodes_hash", nodesHash)

usage, err := elasticsearch.GetStackUsage(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
return errors.Wrap(err, "failed to get stack usage from Elasticsearch")
}

clusterNeedsTLS, err := clusterNeedsTLSEnabled(license, usage)
if err != nil {
return err
return errors.Wrap(err, "failed to determine if cluster needs TLS enabled")
}
license.Put("cluster_needs_tls", clusterNeedsTLS) // This powers a cluster alert for enabling TLS on the ES transport protocol

isAPMFound, err := apmIndicesExist(clusterState)
if err != nil {
return err
return errors.Wrap(err, "failed to determine if APM indices exist")
}
delete(clusterState, "routing_table") // We don't want to index the routing table in monitoring indices

Expand Down
10 changes: 6 additions & 4 deletions metricbeat/module/elasticsearch/index/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
Expand Down Expand Up @@ -59,27 +60,28 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var indicesStruct IndicesStruct
err := json.Unmarshal(content, &indicesStruct)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Stats API response")
r.Error(err)
return err
}

var errors multierror.Errors
var errs multierror.Errors
for name, index := range indicesStruct.Indices {
event := mb.Event{}
event.MetricSetFields, err = schema.Apply(index)
if err != nil {
r.Error(err)
errors = append(errors, err)
errs = append(errs, errors.Wrap(err, "failure applying index schema"))
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", "elasticsearch")
event.RootFields.Put("service.name", elasticsearch.ModuleName)
event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)
r.Event(event)
}

return errors.Err()
return errs.Err()
}
22 changes: 12 additions & 10 deletions metricbeat/module/elasticsearch/index/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,31 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var indicesStruct IndicesStruct
err := json.Unmarshal(content, &indicesStruct)
if err != nil {
m.Log.Errorw("Failure parsing Indices Stats Elasticsearch API response", "error", err)
err = errors.Wrap(err, "failure parsing Indices Stats Elasticsearch API response")
m.Log.Error(err)
return err
}

clusterStateMetrics := []string{"metadata", "routing_table"}
clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI(), clusterStateMetrics)
if err != nil {
m.Log.Errorw("Failure retrieving cluster state from Elasticsearch", "error", err)
err = errors.Wrap(err, "failure retrieving cluster state from Elasticsearch")
m.Log.Error(err)
return err
}

for name, index := range indicesStruct.Indices {
event := mb.Event{}
indexStats, err := xpackSchema.Apply(index)
if err != nil {
m.Log.Errorw("Failure applying index stats schema", "error", err)
m.Log.Error(errors.Wrap(err, "failure applying index stats schema"))
continue
}
indexStats["index"] = name

err = addClusterStateFields(name, indexStats, clusterState)
if err != nil {
m.Log.Errorw("Failure adding cluster state fields", "error", err)
m.Log.Error(errors.Wrap(err, "failure adding cluster state fields"))
continue
}

Expand Down Expand Up @@ -201,16 +203,16 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
areAllPrimariesStarted := true
areAllReplicasStarted := true

for _, indexShard := range shards {
for indexName, indexShard := range shards {
is, ok := indexShard.([]interface{})
if !ok {
return "", fmt.Errorf("shards is not an array")
}

for _, shard := range is {
for shardIdx, shard := range is {
s, ok := shard.(map[string]interface{})
if !ok {
return "", fmt.Errorf("shards is not an array of shard objects")
return "", fmt.Errorf("%v.shards[%v] is not a map", indexName, shardIdx)
}

shard := common.MapStr(s)
Expand Down Expand Up @@ -250,16 +252,16 @@ func getIndexShardStats(shards common.MapStr) (common.MapStr, error) {
initializing := 0
relocating := 0

for _, indexShard := range shards {
for indexName, indexShard := range shards {
is, ok := indexShard.([]interface{})
if !ok {
return nil, fmt.Errorf("shards is not an array")
}

for _, shard := range is {
for shardIdx, shard := range is {
s, ok := shard.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("shards is not an array of shard objects")
return nil, fmt.Errorf("%v.shards[%v] is not a map", indexName, shardIdx)
}

shard := common.MapStr(s)
Expand Down
Loading