From 617953c9f494dd3aff780666d4dd6f52d1364b84 Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 11:17:11 +0100 Subject: [PATCH 1/7] feat: add support for scraping cluster stats by the client --- receiver/elasticsearchreceiver/client.go | 22 ++ receiver/elasticsearchreceiver/client_test.go | 119 +++++++ .../internal/mocks/elasticsearchClient.go | 23 ++ .../internal/model/clusterstats.go | 46 +++ .../testdata/sample_payloads/cluster.json | 330 ++++++++++++++++++ 5 files changed, 540 insertions(+) create mode 100644 receiver/elasticsearchreceiver/internal/model/clusterstats.go create mode 100644 receiver/elasticsearchreceiver/testdata/sample_payloads/cluster.json diff --git a/receiver/elasticsearchreceiver/client.go b/receiver/elasticsearchreceiver/client.go index 8b51ded70d9f..f8f7e4a75098 100644 --- a/receiver/elasticsearchreceiver/client.go +++ b/receiver/elasticsearchreceiver/client.go @@ -42,6 +42,7 @@ type elasticsearchClient interface { ClusterHealth(ctx context.Context) (*model.ClusterHealth, error) IndexStats(ctx context.Context, indices []string) (*model.IndexStats, error) ClusterMetadata(ctx context.Context) (*model.ClusterMetadataResponse, error) + ClusterStats(ctx context.Context, nodes []string) (*model.ClusterStats, error) } // defaultElasticsearchClient is the main implementation of elasticsearchClient. 
@@ -154,6 +155,27 @@ func (c defaultElasticsearchClient) ClusterMetadata(ctx context.Context) (*model return &versionResponse, err } +func (c defaultElasticsearchClient) ClusterStats(ctx context.Context, nodes []string) (*model.ClusterStats, error) { + var nodesSpec string + if len(nodes) > 0 { + nodesSpec = strings.Join(nodes, ",") + } else { + nodesSpec = "_all" + } + + clusterStatsPath := fmt.Sprintf("_cluster/stats/%s", nodesSpec) + + body, err := c.doRequest(ctx, clusterStatsPath) + if err != nil { + return nil, err + } + + clusterStats := model.ClusterStats{} + err = json.Unmarshal(body, &clusterStats) + + return &clusterStats, err +} + func (c defaultElasticsearchClient) doRequest(ctx context.Context, path string) ([]byte, error) { endpoint, err := c.endpoint.Parse(path) if err != nil { diff --git a/receiver/elasticsearchreceiver/client_test.go b/receiver/elasticsearchreceiver/client_test.go index 953c4dd425a1..f45297ed1920 100644 --- a/receiver/elasticsearchreceiver/client_test.go +++ b/receiver/elasticsearchreceiver/client_test.go @@ -476,6 +476,116 @@ func TestIndexStatsBadAuthentication(t *testing.T) { require.ErrorIs(t, err, errUnauthorized) } +func TestClusterStatsNoPassword(t *testing.T) { + clusterJSON, err := os.ReadFile("./testdata/sample_payloads/cluster.json") + require.NoError(t, err) + + actualClusterStats := model.ClusterStats{} + require.NoError(t, json.Unmarshal(clusterJSON, &actualClusterStats)) + + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) + require.NoError(t, err) + ctx := context.Background() + clusterStats, err := client.ClusterStats(ctx, []string{"_all"}) + require.NoError(t, err) + + require.Equal(t, &actualClusterStats, clusterStats) +} + +func TestClusterStatsNilNodes(t *testing.T) { + 
clusterJSON, err := os.ReadFile("./testdata/sample_payloads/cluster.json") + require.NoError(t, err) + + actualClusterStats := model.ClusterStats{} + require.NoError(t, json.Unmarshal(clusterJSON, &actualClusterStats)) + + elasticsearchMock := mockServer(t, "", "") + defer elasticsearchMock.Close() + + client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) + require.NoError(t, err) + + ctx := context.Background() + clusterStats, err := client.ClusterStats(ctx, nil) + require.NoError(t, err) + + require.Equal(t, &actualClusterStats, clusterStats) +} + +func TestClusterStatsAuthentication(t *testing.T) { + clusterJSON, err := os.ReadFile("./testdata/sample_payloads/cluster.json") + require.NoError(t, err) + + actualClusterStats := model.ClusterStats{} + require.NoError(t, json.Unmarshal(clusterJSON, &actualClusterStats)) + + username := "user" + password := "pass" + + elasticsearchMock := mockServer(t, username, password) + defer elasticsearchMock.Close() + + client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: username, + Password: password, + }, componenttest.NewNopHost()) + require.NoError(t, err) + + ctx := context.Background() + clusterStats, err := client.ClusterStats(ctx, []string{"_all"}) + require.NoError(t, err) + + require.Equal(t, &actualClusterStats, clusterStats) +} + +func TestClusterStatsNoAuthentication(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + }, componenttest.NewNopHost()) + require.NoError(t, err) + + ctx := 
context.Background() + _, err = client.ClusterStats(ctx, []string{"_all"}) + require.ErrorIs(t, err, errUnauthenticated) +} + +func TestClusterStatsBadAuthentication(t *testing.T) { + elasticsearchMock := mockServer(t, "user", "pass") + defer elasticsearchMock.Close() + + client, err := newElasticsearchClient(componenttest.NewNopTelemetrySettings(), Config{ + HTTPClientSettings: confighttp.HTTPClientSettings{ + Endpoint: elasticsearchMock.URL, + }, + Username: "bad_user", + Password: "bad_pass", + }, componenttest.NewNopHost()) + require.NoError(t, err) + + ctx := context.Background() + _, err = client.ClusterStats(ctx, []string{"_all"}) + require.ErrorIs(t, err, errUnauthorized) +} + // mockServer gives a mock elasticsearch server for testing; if username or password is included, they will be required for the client. // otherwise, authorization is ignored. func mockServer(t *testing.T, username, password string) *httptest.Server { @@ -487,6 +597,8 @@ func mockServer(t *testing.T, username, password string) *httptest.Server { require.NoError(t, err) metadata, err := os.ReadFile("./testdata/sample_payloads/metadata.json") require.NoError(t, err) + cluster, err := os.ReadFile("./testdata/sample_payloads/cluster.json") + require.NoError(t, err) elasticsearchMock := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if username != "" || password != "" { @@ -521,6 +633,13 @@ func mockServer(t *testing.T, username, password string) *httptest.Server { return } + if strings.HasPrefix(req.URL.Path, "/_cluster/stats") { + rw.WriteHeader(200) + _, err = rw.Write(cluster) + require.NoError(t, err) + return + } + // metadata check if req.URL.Path == "/" { rw.WriteHeader(200) diff --git a/receiver/elasticsearchreceiver/internal/mocks/elasticsearchClient.go b/receiver/elasticsearchreceiver/internal/mocks/elasticsearchClient.go index 4852e632018c..10c290bdff8d 100644 --- a/receiver/elasticsearchreceiver/internal/mocks/elasticsearchClient.go +++ 
b/receiver/elasticsearchreceiver/internal/mocks/elasticsearchClient.go @@ -61,6 +61,29 @@ func (_m *MockElasticsearchClient) ClusterMetadata(ctx context.Context) (*model. return r0, r1 } +// ClusterStats provides a mock function with given fields: ctx, nodes +func (_m *MockElasticsearchClient) ClusterStats(ctx context.Context, nodes []string) (*model.ClusterStats, error) { + ret := _m.Called(ctx, nodes) + + var r0 *model.ClusterStats + if rf, ok := ret.Get(0).(func(context.Context, []string) *model.ClusterStats); ok { + r0 = rf(ctx, nodes) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*model.ClusterStats) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, []string) error); ok { + r1 = rf(ctx, nodes) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // IndexStats provides a mock function with given fields: ctx, indices func (_m *MockElasticsearchClient) IndexStats(ctx context.Context, indices []string) (*model.IndexStats, error) { ret := _m.Called(ctx, indices) diff --git a/receiver/elasticsearchreceiver/internal/model/clusterstats.go b/receiver/elasticsearchreceiver/internal/model/clusterstats.go new file mode 100644 index 000000000000..340341442e22 --- /dev/null +++ b/receiver/elasticsearchreceiver/internal/model/clusterstats.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/elasticsearchreceiver/internal/model" + +// ClusterStats represents a response from elasticsearch's /_cluster/stats endpoint. +// The struct is not exhaustive; It does not provide all values returned by elasticsearch, +// only the ones relevant to the metrics retrieved by the scraper. +// The structures don't match the ones used in nodes and index responses. +type ClusterStats struct { + ClusterName string `json:"cluster_name"` + IndicesStats ClusterIndicesStats `json:"indices"` + NodesStats ClusterNodesStats `json:"nodes"` +} + +type ClusterIndicesStats struct { + FieldDataCache ClusterCacheInfo `json:"fielddata"` + QueryCache ClusterCacheInfo `json:"query_cache"` +} + +type ClusterCacheInfo struct { + Evictions int64 `json:"evictions"` +} + +type ClusterNodesStats struct { + JVMInfo ClusterNodesJVMInfo `json:"jvm"` +} + +type ClusterNodesJVMInfo struct { + JVMMemoryInfo ClusterNodesJVMMemInfo `json:"mem"` +} + +type ClusterNodesJVMMemInfo struct { + HeapUsedInBy int64 `json:"heap_used_in_bytes"` +} diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/cluster.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/cluster.json new file mode 100644 index 000000000000..cbc2ab1152fa --- /dev/null +++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/cluster.json @@ -0,0 +1,330 @@ +{ + "_nodes" : { + "total" : 1, + "successful" : 1, + "failed" : 0 + }, + "cluster_name" : "docker-cluster", + "cluster_uuid" : "A8Fcp6KfTweCBKzFBkVe9w", + "timestamp" : 1668503134584, + "status" : "yellow", + "indices" : { + "count" : 7, + "shards" : { + "total" : 7, + "primaries" : 7, + "replication" : 0.0, + "index" : { + "shards" : { + "min" : 1, + "max" : 1, + "avg" : 1.0 + }, + "primaries" : { + "min" : 1, + "max" : 1, + "avg" : 1.0 + }, + "replication" : { + "min" : 0.0, + "max" : 0.0, + "avg" : 0.0 + } + } + }, + "docs" : { + "count" : 16, + "deleted" : 0 + 
}, + "store" : { + "size" : "87kb", + "size_in_bytes" : 89166, + "total_data_set_size" : "87kb", + "total_data_set_size_in_bytes" : 89166, + "reserved" : "0b", + "reserved_in_bytes" : 0 + }, + "fielddata" : { + "memory_size" : "0b", + "memory_size_in_bytes" : 0, + "evictions" : 2 + }, + "query_cache" : { + "memory_size" : "0b", + "memory_size_in_bytes" : 0, + "total_count" : 0, + "hit_count" : 0, + "miss_count" : 0, + "cache_size" : 0, + "cache_count" : 0, + "evictions" : 3 + }, + "completion" : { + "size" : "0b", + "size_in_bytes" : 0 + }, + "segments" : { + "count" : 10, + "memory" : "29.9kb", + "memory_in_bytes" : 30656, + "terms_memory" : "22.5kb", + "terms_memory_in_bytes" : 23040, + "stored_fields_memory" : "4.7kb", + "stored_fields_memory_in_bytes" : 4880, + "term_vectors_memory" : "488b", + "term_vectors_memory_in_bytes" : 488, + "norms_memory" : "1.1kb", + "norms_memory_in_bytes" : 1216, + "points_memory" : "0b", + "points_memory_in_bytes" : 0, + "doc_values_memory" : "1kb", + "doc_values_memory_in_bytes" : 1032, + "index_writer_memory" : "0b", + "index_writer_memory_in_bytes" : 0, + "version_map_memory" : "0b", + "version_map_memory_in_bytes" : 0, + "fixed_bit_set" : "0b", + "fixed_bit_set_memory_in_bytes" : 0, + "max_unsafe_auto_id_timestamp" : -1, + "file_sizes" : { } + }, + "mappings" : { + "field_types" : [ + { + "name" : "boolean", + "count" : 1, + "index_count" : 1, + "script_count" : 0 + }, + { + "name" : "constant_keyword", + "count" : 3, + "index_count" : 1, + "script_count" : 0 + }, + { + "name" : "date", + "count" : 7, + "index_count" : 3, + "script_count" : 0 + }, + { + "name" : "float", + "count" : 1, + "index_count" : 1, + "script_count" : 0 + }, + { + "name" : "ip", + "count" : 1, + "index_count" : 1, + "script_count" : 0 + }, + { + "name" : "keyword", + "count" : 30, + "index_count" : 5, + "script_count" : 0 + }, + { + "name" : "long", + "count" : 2, + "index_count" : 2, + "script_count" : 0 + }, + { + "name" : "object", + "count" : 14, + 
"index_count" : 3, + "script_count" : 0 + }, + { + "name" : "point", + "count" : 1, + "index_count" : 1, + "script_count" : 0 + }, + { + "name" : "text", + "count" : 17, + "index_count" : 6, + "script_count" : 0 + } + ], + "runtime_field_types" : [ ] + }, + "analysis" : { + "char_filter_types" : [ ], + "tokenizer_types" : [ ], + "filter_types" : [ ], + "analyzer_types" : [ + { + "name" : "custom", + "count" : 1, + "index_count" : 1 + } + ], + "built_in_char_filters" : [ ], + "built_in_tokenizers" : [ + { + "name" : "whitespace", + "count" : 1, + "index_count" : 1 + } + ], + "built_in_filters" : [ + { + "name" : "lowercase", + "count" : 1, + "index_count" : 1 + }, + { + "name" : "type_as_payload", + "count" : 1, + "index_count" : 1 + } + ], + "built_in_analyzers" : [ ] + }, + "versions" : [ + { + "version" : "7.17.6", + "index_count" : 7, + "primary_shard_count" : 7, + "total_primary_size" : "87kb", + "total_primary_bytes" : 89166 + } + ] + }, + "nodes" : { + "count" : { + "total" : 1, + "coordinating_only" : 0, + "data" : 1, + "data_cold" : 1, + "data_content" : 1, + "data_frozen" : 1, + "data_hot" : 1, + "data_warm" : 1, + "ingest" : 1, + "master" : 1, + "ml" : 1, + "remote_cluster_client" : 1, + "transform" : 1, + "voting_only" : 0 + }, + "versions" : [ + "7.17.6" + ], + "os" : { + "available_processors" : 2, + "allocated_processors" : 2, + "names" : [ + { + "name" : "Linux", + "count" : 1 + } + ], + "pretty_names" : [ + { + "pretty_name" : "Ubuntu 20.04.4 LTS", + "count" : 1 + } + ], + "architectures" : [ + { + "arch" : "amd64", + "count" : 1 + } + ], + "mem" : { + "total" : "7.7gb", + "total_in_bytes" : 8349937664, + "free" : "1.1gb", + "free_in_bytes" : 1257656320, + "used" : "6.6gb", + "used_in_bytes" : 7092281344, + "free_percent" : 15, + "used_percent" : 85 + } + }, + "process" : { + "cpu" : { + "percent" : 0 + }, + "open_file_descriptors" : { + "min" : 308, + "max" : 308, + "avg" : 308 + } + }, + "jvm" : { + "max_uptime" : "6.3m", + "max_uptime_in_millis" 
: 379751, + "versions" : [ + { + "version" : "18.0.2", + "vm_name" : "OpenJDK 64-Bit Server VM", + "vm_version" : "18.0.2+9-61", + "vm_vendor" : "Oracle Corporation", + "bundled_jdk" : true, + "using_bundled_jdk" : true, + "count" : 1 + } + ], + "mem" : { + "heap_used" : "271.9mb", + "heap_used_in_bytes" : 285158912, + "heap_max" : "3.8gb", + "heap_max_in_bytes" : 4177526784 + }, + "threads" : 40 + }, + "fs" : { + "total" : "58.7gb", + "total_in_bytes" : 63089455104, + "free" : "21.7gb", + "free_in_bytes" : 23328845824, + "available" : "18.7gb", + "available_in_bytes" : 20090896384 + }, + "plugins" : [ ], + "network_types" : { + "transport_types" : { + "security4" : 1 + }, + "http_types" : { + "security4" : 1 + } + }, + "discovery_types" : { + "single-node" : 1 + }, + "packaging_types" : [ + { + "flavor" : "default", + "type" : "docker", + "count" : 1 + } + ], + "ingest" : { + "number_of_pipelines" : 1, + "processor_stats" : { + "gsub" : { + "count" : 0, + "failed" : 0, + "current" : 0, + "time" : "0s", + "time_in_millis" : 0 + }, + "script" : { + "count" : 0, + "failed" : 0, + "current" : 0, + "time" : "0s", + "time_in_millis" : 0 + } + } + } + } +} From 39108867c62ca3f105bf0a44f1c6c0c6a8a831af Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 12:40:52 +0100 Subject: [PATCH 2/7] feat: add config option for cluster-level metrics --- receiver/elasticsearchreceiver/config.go | 6 +++++- receiver/elasticsearchreceiver/config_test.go | 7 ++++--- receiver/elasticsearchreceiver/factory.go | 7 ++++--- receiver/elasticsearchreceiver/testdata/config.yaml | 1 + 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/receiver/elasticsearchreceiver/config.go b/receiver/elasticsearchreceiver/config.go index 382ebb4986c8..6f14c64f37f8 100644 --- a/receiver/elasticsearchreceiver/config.go +++ b/receiver/elasticsearchreceiver/config.go @@ -47,7 +47,11 @@ type Config struct { // See 
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes for which selectors may be used here. // If Nodes is empty, no nodes will be scraped. Nodes []string `mapstructure:"nodes"` - // SkipClusterMetrics indicates whether cluster level metrics from /_cluster/health should be scraped or not. + // ClusterNodes defines the nodes to scrape from /_cluster/stats endpoint. + // See https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes for which selectors may be used here. + // If ClusterNodes is empty, no nodes will be scraped. + ClusterMetricsNodes []string `mapstructure:"cluster_metrics_nodes"` + // SkipClusterMetrics indicates whether cluster level metrics from /_cluster/* endpoints should be scraped or not. SkipClusterMetrics bool `mapstructure:"skip_cluster_metrics"` // Indices defines the indices to scrape. // See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params diff --git a/receiver/elasticsearchreceiver/config_test.go b/receiver/elasticsearchreceiver/config_test.go index b5f141016073..c384b1325199 100644 --- a/receiver/elasticsearchreceiver/config_test.go +++ b/receiver/elasticsearchreceiver/config_test.go @@ -169,9 +169,10 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(typeStr, ""), expected: &Config{ - SkipClusterMetrics: true, - Nodes: []string{"_local"}, - Indices: []string{".geoip_databases"}, + SkipClusterMetrics: true, + Nodes: []string{"_local"}, + ClusterMetricsNodes: []string{"_local2"}, + Indices: []string{".geoip_databases"}, ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ ReceiverSettings: config.NewReceiverSettings(component.NewID(typeStr)), CollectionInterval: 2 * time.Minute, diff --git a/receiver/elasticsearchreceiver/factory.go b/receiver/elasticsearchreceiver/factory.go index 30be17ef4a2f..3ed17219b39a 100644 --- a/receiver/elasticsearchreceiver/factory.go +++ 
b/receiver/elasticsearchreceiver/factory.go @@ -54,9 +54,10 @@ func createDefaultConfig() component.Config { Endpoint: defaultEndpoint, Timeout: defaultHTTPClientTimeout, }, - Metrics: metadata.DefaultMetricsSettings(), - Nodes: []string{"_all"}, - Indices: []string{"_all"}, + Metrics: metadata.DefaultMetricsSettings(), + Nodes: []string{"_all"}, + ClusterMetricsNodes: []string{"_all"}, + Indices: []string{"_all"}, } } diff --git a/receiver/elasticsearchreceiver/testdata/config.yaml b/receiver/elasticsearchreceiver/testdata/config.yaml index 3c71cc01f508..fab2f0451aec 100644 --- a/receiver/elasticsearchreceiver/testdata/config.yaml +++ b/receiver/elasticsearchreceiver/testdata/config.yaml @@ -4,6 +4,7 @@ elasticsearch: elasticsearch.node.fs.disk.available: enabled: false nodes: [ "_local" ] + cluster_metrics_nodes: ["_local2"] skip_cluster_metrics: true indices: [ ".geoip_databases" ] endpoint: http://example.com:9200 From 6a2a9acfd80160429ded52618d9e2fd3d4dd9f52 Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 13:18:49 +0100 Subject: [PATCH 3/7] chore: extract scraping cluster health metrics to a separate function --- receiver/elasticsearchreceiver/scraper.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go index 2d8a8d8fd62e..ba818258507d 100644 --- a/receiver/elasticsearchreceiver/scraper.go +++ b/receiver/elasticsearchreceiver/scraper.go @@ -350,6 +350,12 @@ func (r *elasticsearchScraper) scrapeClusterMetrics(ctx context.Context, now pco return } + r.scrapeClusterHealthMetrics(ctx, now, errs) + + r.mb.EmitForResource(metadata.WithElasticsearchClusterName(r.clusterName)) +} + +func (r *elasticsearchScraper) scrapeClusterHealthMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { clusterHealth, err := r.client.ClusterHealth(ctx) if err != nil { errs.AddPartial(4, err) @@ -389,8 +395,6 @@ func (r 
*elasticsearchScraper) scrapeClusterMetrics(ctx context.Context, now pco default: errs.AddPartial(1, fmt.Errorf("health status %s: %w", clusterHealth.Status, errUnknownClusterStatus)) } - - r.mb.EmitForResource(metadata.WithElasticsearchClusterName(clusterHealth.ClusterName)) } func (r *elasticsearchScraper) scrapeIndicesMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { From 536ed3b8c1a2b219e2462212ba350db41d60deff Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 13:25:24 +0100 Subject: [PATCH 4/7] feat: add scraping cluster stats metrics --- receiver/elasticsearchreceiver/scraper.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go index ba818258507d..9684195793e9 100644 --- a/receiver/elasticsearchreceiver/scraper.go +++ b/receiver/elasticsearchreceiver/scraper.go @@ -351,10 +351,23 @@ func (r *elasticsearchScraper) scrapeClusterMetrics(ctx context.Context, now pco } r.scrapeClusterHealthMetrics(ctx, now, errs) + r.scrapeClusterStatsMetrics(ctx, now, errs) r.mb.EmitForResource(metadata.WithElasticsearchClusterName(r.clusterName)) } +func (r *elasticsearchScraper) scrapeClusterStatsMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { + if len(r.cfg.ClusterMetricsNodes) == 0 { + return + } + + _, err := r.client.ClusterStats(ctx, r.cfg.ClusterMetricsNodes) + if err != nil { + errs.AddPartial(0, err) + return + } +} + func (r *elasticsearchScraper) scrapeClusterHealthMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { clusterHealth, err := r.client.ClusterHealth(ctx) if err != nil { From d11a3f2f6e806645fe8683d0cc202e5d855437ab Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 13:42:33 +0100 Subject: [PATCH 5/7] feat: add emitting cluster-level metrics --- .../elasticsearchreceiver/documentation.md | 14 ++++ 
.../internal/metadata/generated_metrics.go | 65 +++++++++++++++++++ .../metadata/generated_metrics_test.go | 22 +++++++ receiver/elasticsearchreceiver/metadata.yaml | 9 +++ receiver/elasticsearchreceiver/scraper.go | 13 +++- .../elasticsearchreceiver/scraper_test.go | 25 ++++++- .../testdata/expected_metrics/full.json | 51 +++++++++++++++ .../testdata/expected_metrics/noNodes.json | 14 ++++ .../testdata/integration/expected.7_16_3.json | 14 ++++ .../testdata/integration/expected.7_9_3.json | 14 ++++ 10 files changed, 237 insertions(+), 4 deletions(-) diff --git a/receiver/elasticsearchreceiver/documentation.md b/receiver/elasticsearchreceiver/documentation.md index bbcfcbb8428f..79808abb158c 100644 --- a/receiver/elasticsearchreceiver/documentation.md +++ b/receiver/elasticsearchreceiver/documentation.md @@ -775,6 +775,20 @@ metrics: enabled: true ``` +### elasticsearch.cluster.indices.cache.evictions + +The number of evictions from the cache for indices in cluster. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {evictions} | Sum | Int | Cumulative | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| cache_name | The name of cache. | Str: ``fielddata``, ``query`` | + ### elasticsearch.index.cache.evictions The number of evictions from the cache for an index. 
diff --git a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go index 81c323dc6cd0..929476a5847b 100644 --- a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go +++ b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go @@ -43,6 +43,7 @@ type MetricsSettings struct { ElasticsearchClusterDataNodes MetricSettings `mapstructure:"elasticsearch.cluster.data_nodes"` ElasticsearchClusterHealth MetricSettings `mapstructure:"elasticsearch.cluster.health"` ElasticsearchClusterInFlightFetch MetricSettings `mapstructure:"elasticsearch.cluster.in_flight_fetch"` + ElasticsearchClusterIndicesCacheEvictions MetricSettings `mapstructure:"elasticsearch.cluster.indices.cache.evictions"` ElasticsearchClusterNodes MetricSettings `mapstructure:"elasticsearch.cluster.nodes"` ElasticsearchClusterPendingTasks MetricSettings `mapstructure:"elasticsearch.cluster.pending_tasks"` ElasticsearchClusterPublishedStatesDifferences MetricSettings `mapstructure:"elasticsearch.cluster.published_states.differences"` @@ -144,6 +145,9 @@ func DefaultMetricsSettings() MetricsSettings { ElasticsearchClusterInFlightFetch: MetricSettings{ Enabled: true, }, + ElasticsearchClusterIndicesCacheEvictions: MetricSettings{ + Enabled: false, + }, ElasticsearchClusterNodes: MetricSettings{ Enabled: true, }, @@ -1274,6 +1278,59 @@ func newMetricElasticsearchClusterInFlightFetch(settings MetricSettings) metricE return m } +type metricElasticsearchClusterIndicesCacheEvictions struct { + data pmetric.Metric // data buffer for generated metric. + settings MetricSettings // metric settings provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills elasticsearch.cluster.indices.cache.evictions metric with initial data. 
+func (m *metricElasticsearchClusterIndicesCacheEvictions) init() { + m.data.SetName("elasticsearch.cluster.indices.cache.evictions") + m.data.SetDescription("The number of evictions from the cache for indices in cluster.") + m.data.SetUnit("{evictions}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + m.data.Sum().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricElasticsearchClusterIndicesCacheEvictions) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, cacheNameAttributeValue string) { + if !m.settings.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) + dp.Attributes().PutStr("cache_name", cacheNameAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricElasticsearchClusterIndicesCacheEvictions) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricElasticsearchClusterIndicesCacheEvictions) emit(metrics pmetric.MetricSlice) { + if m.settings.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricElasticsearchClusterIndicesCacheEvictions(settings MetricSettings) metricElasticsearchClusterIndicesCacheEvictions { + m := metricElasticsearchClusterIndicesCacheEvictions{settings: settings} + if settings.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + type metricElasticsearchClusterNodes struct { data pmetric.Metric // data buffer for generated metric. settings MetricSettings // metric settings provided by user. 
@@ -5376,6 +5433,7 @@ type MetricsBuilder struct { metricElasticsearchClusterDataNodes metricElasticsearchClusterDataNodes metricElasticsearchClusterHealth metricElasticsearchClusterHealth metricElasticsearchClusterInFlightFetch metricElasticsearchClusterInFlightFetch + metricElasticsearchClusterIndicesCacheEvictions metricElasticsearchClusterIndicesCacheEvictions metricElasticsearchClusterNodes metricElasticsearchClusterNodes metricElasticsearchClusterPendingTasks metricElasticsearchClusterPendingTasks metricElasticsearchClusterPublishedStatesDifferences metricElasticsearchClusterPublishedStatesDifferences @@ -5478,6 +5536,7 @@ func NewMetricsBuilder(settings MetricsSettings, buildInfo component.BuildInfo, metricElasticsearchClusterDataNodes: newMetricElasticsearchClusterDataNodes(settings.ElasticsearchClusterDataNodes), metricElasticsearchClusterHealth: newMetricElasticsearchClusterHealth(settings.ElasticsearchClusterHealth), metricElasticsearchClusterInFlightFetch: newMetricElasticsearchClusterInFlightFetch(settings.ElasticsearchClusterInFlightFetch), + metricElasticsearchClusterIndicesCacheEvictions: newMetricElasticsearchClusterIndicesCacheEvictions(settings.ElasticsearchClusterIndicesCacheEvictions), metricElasticsearchClusterNodes: newMetricElasticsearchClusterNodes(settings.ElasticsearchClusterNodes), metricElasticsearchClusterPendingTasks: newMetricElasticsearchClusterPendingTasks(settings.ElasticsearchClusterPendingTasks), metricElasticsearchClusterPublishedStatesDifferences: newMetricElasticsearchClusterPublishedStatesDifferences(settings.ElasticsearchClusterPublishedStatesDifferences), @@ -5636,6 +5695,7 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricElasticsearchClusterDataNodes.emit(ils.Metrics()) mb.metricElasticsearchClusterHealth.emit(ils.Metrics()) mb.metricElasticsearchClusterInFlightFetch.emit(ils.Metrics()) + mb.metricElasticsearchClusterIndicesCacheEvictions.emit(ils.Metrics()) 
mb.metricElasticsearchClusterNodes.emit(ils.Metrics()) mb.metricElasticsearchClusterPendingTasks.emit(ils.Metrics()) mb.metricElasticsearchClusterPublishedStatesDifferences.emit(ils.Metrics()) @@ -5764,6 +5824,11 @@ func (mb *MetricsBuilder) RecordElasticsearchClusterInFlightFetchDataPoint(ts pc mb.metricElasticsearchClusterInFlightFetch.recordDataPoint(mb.startTime, ts, val) } +// RecordElasticsearchClusterIndicesCacheEvictionsDataPoint adds a data point to elasticsearch.cluster.indices.cache.evictions metric. +func (mb *MetricsBuilder) RecordElasticsearchClusterIndicesCacheEvictionsDataPoint(ts pcommon.Timestamp, val int64, cacheNameAttributeValue AttributeCacheName) { + mb.metricElasticsearchClusterIndicesCacheEvictions.recordDataPoint(mb.startTime, ts, val, cacheNameAttributeValue.String()) +} + // RecordElasticsearchClusterNodesDataPoint adds a data point to elasticsearch.cluster.nodes metric. func (mb *MetricsBuilder) RecordElasticsearchClusterNodesDataPoint(ts pcommon.Timestamp, val int64) { mb.metricElasticsearchClusterNodes.recordDataPoint(mb.startTime, ts, val) diff --git a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics_test.go b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics_test.go index 59a276e8af1e..34cf17f17ed8 100644 --- a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics_test.go @@ -36,6 +36,8 @@ func TestDefaultMetrics(t *testing.T) { enabledMetrics["elasticsearch.cluster.in_flight_fetch"] = true mb.RecordElasticsearchClusterInFlightFetchDataPoint(ts, 1) + mb.RecordElasticsearchClusterIndicesCacheEvictionsDataPoint(ts, 1, AttributeCacheName(1)) + enabledMetrics["elasticsearch.cluster.nodes"] = true mb.RecordElasticsearchClusterNodesDataPoint(ts, 1) @@ -283,6 +285,7 @@ func TestAllMetrics(t *testing.T) { ElasticsearchClusterDataNodes: MetricSettings{Enabled: true}, ElasticsearchClusterHealth: 
MetricSettings{Enabled: true}, ElasticsearchClusterInFlightFetch: MetricSettings{Enabled: true}, + ElasticsearchClusterIndicesCacheEvictions: MetricSettings{Enabled: true}, ElasticsearchClusterNodes: MetricSettings{Enabled: true}, ElasticsearchClusterPendingTasks: MetricSettings{Enabled: true}, ElasticsearchClusterPublishedStatesDifferences: MetricSettings{Enabled: true}, @@ -371,6 +374,7 @@ func TestAllMetrics(t *testing.T) { mb.RecordElasticsearchClusterDataNodesDataPoint(ts, 1) mb.RecordElasticsearchClusterHealthDataPoint(ts, 1, AttributeHealthStatus(1)) mb.RecordElasticsearchClusterInFlightFetchDataPoint(ts, 1) + mb.RecordElasticsearchClusterIndicesCacheEvictionsDataPoint(ts, 1, AttributeCacheName(1)) mb.RecordElasticsearchClusterNodesDataPoint(ts, 1) mb.RecordElasticsearchClusterPendingTasksDataPoint(ts, 1) mb.RecordElasticsearchClusterPublishedStatesDifferencesDataPoint(ts, 1, AttributeClusterPublishedDifferenceState(1)) @@ -565,6 +569,22 @@ func TestAllMetrics(t *testing.T) { assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) validatedMetrics["elasticsearch.cluster.in_flight_fetch"] = struct{}{} + case "elasticsearch.cluster.indices.cache.evictions": + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "The number of evictions from the cache for indices in cluster.", ms.At(i).Description()) + assert.Equal(t, "{evictions}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + attrVal, ok := dp.Attributes().Get("cache_name") + assert.True(t, ok) + assert.Equal(t, 
"fielddata", attrVal.Str()) + validatedMetrics["elasticsearch.cluster.indices.cache.evictions"] = struct{}{} case "elasticsearch.cluster.nodes": assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) @@ -1725,6 +1745,7 @@ func TestNoMetrics(t *testing.T) { ElasticsearchClusterDataNodes: MetricSettings{Enabled: false}, ElasticsearchClusterHealth: MetricSettings{Enabled: false}, ElasticsearchClusterInFlightFetch: MetricSettings{Enabled: false}, + ElasticsearchClusterIndicesCacheEvictions: MetricSettings{Enabled: false}, ElasticsearchClusterNodes: MetricSettings{Enabled: false}, ElasticsearchClusterPendingTasks: MetricSettings{Enabled: false}, ElasticsearchClusterPublishedStatesDifferences: MetricSettings{Enabled: false}, @@ -1812,6 +1833,7 @@ func TestNoMetrics(t *testing.T) { mb.RecordElasticsearchClusterDataNodesDataPoint(ts, 1) mb.RecordElasticsearchClusterHealthDataPoint(ts, 1, AttributeHealthStatus(1)) mb.RecordElasticsearchClusterInFlightFetchDataPoint(ts, 1) + mb.RecordElasticsearchClusterIndicesCacheEvictionsDataPoint(ts, 1, AttributeCacheName(1)) mb.RecordElasticsearchClusterNodesDataPoint(ts, 1) mb.RecordElasticsearchClusterPendingTasksDataPoint(ts, 1) mb.RecordElasticsearchClusterPublishedStatesDifferencesDataPoint(ts, 1, AttributeClusterPublishedDifferenceState(1)) diff --git a/receiver/elasticsearchreceiver/metadata.yaml b/receiver/elasticsearchreceiver/metadata.yaml index 10fd373e80cf..62bb0e80b8de 100644 --- a/receiver/elasticsearchreceiver/metadata.yaml +++ b/receiver/elasticsearchreceiver/metadata.yaml @@ -724,6 +724,15 @@ metrics: value_type: int attributes: [ cluster_state_update_state, cluster_state_update_type ] enabled: true + elasticsearch.cluster.indices.cache.evictions: + description: The number of evictions from the cache for indices in cluster. 
+ unit: "{evictions}" + sum: + monotonic: true + aggregation: cumulative + value_type: int + attributes: [cache_name] + enabled: false elasticsearch.node.ingest.documents: description: Total number of documents ingested during the lifetime of this node. unit: "{documents}" diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go index 9684195793e9..3b020d709af8 100644 --- a/receiver/elasticsearchreceiver/scraper.go +++ b/receiver/elasticsearchreceiver/scraper.go @@ -361,11 +361,20 @@ func (r *elasticsearchScraper) scrapeClusterStatsMetrics(ctx context.Context, no return } - _, err := r.client.ClusterStats(ctx, r.cfg.ClusterMetricsNodes) + clusterStats, err := r.client.ClusterStats(ctx, r.cfg.ClusterMetricsNodes) if err != nil { - errs.AddPartial(0, err) + errs.AddPartial(3, err) return } + + r.mb.RecordJvmMemoryHeapUsedDataPoint(now, clusterStats.NodesStats.JVMInfo.JVMMemoryInfo.HeapUsedInBy) + + r.mb.RecordElasticsearchClusterIndicesCacheEvictionsDataPoint( + now, clusterStats.IndicesStats.FieldDataCache.Evictions, metadata.AttributeCacheNameFielddata, + ) + r.mb.RecordElasticsearchClusterIndicesCacheEvictionsDataPoint( + now, clusterStats.IndicesStats.QueryCache.Evictions, metadata.AttributeCacheNameQuery, + ) } func (r *elasticsearchScraper) scrapeClusterHealthMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { diff --git a/receiver/elasticsearchreceiver/scraper_test.go b/receiver/elasticsearchreceiver/scraper_test.go index ed9114a66cbc..209b708977c4 100644 --- a/receiver/elasticsearchreceiver/scraper_test.go +++ b/receiver/elasticsearchreceiver/scraper_test.go @@ -72,6 +72,8 @@ func TestScraper(t *testing.T) { config.Metrics.ElasticsearchIndexCacheEvictions.Enabled = true config.Metrics.ElasticsearchIndexDocuments.Enabled = true + config.Metrics.ElasticsearchClusterIndicesCacheEvictions.Enabled = true + sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), config) 
err := sc.start(context.Background(), componenttest.NewNopHost()) @@ -80,6 +82,7 @@ func TestScraper(t *testing.T) { mockClient := mocks.MockElasticsearchClient{} mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(clusterStats(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) @@ -108,6 +111,7 @@ func TestScraperSkipClusterMetrics(t *testing.T) { mockClient := mocks.MockElasticsearchClient{} mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) + mockClient.On("ClusterStats", mock.Anything, []string{}).Return(clusterStats(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) @@ -136,6 +140,7 @@ func TestScraperNoNodesMetrics(t *testing.T) { mockClient := mocks.MockElasticsearchClient{} mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(clusterStats(t), nil) mockClient.On("NodeStats", mock.Anything, []string{}).Return(nodeStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) @@ -189,6 +194,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err404) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) + mockClient.On("ClusterStats", mock.Anything, 
[]string{"_all"}).Return(clusterStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) @@ -214,6 +220,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(clusterStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) @@ -229,7 +236,7 @@ func TestScrapingError(t *testing.T) { }, }, { - desc: "Node stats, index stats and cluster health fails", + desc: "Node stats, index stats, cluster stats and cluster health fails", run: func(t *testing.T) { t.Parallel() @@ -240,6 +247,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err500) mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(nil, err404) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(nil, err500) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) @@ -266,6 +274,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("ClusterMetadata", mock.Anything).Return(nil, err404) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) + mockClient.On("ClusterStats", mock.Anything, 
[]string{"_all"}).Return(clusterStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) @@ -280,7 +289,7 @@ func TestScrapingError(t *testing.T) { }, }, { - desc: "ClusterMetadata, node stats, index stats and cluster health fails", + desc: "ClusterMetadata, node stats, index stats, cluster stats and cluster health fail", run: func(t *testing.T) { t.Parallel() @@ -292,6 +301,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nil, err500) mockClient.On("ClusterHealth", mock.Anything).Return(nil, err404) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(nil, err500) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(nil, err500) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) err := sc.start(context.Background(), componenttest.NewNopHost()) @@ -318,6 +328,7 @@ func TestScrapingError(t *testing.T) { mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("NodeStats", mock.Anything, []string{"_all"}).Return(nodeStats(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(ch, nil) + mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(clusterStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), createDefaultConfig().(*Config)) @@ -348,6 +359,16 @@ func clusterHealth(t *testing.T) *model.ClusterHealth { return &clusterHealth } +func clusterStats(t *testing.T) *model.ClusterStats { + statsJSON, err := os.ReadFile("./testdata/sample_payloads/cluster.json") + require.NoError(t, err) + + clusterStats := model.ClusterStats{} + require.NoError(t, json.Unmarshal(statsJSON, 
&clusterStats)) + + return &clusterStats +} + func nodeStats(t *testing.T) *model.NodeStats { nodeJSON, err := os.ReadFile("./testdata/sample_payloads/nodes_linux.json") require.NoError(t, err) diff --git a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json index 7cbc04fd8301..a773305a8f03 100644 --- a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json +++ b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json @@ -2545,6 +2545,57 @@ ] }, "unit": "{shards}" + }, + { + "description": "The current heap memory usage", + "gauge": { + "dataPoints": [ + { + "asInt": "285158912", + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "name": "jvm.memory.heap.used", + "unit": "By" + }, + { + "description": "The number of evictions from the cache for indices in cluster.", + "name": "elasticsearch.cluster.indices.cache.evictions", + "sum": { + "aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE", + "isMonotonic": true, + "dataPoints": [ + { + "asInt": "2", + "attributes": [ + { + "key": "cache_name", + "value": { + "stringValue": "fielddata" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + }, + { + "asInt": "3", + "attributes": [ + { + "key": "cache_name", + "value": { + "stringValue": "query" + } + } + ], + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "unit": "{evictions}" } ], "scope": { diff --git a/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json b/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json index 6b11c943b060..687611fed4f5 100644 --- a/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json +++ b/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json @@ -210,6 +210,20 @@ ] }, "unit": "{shards}" + }, + { + "description": "The 
current heap memory usage", + "gauge": { + "dataPoints": [ + { + "asInt": "285158912", + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "name": "jvm.memory.heap.used", + "unit": "By" } ], "scope": { diff --git a/receiver/elasticsearchreceiver/testdata/integration/expected.7_16_3.json b/receiver/elasticsearchreceiver/testdata/integration/expected.7_16_3.json index 7bcce5df5824..12c9d8a3a2e8 100644 --- a/receiver/elasticsearchreceiver/testdata/integration/expected.7_16_3.json +++ b/receiver/elasticsearchreceiver/testdata/integration/expected.7_16_3.json @@ -5278,6 +5278,20 @@ ] }, "unit": "{shards}" + }, + { + "description": "The current heap memory usage", + "gauge": { + "dataPoints": [ + { + "asInt": "285158912", + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "name": "jvm.memory.heap.used", + "unit": "By" } ], "scope": { diff --git a/receiver/elasticsearchreceiver/testdata/integration/expected.7_9_3.json b/receiver/elasticsearchreceiver/testdata/integration/expected.7_9_3.json index 8bdbd220fd19..05f5ef2ee775 100644 --- a/receiver/elasticsearchreceiver/testdata/integration/expected.7_9_3.json +++ b/receiver/elasticsearchreceiver/testdata/integration/expected.7_9_3.json @@ -4059,6 +4059,20 @@ ] }, "unit": "{shards}" + }, + { + "description": "The current heap memory usage", + "gauge": { + "dataPoints": [ + { + "asInt": "285158912", + "startTimeUnixNano": "1661811689941624000", + "timeUnixNano": "1661811689943245000" + } + ] + }, + "name": "jvm.memory.heap.used", + "unit": "By" } ], "scope": { From 6f8f20623fbe0d7f5b53d8641f0f036957e5320b Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 17 Nov 2022 14:18:50 +0100 Subject: [PATCH 6/7] chore: add changelog --- .chloggen/elastic-cluster-level-metrics.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .chloggen/elastic-cluster-level-metrics.yaml diff --git 
a/.chloggen/elastic-cluster-level-metrics.yaml b/.chloggen/elastic-cluster-level-metrics.yaml new file mode 100644 index 000000000000..b430b2ced30b --- /dev/null +++ b/.chloggen/elastic-cluster-level-metrics.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add support for scraping cluster-level metrics + +# One or more tracking issues related to the change +issues: [14635] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The receiver now emits JVM and cache eviction metrics at the cluster level, scraped from the new /_cluster/stats endpoint.
From f3f48b83815efd20002cb2bafa13e694d972ffd7 Mon Sep 17 00:00:00 2001 From: Adam Boguszewski Date: Thu, 1 Dec 2022 13:51:27 +0100 Subject: [PATCH 7/7] remove additional config --- receiver/elasticsearchreceiver/README.md | 2 +- receiver/elasticsearchreceiver/config.go | 4 ---- receiver/elasticsearchreceiver/config_test.go | 7 +++---- receiver/elasticsearchreceiver/factory.go | 7 +++---- receiver/elasticsearchreceiver/scraper.go | 4 ++-- receiver/elasticsearchreceiver/scraper_test.go | 2 +- .../elasticsearchreceiver/testdata/config.yaml | 1 - .../testdata/expected_metrics/noNodes.json | 14 -------------- 8 files changed, 10 insertions(+), 31 deletions(-) diff --git a/receiver/elasticsearchreceiver/README.md b/receiver/elasticsearchreceiver/README.md index 574447f66ea0..1c7a00723621 100644 --- a/receiver/elasticsearchreceiver/README.md +++ b/receiver/elasticsearchreceiver/README.md @@ -19,7 +19,7 @@ See the [Elasticsearch docs](https://www.elastic.co/guide/en/elasticsearch/refer The following settings are optional: - `metrics` (default: see `DefaultMetricsSettings` [here](./internal/metadata/generated_metrics.go): Allows enabling and disabling specific metrics from being collected in this receiver. -- `nodes` (default: `["_all"]`): Allows specifying node filters that define which nodes are scraped for node-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes) for allowed filters. If this option is left explicitly empty, then no node-level metrics will be scraped. +- `nodes` (default: `["_all"]`): Allows specifying node filters that define which nodes are scraped for node-level and cluster-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes) for allowed filters. 
If this option is left explicitly empty, then no node-level metrics will be scraped, and only cluster health metrics will be collected at the cluster level. - `skip_cluster_metrics` (default: `false`): If true, cluster-level metrics will not be scraped. - `indices` (default: `["_all"]`): Allows specifying index filters that define which indices are scraped for index-level metrics. See [the Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-stats.html#index-stats-api-path-params) for allowed filters. If this option is left explicitly empty, then no index-level metrics will be scraped. - `endpoint` (default = `http://localhost:9200`): The base URL of the Elasticsearch API for the cluster to monitor. diff --git a/receiver/elasticsearchreceiver/config.go b/receiver/elasticsearchreceiver/config.go index 6f14c64f37f8..a3fa9817fa52 100644 --- a/receiver/elasticsearchreceiver/config.go +++ b/receiver/elasticsearchreceiver/config.go @@ -47,10 +47,6 @@ type Config struct { // See https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes for which selectors may be used here. // If Nodes is empty, no nodes will be scraped. Nodes []string `mapstructure:"nodes"` - // ClusterNodes defines the nodes to scrape from /_cluster/stats endpoint. - // See https://www.elastic.co/guide/en/elasticsearch/reference/7.9/cluster.html#cluster-nodes for which selectors may be used here. - // If ClusterNodes is empty, no nodes will be scraped. - ClusterMetricsNodes []string `mapstructure:"cluster_metrics_nodes"` // SkipClusterMetrics indicates whether cluster level metrics from /_cluster/* endpoints should be scraped or not. SkipClusterMetrics bool `mapstructure:"skip_cluster_metrics"` // Indices defines the indices to scrape.
diff --git a/receiver/elasticsearchreceiver/config_test.go b/receiver/elasticsearchreceiver/config_test.go index c384b1325199..b5f141016073 100644 --- a/receiver/elasticsearchreceiver/config_test.go +++ b/receiver/elasticsearchreceiver/config_test.go @@ -169,10 +169,9 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(typeStr, ""), expected: &Config{ - SkipClusterMetrics: true, - Nodes: []string{"_local"}, - ClusterMetricsNodes: []string{"_local2"}, - Indices: []string{".geoip_databases"}, + SkipClusterMetrics: true, + Nodes: []string{"_local"}, + Indices: []string{".geoip_databases"}, ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ ReceiverSettings: config.NewReceiverSettings(component.NewID(typeStr)), CollectionInterval: 2 * time.Minute, diff --git a/receiver/elasticsearchreceiver/factory.go b/receiver/elasticsearchreceiver/factory.go index 3ed17219b39a..30be17ef4a2f 100644 --- a/receiver/elasticsearchreceiver/factory.go +++ b/receiver/elasticsearchreceiver/factory.go @@ -54,10 +54,9 @@ func createDefaultConfig() component.Config { Endpoint: defaultEndpoint, Timeout: defaultHTTPClientTimeout, }, - Metrics: metadata.DefaultMetricsSettings(), - Nodes: []string{"_all"}, - ClusterMetricsNodes: []string{"_all"}, - Indices: []string{"_all"}, + Metrics: metadata.DefaultMetricsSettings(), + Nodes: []string{"_all"}, + Indices: []string{"_all"}, } } diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go index 3b020d709af8..04f0999c65bd 100644 --- a/receiver/elasticsearchreceiver/scraper.go +++ b/receiver/elasticsearchreceiver/scraper.go @@ -357,11 +357,11 @@ func (r *elasticsearchScraper) scrapeClusterMetrics(ctx context.Context, now pco } func (r *elasticsearchScraper) scrapeClusterStatsMetrics(ctx context.Context, now pcommon.Timestamp, errs *scrapererror.ScrapeErrors) { - if len(r.cfg.ClusterMetricsNodes) == 0 { + if len(r.cfg.Nodes) == 0 { return } - clusterStats, err := 
r.client.ClusterStats(ctx, r.cfg.ClusterMetricsNodes) + clusterStats, err := r.client.ClusterStats(ctx, r.cfg.Nodes) if err != nil { errs.AddPartial(3, err) return diff --git a/receiver/elasticsearchreceiver/scraper_test.go b/receiver/elasticsearchreceiver/scraper_test.go index 209b708977c4..61dc63e908f2 100644 --- a/receiver/elasticsearchreceiver/scraper_test.go +++ b/receiver/elasticsearchreceiver/scraper_test.go @@ -140,7 +140,7 @@ func TestScraperNoNodesMetrics(t *testing.T) { mockClient := mocks.MockElasticsearchClient{} mockClient.On("ClusterMetadata", mock.Anything).Return(clusterMetadata(t), nil) mockClient.On("ClusterHealth", mock.Anything).Return(clusterHealth(t), nil) - mockClient.On("ClusterStats", mock.Anything, []string{"_all"}).Return(clusterStats(t), nil) + mockClient.On("ClusterStats", mock.Anything, []string{}).Return(clusterStats(t), nil) mockClient.On("NodeStats", mock.Anything, []string{}).Return(nodeStats(t), nil) mockClient.On("IndexStats", mock.Anything, []string{"_all"}).Return(indexStats(t), nil) diff --git a/receiver/elasticsearchreceiver/testdata/config.yaml b/receiver/elasticsearchreceiver/testdata/config.yaml index fab2f0451aec..3c71cc01f508 100644 --- a/receiver/elasticsearchreceiver/testdata/config.yaml +++ b/receiver/elasticsearchreceiver/testdata/config.yaml @@ -4,7 +4,6 @@ elasticsearch: elasticsearch.node.fs.disk.available: enabled: false nodes: [ "_local" ] - cluster_metrics_nodes: ["_local2"] skip_cluster_metrics: true indices: [ ".geoip_databases" ] endpoint: http://example.com:9200 diff --git a/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json b/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json index 687611fed4f5..6b11c943b060 100644 --- a/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json +++ b/receiver/elasticsearchreceiver/testdata/expected_metrics/noNodes.json @@ -210,20 +210,6 @@ ] }, "unit": "{shards}" - }, - { - "description": "The current heap memory usage", - 
"gauge": { - "dataPoints": [ - { - "asInt": "285158912", - "startTimeUnixNano": "1661811689941624000", - "timeUnixNano": "1661811689943245000" - } - ] - }, - "name": "jvm.memory.heap.used", - "unit": "By" } ], "scope": {