Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable number of 'most recent' date-stamped indices to gather in Elasticsearch input #8543

Merged
merged 3 commits into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions plugins/inputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Note that specific statistics information can change between Elasticsearch versi
cluster_stats_only_from_master = true

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]

## One of "shards", "cluster", "indices"
Expand All @@ -74,6 +75,10 @@ Note that specific statistics information can change between Elasticsearch versi
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the number of most recent indices to return for indices that are configured with a date-stamped suffix.
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and sort them
## by the date or number after the wildcard. Metrics then are gathered for only the 'num_most_recent_indices' amount of most recent indices.
# num_most_recent_indices = 0
```

### Metrics
Expand Down
203 changes: 156 additions & 47 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
"github.com/influxdata/telegraf/plugins/inputs"
Expand Down Expand Up @@ -115,6 +116,7 @@ const sampleConfig = `
cluster_stats_only_from_master = true

## Indices to collect; can be one or more indices names or _all
## Use of wildcards is allowed. Use a wildcard at the end to retrieve index names that end with a changing value, like a date.
indices_include = ["_all"]

## One of "shards", "cluster", "indices"
Expand All @@ -135,6 +137,11 @@ const sampleConfig = `
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Sets the number of most recent indices to return for indices that are configured with a date-stamped suffix.
## Each 'indices_include' entry ending with a wildcard (*) or glob matching pattern will group together all indices that match it, and sort them
## by the date or number after the wildcard. Metrics then are gathered for only the 'num_most_recent_indices' amount of most recent indices.
# num_most_recent_indices = 0
`

// Elasticsearch is a plugin to read stats from one or many Elasticsearch
Expand All @@ -152,11 +159,14 @@ type Elasticsearch struct {
NodeStats []string `toml:"node_stats"`
Username string `toml:"username"`
Password string `toml:"password"`
NumMostRecentIndices int `toml:"num_most_recent_indices"`

tls.ClientConfig

client *http.Client
serverInfo map[string]serverInfo
serverInfoMutex sync.Mutex
indexMatchers map[string]filter.Filter
}
type serverInfo struct {
nodeID string
Expand Down Expand Up @@ -214,6 +224,19 @@ func (e *Elasticsearch) Description() string {
return "Read stats from one or more Elasticsearch servers or clusters"
}

// Init prepares the plugin before the first Gather call: the configured
// 'indices_include' patterns are compiled into glob matchers, which are
// later used to bucket returned index names for most-recent sorting.
func (e *Elasticsearch) Init() error {
	matchers, err := e.compileIndexMatchers()
	if err != nil {
		return err
	}
	e.indexMatchers = matchers
	return nil
}

// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
Expand Down Expand Up @@ -527,66 +550,135 @@ func (e *Elasticsearch) gatherIndicesStats(url string, acc telegraf.Accumulator)
acc.AddFields("elasticsearch_indices_stats_"+m, jsonParser.Fields, map[string]string{"index_name": "_all"}, now)
}

// Individual Indices stats
for id, index := range indicesStats.Indices {
indexTag := map[string]string{"index_name": id}
stats := map[string]interface{}{
"primaries": index.Primaries,
"total": index.Total,
// Gather stats for each index.
err := e.gatherIndividualIndicesStats(indicesStats.Indices, now, acc)

return err
}

// gatherSortedIndicesStats gathers stats for all indices in no particular order.
func (e *Elasticsearch) gatherIndividualIndicesStats(indices map[string]indexStat, now time.Time, acc telegraf.Accumulator) error {
// Sort indices into buckets based on their configured prefix, if any matches.
categorizedIndexNames, err := e.categorizeIndices(indices)
if err != nil {
return err
}

for _, matchingIndices := range categorizedIndexNames {
// Establish the number of each category of indices to use. User can configure to use only the latest 'X' amount.
indicesCount := len(matchingIndices)
indicesToTrackCount := indicesCount

// Sort the indices if configured to do so.
if e.NumMostRecentIndices > 0 {
if e.NumMostRecentIndices < indicesToTrackCount {
indicesToTrackCount = e.NumMostRecentIndices
}
sort.Strings(matchingIndices)
}
for m, s := range stats {
f := jsonparser.JSONFlattener{}
// parse Json, getting strings and bools
err := f.FullFlattenJSON("", s, true, true)

// Gather only the number of indexes that have been configured, in descending order (most recent, if date-stamped).
for i := indicesCount - 1; i >= indicesCount-indicesToTrackCount; i-- {
indexName := matchingIndices[i]

err := e.gatherSingleIndexStats(indexName, indices[indexName], now, acc)
if err != nil {
return err
}
acc.AddFields("elasticsearch_indices_stats_"+m, f.Fields, indexTag, now)
}
}

if e.IndicesLevel == "shards" {
for shardNumber, shards := range index.Shards {
for _, shard := range shards {
return nil
}

// Get Shard Stats
flattened := jsonparser.JSONFlattener{}
err := flattened.FullFlattenJSON("", shard, true, true)
if err != nil {
return err
}
// categorizeIndices groups the returned index names by the configured
// include pattern they match. An index matching no pattern is keyed by its
// own name; with no include list (or an "_all" entry first) every index
// lands in a single "_all" bucket.
func (e *Elasticsearch) categorizeIndices(indices map[string]indexStat) (map[string][]string, error) {
	buckets := make(map[string][]string)

	// With no explicit include list (or "_all"), every index shares one bucket.
	if len(e.IndicesInclude) == 0 || e.IndicesInclude[0] == "_all" {
		for name := range indices {
			buckets["_all"] = append(buckets["_all"], name)
		}
		return buckets, nil
	}

	// Otherwise key each index by the first configured pattern that matches
	// it, falling back to the index's own name when nothing matches.
	for name := range indices {
		key := name
		for pattern, matcher := range e.indexMatchers {
			if matcher.Match(name) {
				key = pattern
				break
			}
		}

		// All indices sharing a key are sorted together later.
		buckets[key] = append(buckets[key], name)
	}

	return buckets, nil
}

// gatherSingleIndexStats emits the "primaries" and "total" stat groups for one
// index and, when indices_level is "shards", one measurement per shard.
func (e *Elasticsearch) gatherSingleIndexStats(name string, index indexStat, now time.Time, acc telegraf.Accumulator) error {
	indexTag := map[string]string{"index_name": name}
	stats := map[string]interface{}{
		"primaries": index.Primaries,
		"total":     index.Total,
	}
	for m, s := range stats {
		f := jsonparser.JSONFlattener{}
		// parse Json, getting strings and bools
		err := f.FullFlattenJSON("", s, true, true)
		if err != nil {
			return err
		}
		acc.AddFields("elasticsearch_indices_stats_"+m, f.Fields, indexTag, now)
	}

	if e.IndicesLevel == "shards" {
		for shardNumber, shards := range index.Shards {
			for _, shard := range shards {

				// Get Shard Stats
				flattened := jsonparser.JSONFlattener{}
				err := flattened.FullFlattenJSON("", shard, true, true)
				if err != nil {
					return err
				}

				// determine shard tag and primary/replica designation
				shardType := "replica"
				if flattened.Fields["routing_primary"] == true {
					shardType = "primary"
				}
				// routing_primary has been captured in the tag; drop the field.
				delete(flattened.Fields, "routing_primary")

				// routing_state arrives as a string; map it to a numeric code
				// so it can be stored as a field value.
				routingState, ok := flattened.Fields["routing_state"].(string)
				if ok {
					flattened.Fields["routing_state"] = mapShardStatusToCode(routingState)
				}

				routingNode, _ := flattened.Fields["routing_node"].(string)
				shardTags := map[string]string{
					"index_name": name,
					"node_id":    routingNode,
					// NOTE(review): string(shardNumber) — if shardNumber is an
					// integer key this is a rune conversion, not a decimal
					// rendering; verify the Shards map key type.
					"shard_name": string(shardNumber),
					"type":       shardType,
				}

				// Strings and bools cannot be stored as numeric field values;
				// drop them now that the tag values above have been extracted.
				for key, field := range flattened.Fields {
					switch field.(type) {
					case string, bool:
						delete(flattened.Fields, key)
					}
				}

				acc.AddFields("elasticsearch_indices_stats_shards",
					flattened.Fields,
					shardTags,
					now)
			}
		}
	}
Expand Down Expand Up @@ -656,6 +748,23 @@ func (e *Elasticsearch) gatherJSONData(url string, v interface{}) error {
return nil
}

// compileIndexMatchers turns every configured 'indices_include' entry into a
// glob filter, keyed by the original pattern string. Duplicate entries are
// compiled only once.
func (e *Elasticsearch) compileIndexMatchers() (map[string]filter.Filter, error) {
	matchers := make(map[string]filter.Filter, len(e.IndicesInclude))

	// Compile each configured index into a glob matcher.
	for _, pattern := range e.IndicesInclude {
		if _, ok := matchers[pattern]; ok {
			continue
		}
		compiled, err := filter.Compile([]string{pattern})
		if err != nil {
			return nil, err
		}
		matchers[pattern] = compiled
	}

	return matchers, nil
}

func init() {
inputs.Add("elasticsearch", func() telegraf.Input {
return NewElasticsearch()
Expand Down
43 changes: 43 additions & 0 deletions plugins/inputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,49 @@ func TestGatherClusterIndicesStats(t *testing.T) {
map[string]string{"index_name": "twitter"})
}

// TestGatherDateStampedIndicesStats verifies that with num_most_recent_indices
// set to 2, only the two lexically-latest indices per wildcard pattern are
// gathered, while non-wildcard indices are still included.
func TestGatherDateStampedIndicesStats(t *testing.T) {
	es := newElasticsearchWithClient()
	es.IndicesInclude = []string{"twitter*", "influx*", "penguins"}
	es.NumMostRecentIndices = 2
	es.Servers = []string{"http://example.com:9200"}
	es.client.Transport = newTransportMock(http.StatusOK, dateStampedIndicesResponse)
	es.serverInfo = make(map[string]serverInfo)
	es.serverInfo["http://example.com:9200"] = defaultServerInfo()

	// Init compiles the include patterns into matchers; an ignored failure
	// here would silently invalidate every assertion below.
	if err := es.Init(); err != nil {
		t.Fatal(err)
	}

	var acc testutil.Accumulator
	if err := es.gatherIndicesStats(es.Servers[0]+"/"+strings.Join(es.IndicesInclude, ",")+"/_stats", &acc); err != nil {
		t.Fatal(err)
	}

	// includes 2 most recent indices for "twitter", only expect the most recent two.
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "twitter_2020_08_02"})
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "twitter_2020_08_01"})
	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "twitter_2020_07_31"})

	// includes 2 most recent indices for "influx", only expect the most recent two.
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "influx2021.01.02"})
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "influx2021.01.01"})
	acc.AssertDoesNotContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "influx2020.12.31"})

	// not configured to sort the 'penguins' index, but ensure it is also included.
	acc.AssertContainsTaggedFields(t, "elasticsearch_indices_stats_primaries",
		clusterIndicesExpected,
		map[string]string{"index_name": "penguins"})
}

func TestGatherClusterIndiceShardsStats(t *testing.T) {
es := newElasticsearchWithClient()
es.IndicesLevel = "shards"
Expand Down
Loading