Consistency in Elastic stack metricsets' code #8308

Merged · 30 commits · Oct 25, 2018

Commits (changes from all commits):

654c691
Consistent error handling in elasticsearch/cluster_stats metricset
ycombinator Sep 13, 2018
b81e68c
Consistent error handling in elasticsearch/index_recovery metricset
ycombinator Sep 13, 2018
d31db81
Remove punctuation from error messages
ycombinator Sep 14, 2018
da50d8b
Inline variable
ycombinator Sep 14, 2018
a22bc35
Reordering imports
ycombinator Sep 14, 2018
4ab37c7
Adding periods to the ends of godoc comments
ycombinator Sep 14, 2018
6818949
More consistency cleanup
ycombinator Sep 18, 2018
c580f27
More consistency fixes
ycombinator Sep 18, 2018
9594363
More consistency fixes
ycombinator Sep 18, 2018
3deed0b
Fixing API path
ycombinator Sep 18, 2018
45cab6a
Consistent code in elasticsearch/pending_tasks metricset
ycombinator Sep 18, 2018
451fa56
More code consistency
ycombinator Sep 18, 2018
c40655f
Consistent code in elasticsearch/shard metricset
ycombinator Sep 18, 2018
e32d2b1
Consistent code in elasticsearch/ccr metricset
ycombinator Sep 26, 2018
1e5c38d
Making code in kibana module metricsets consistent
ycombinator Sep 26, 2018
2baf502
Making fully-qualified metricset name consistent
ycombinator Sep 26, 2018
a847e4e
Use elasticsearch.ModuleName constant instead of string literal
ycombinator Sep 28, 2018
8701cb9
Making logstash/node metricset code consistent
ycombinator Sep 29, 2018
c3631b4
Making the logstash/node_stats metricset code consistent
ycombinator Sep 29, 2018
40f3406
Refactoring common reporting and error pattern into helper function
ycombinator Sep 29, 2018
acf6930
Updating unit tests
ycombinator Oct 1, 2018
db6a2d2
Changes from running make fmt
ycombinator Oct 1, 2018
12a4015
Re-running make fmt after downgrading golang to 10.3
ycombinator Oct 1, 2018
0befa82
Fixes due to make update
ycombinator Oct 1, 2018
a864b21
Updating LS module integration tests
ycombinator Oct 2, 2018
6b6bb3c
Update kibana/status integration test
ycombinator Oct 2, 2018
5291885
Report error with event (for non x-pack path)
ycombinator Oct 2, 2018
35447b1
Attaching errors to events
ycombinator Oct 4, 2018
3be2ae3
Fixing imports in integration tests
ycombinator Oct 4, 2018
ab3a9ce
Fixing error
ycombinator Oct 4, 2018
8 changes: 8 additions & 0 deletions metricbeat/helper/elastic/elastic.go
@@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
)
@@ -108,3 +110,9 @@ func IsFeatureAvailable(currentProductVersion, featureAvailableInProductVersion

return !currentVersion.LessThan(wantVersion), nil
}

// ReportAndLogError reports and logs the given error
func ReportAndLogError(err error, r mb.ReporterV2, l *logp.Logger) {
r.Error(err)
l.Error(err)
}
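
The helper above replaces the separate report/log calls that were previously scattered across metricsets. Below is a minimal sketch of how a Fetch implementation is expected to use it, with a plain function standing in for the real MetricSet method; the function name and error message are illustrative, not part of this diff.

```go
package sketch

import (
	"github.com/pkg/errors"

	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/metricbeat/helper"
	"github.com/elastic/beats/metricbeat/helper/elastic"
	"github.com/elastic/beats/metricbeat/mb"
)

// fetchAndReport shows the error path this PR standardizes on: wrap the error
// for context, then let the helper both report it on the event stream and log it.
func fetchAndReport(http *helper.HTTP, r mb.ReporterV2, log *logp.Logger) {
	content, err := http.FetchContent()
	if err != nil {
		err = errors.Wrap(err, "failure fetching stats") // illustrative message
		elastic.ReportAndLogError(err, r, log)
		return
	}
	_ = content // event mapping omitted in this sketch
}
```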
27 changes: 15 additions & 12 deletions metricbeat/module/elasticsearch/ccr/ccr.go
@@ -23,7 +23,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)
@@ -58,49 +58,52 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
err = errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Log)
return
}

// Not master, no event sent
if !isMaster {
logp.Debug(elasticsearch.ModuleName, "trying to fetch ccr stats from a non master node.")
m.Log.Debug("trying to fetch ccr stats from a non-master node")
return
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

elasticsearchVersion := info.Version.Number
isCCRStatsAPIAvailable, err := elasticsearch.IsCCRStatsAPIAvailable(elasticsearchVersion)
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

if !isCCRStatsAPIAvailable {
const errorMsg = "the %v metricset is only supported with Elasticsearch >= %v. " +
"You are currently running Elasticsearch %v"
r.Error(fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion))
err = fmt.Errorf(errorMsg, m.FullyQualifiedName(), elasticsearch.CCRStatsAPIAvailableVersion, elasticsearchVersion)
elastic.ReportAndLogError(err, r, m.Log)
return
}

content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

if m.XPack {
eventsMappingXPack(r, m, *info, content)
err = eventsMappingXPack(r, m, *info, content)
} else {
err = eventsMapping(r, *info, content)
if err != nil {
r.Error(err)
return
}
}

if err != nil {
m.Log.Error(err)
return
}
}
33 changes: 19 additions & 14 deletions metricbeat/module/elasticsearch/ccr/data.go
@@ -60,40 +60,45 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
return err
}

var errors multierror.Errors
var errs multierror.Errors
for _, followerShards := range data {

shards, ok := followerShards.([]interface{})
if !ok {
err := fmt.Errorf("shards is not an array")
errors = append(errors, err)
errs = append(errs, err)
r.Error(err)
continue
}

for _, s := range shards {
event := mb.Event{}
event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

shard, ok := s.(map[string]interface{})
if !ok {
err := fmt.Errorf("shard is not an object")
errors = append(errors, err)
event.Error = fmt.Errorf("shard is not an object")
r.Event(event)
errs = append(errs, event.Error)
continue
}
event := mb.Event{}

event.MetricSetFields, err = schema.Apply(shard)
if err != nil {
errors = append(errors, err)
event.Error = errors.Wrap(err, "failure applying shard schema")

Review comment (Contributor): I like that we report partial document events even on failure. We should probably document this somewhere.

r.Event(event)
errs = append(errs, event.Error)
continue
}

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

r.Event(event)
}
}

return errors.Err()
return errs.Err()
}
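
The review comment above highlights that a mapping failure now produces a partial document rather than a dropped event. The following is a condensed sketch of that pattern as it appears in the loop above; the function signature and the `apply` callback (standing in for `schema.Apply`) are illustrative, and the multierror import is assumed to be the joeshaw/multierror package used elsewhere in Beats.

```go
package sketch

import (
	"github.com/joeshaw/multierror"
	"github.com/pkg/errors"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/metricbeat/mb"
)

// mapShards illustrates the partial-document pattern: cluster metadata is set
// up front, and when schema application fails the error is attached to the
// event, which is still reported, while the error is also collected so the
// caller can log the aggregate.
func mapShards(
	r mb.ReporterV2,
	clusterName, clusterID string,
	shards []map[string]interface{},
	apply func(map[string]interface{}) (common.MapStr, error), // stand-in for schema.Apply
) error {
	var errs multierror.Errors
	for _, shard := range shards {
		event := mb.Event{}
		event.RootFields = common.MapStr{}
		event.RootFields.Put("service.name", "elasticsearch")

		event.ModuleFields = common.MapStr{}
		event.ModuleFields.Put("cluster.name", clusterName)
		event.ModuleFields.Put("cluster.id", clusterID)

		fields, err := apply(shard)
		if err != nil {
			// Attach the error and still report the event so a partial
			// document (carrying cluster metadata) reaches the output.
			event.Error = errors.Wrap(err, "failure applying shard schema")
			r.Event(event)
			errs = append(errs, event.Error)
			continue
		}

		event.MetricSetFields = fields
		r.Event(event)
	}
	return errs.Err()
}
```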
4 changes: 1 addition & 3 deletions metricbeat/module/elasticsearch/ccr/data_xpack.go
@@ -35,9 +35,7 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info,
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
}

var errors multierror.Errors
17 changes: 11 additions & 6 deletions metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go
@@ -21,7 +21,7 @@ import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)
@@ -57,19 +57,20 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(r mb.ReporterV2) {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.HostData().SanitizedURI+clusterStatsPath)
if err != nil {
r.Error(errors.Wrap(err, "error determining if connected Elasticsearch node is master"))
err := errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Log)
return
}

// Not master, no event sent
if !isMaster {
logp.Debug(elasticsearch.ModuleName, "trying to fetch cluster stats from a non master node.")
m.Log.Debug("trying to fetch cluster stats from a non-master node")
return
}

content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
elastic.ReportAndLogError(err, r, m.Log)
return
}

@@ -80,8 +81,12 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
}

if m.MetricSet.XPack {
eventMappingXPack(r, m, *info, content)
err = eventMappingXPack(r, m, *info, content)
} else {
eventMapping(r, *info, content)
err = eventMapping(r, *info, content)
}

if err != nil {
m.Log.Error(err)
}
}
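
Both branches now feed a single error check: the mapping functions attach per-document errors to events themselves and return anything fatal, so Fetch only logs what bubbles up. A small sketch of that shape, with hypothetical mapping callbacks in place of eventMapping/eventMappingXPack:

```go
package sketch

import (
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/metricbeat/mb"
)

// dispatchMapping shows the shape the Fetch methods converge on: both mapping
// paths return an error, and whatever is returned here is logged exactly once.
func dispatchMapping(
	r mb.ReporterV2,
	log *logp.Logger,
	xpack bool,
	content []byte,
	mapXPack, mapDefault func(mb.ReporterV2, []byte) error, // hypothetical callbacks
) {
	var err error
	if xpack {
		err = mapXPack(r, content)
	} else {
		err = mapDefault(r, content)
	}

	if err != nil {
		log.Error(err)
	}
}
```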
11 changes: 6 additions & 5 deletions metricbeat/module/elasticsearch/cluster_stats/data.go
@@ -64,15 +64,16 @@ func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) erro
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Error(err)
event.Error = errors.Wrap(err, "failure parsing Elasticsearch Cluster Stats API response")
r.Event(event)
return event.Error
}

metricSetFields, err := schema.Apply(data)
if err != nil {
err = errors.Wrap(err, "failure applying cluster stats schema")
r.Error(err)
return err
event.Error = errors.Wrap(err, "failure applying cluster stats schema")
r.Event(event)
return event.Error
}

event.MetricSetFields = metricSetFields
28 changes: 14 additions & 14 deletions metricbeat/module/elasticsearch/elasticsearch.go
@@ -30,13 +30,13 @@ import (
"github.com/elastic/beats/metricbeat/helper/elastic"
)

// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available
// CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available.
const CCRStatsAPIAvailableVersion = "6.5.0"

// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id
// Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id.
var clusterIDCache = map[string]string{}

// ModuleName is the ame of this module
// ModuleName is the name of this module.
const ModuleName = "elasticsearch"

// Info construct contains the data from the Elasticsearch / endpoint
@@ -48,7 +48,7 @@ type Info struct {
} `json:"version"`
}

// NodeInfo struct cotains data about the node
// NodeInfo struct cotains data about the node.
type NodeInfo struct {
Host string `json:"host"`
TransportAddress string `json:"transport_address"`
@@ -57,7 +57,7 @@ type NodeInfo struct {
ID string
}

// GetClusterID fetches cluster id for given nodeID
// GetClusterID fetches cluster id for given nodeID.
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
// Check if cluster id already cached. If yes, return it.
if clusterID, ok := clusterIDCache[nodeID]; ok {
@@ -73,7 +73,7 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error)
return info.ClusterID, nil
}

// IsMaster checks if the given node host is a master node
// IsMaster checks if the given node host is a master node.
//
// The detection of the master is done in two steps:
// * Fetch node name from /_nodes/_local/name
@@ -130,7 +130,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
return clusterStruct.MasterNode, nil
}

// GetInfo returns the data for the Elasticsearch / endpoint
// GetInfo returns the data for the Elasticsearch / endpoint.
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {

content, err := fetchPath(http, uri, "/")
@@ -157,7 +157,7 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
return http.FetchContent()
}

// GetNodeInfo returns the node information
// GetNodeInfo returns the node information.
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
@@ -184,7 +184,7 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error

// GetLicense returns license information. Since we don't expect license information
// to change frequently, the information is cached for 1 minute to avoid
// hitting Elasticsearch frequently
// hitting Elasticsearch frequently.
func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
// First, check the cache
license := licenseCache.get()
@@ -218,7 +218,7 @@ func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
return licenseCache.get(), nil
}

// GetClusterState returns cluster state information
// GetClusterState returns cluster state information.
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (common.MapStr, error) {
clusterStateURI := "_cluster/state"
if metrics != nil && len(metrics) > 0 {
@@ -235,7 +235,7 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (comm
return clusterState, err
}

// GetStackUsage returns stack usage information
// GetStackUsage returns stack usage information.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
content, err := fetchPath(http, resetURI, "_xpack/usage")
if err != nil {
@@ -248,7 +248,7 @@ func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
}

// PassThruField copies the field at the given path from the given source data object into
// the same path in the given target data object
// the same path in the given target data object.
func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error {
fieldValue, err := sourceData.GetValue(fieldPath)
if err != nil {
@@ -260,12 +260,12 @@ func PassThruField(fieldPath string, sourceData, targetData common.MapStr) error
}

// IsCCRStatsAPIAvailable returns whether the CCR stats API is available in the given version
// of Elasticsearch
// of Elasticsearch.
func IsCCRStatsAPIAvailable(currentElasticsearchVersion string) (bool, error) {
return elastic.IsFeatureAvailable(currentElasticsearchVersion, CCRStatsAPIAvailableVersion)
}

// Global cache for license information. Assumption is that license information changes infrequently
// Global cache for license information. Assumption is that license information changes infrequently.
var licenseCache = &_licenseCache{}

type _licenseCache struct {
@@ -126,7 +126,7 @@ func getEnvPort() string {
// GetConfig returns config for elasticsearch module
func getConfig(metricset string) map[string]interface{} {
return map[string]interface{}{
"module": "elasticsearch",
"module": elasticsearch.ModuleName,
"metricsets": []string{metricset},
"hosts": []string{getEnvHost() + ":" + getEnvPort()},
"index_recovery.active_only": false,
19 changes: 12 additions & 7 deletions metricbeat/module/elasticsearch/index/data.go
@@ -68,18 +68,23 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var errs multierror.Errors
for name, index := range indicesStruct.Indices {
event := mb.Event{}
event.MetricSetFields, err = schema.Apply(index)
if err != nil {
r.Error(err)
errs = append(errs, errors.Wrap(err, "failure applying index schema"))
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name

event.RootFields = common.MapStr{}
event.RootFields.Put("service.name", elasticsearch.ModuleName)

event.ModuleFields = common.MapStr{}
event.ModuleFields.Put("cluster.name", info.ClusterName)
event.ModuleFields.Put("cluster.id", info.ClusterID)

event.MetricSetFields, err = schema.Apply(index)

Review comment (Member): The event can also contain an error. I think it would make sense to store/report the error alongside the data it's associated with.

Review comment (Contributor): I think this is now solved with the more recent commits?

if err != nil {
event.Error = errors.Wrap(err, "failure applying index schema")
r.Event(event)
errs = append(errs, event.Error)
continue
}
// Write name here as full name only available as key
event.MetricSetFields["name"] = name
r.Event(event)
}
