Skip to content

Commit

Permalink
Elasticsearch: add number_of_in_flight_fetch to cluster health
Browse files Browse the repository at this point in the history
The metric number_of_in_flight_fetch was not used in the cluster health.
Add it and sort the fields like they are printed by ElasticSearch.

Closes: influxdata#2779
  • Loading branch information
Jean-Louis Dupond committed Jun 18, 2019
1 parent 222de4f commit dbe04bd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
56 changes: 29 additions & 27 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,32 @@ type nodeStat struct {
}

type clusterHealth struct {
ClusterName string `json:"cluster_name"`
Status string `json:"status"`
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
ClusterName string `json:"cluster_name"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
TimedOut bool `json:"timed_out"`
UnassignedShards int `json:"unassigned_shards"`
Indices map[string]indexHealth `json:"indices"`
}

type indexHealth struct {
Status string `json:"status"`
NumberOfShards int `json:"number_of_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
NumberOfReplicas int `json:"number_of_replicas"`
NumberOfShards int `json:"number_of_shards"`
RelocatingShards int `json:"relocating_shards"`
Status string `json:"status"`
UnassignedShards int `json:"unassigned_shards"`
}

Expand Down Expand Up @@ -361,20 +362,21 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator
}
measurementTime := time.Now()
clusterFields := map[string]interface{}{
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"timed_out": healthStats.TimedOut,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"active_primary_shards": healthStats.ActivePrimaryShards,
"active_shards": healthStats.ActiveShards,
"relocating_shards": healthStats.RelocatingShards,
"initializing_shards": healthStats.InitializingShards,
"unassigned_shards": healthStats.UnassignedShards,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"delayed_unassigned_shards": healthStats.DelayedUnassignedShards,
"initializing_shards": healthStats.InitializingShards,
"number_of_data_nodes": healthStats.NumberOfDataNodes,
"number_of_in_flight_fetch": healthStats.NumberOfInFlightFetch,
"number_of_nodes": healthStats.NumberOfNodes,
"number_of_pending_tasks": healthStats.NumberOfPendingTasks,
"relocating_shards": healthStats.RelocatingShards,
"status": healthStats.Status,
"status_code": mapHealthStatusToCode(healthStats.Status),
"task_max_waiting_in_queue_millis": healthStats.TaskMaxWaitingInQueueMillis,
"active_shards_percent_as_number": healthStats.ActiveShardsPercentAsNumber,
"timed_out": healthStats.TimedOut,
"unassigned_shards": healthStats.UnassignedShards,
}
acc.AddFields(
"elasticsearch_cluster_health",
Expand All @@ -385,14 +387,14 @@ func (e *Elasticsearch) gatherClusterHealth(url string, acc telegraf.Accumulator

for name, health := range healthStats.Indices {
indexFields := map[string]interface{}{
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"number_of_shards": health.NumberOfShards,
"number_of_replicas": health.NumberOfReplicas,
"active_primary_shards": health.ActivePrimaryShards,
"active_shards": health.ActiveShards,
"relocating_shards": health.RelocatingShards,
"initializing_shards": health.InitializingShards,
"number_of_replicas": health.NumberOfReplicas,
"number_of_shards": health.NumberOfShards,
"relocating_shards": health.RelocatingShards,
"status": health.Status,
"status_code": mapHealthStatusToCode(health.Status),
"unassigned_shards": health.UnassignedShards,
}
acc.AddFields(
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/elasticsearch/testdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const clusterHealthResponse = `
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"number_of_in_flight_fetch": 0,
"active_primary_shards": 5,
"active_shards": 15,
"relocating_shards": 0,
Expand All @@ -26,6 +27,7 @@ const clusterHealthResponseWithIndices = `
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"number_of_in_flight_fetch": 0,
"active_primary_shards": 5,
"active_shards": 15,
"relocating_shards": 0,
Expand Down Expand Up @@ -66,6 +68,7 @@ var clusterHealthExpected = map[string]interface{}{
"timed_out": false,
"number_of_nodes": 3,
"number_of_data_nodes": 3,
"number_of_in_flight_fetch": 0,
"active_primary_shards": 5,
"active_shards": 15,
"relocating_shards": 0,
Expand Down

0 comments on commit dbe04bd

Please sign in to comment.