From 74b6099ebf38b39eb777c3f87b7e7584515bfd7a Mon Sep 17 00:00:00 2001
From: Adam Boguszewski <108867528+aboguszewski-sumo@users.noreply.github.com>
Date: Fri, 11 Nov 2022 16:58:25 +0100
Subject: [PATCH] [receiver/elasticsearch]: add fielddata memory size metrics
on index level (#14922)
feat: add fielddata memory size metrics on index level
---
.chloggen/elasticsearch-fielddata.yaml | 16 +++
.../elasticsearchreceiver/documentation.md | 1 +
.../internal/metadata/generated_metrics.go | 66 +++++++++++++
receiver/elasticsearchreceiver/metadata.yaml | 9 ++
receiver/elasticsearchreceiver/scraper.go | 9 +-
.../elasticsearchreceiver/scraper_test.go | 1 +
.../testdata/expected_metrics/full.json | 98 +++++++++++++++++++
.../testdata/sample_payloads/indices.json | 8 +-
8 files changed, 203 insertions(+), 5 deletions(-)
create mode 100644 .chloggen/elasticsearch-fielddata.yaml
diff --git a/.chloggen/elasticsearch-fielddata.yaml b/.chloggen/elasticsearch-fielddata.yaml
new file mode 100644
index 000000000000..77f06a93439c
--- /dev/null
+++ b/.chloggen/elasticsearch-fielddata.yaml
@@ -0,0 +1,16 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: elasticsearchreceiver
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: add fielddata memory size metrics on index level
+
+# One or more tracking issues related to the change
+issues: [14635]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/receiver/elasticsearchreceiver/documentation.md b/receiver/elasticsearchreceiver/documentation.md
index 1adc5190f163..88193b743dc0 100644
--- a/receiver/elasticsearchreceiver/documentation.md
+++ b/receiver/elasticsearchreceiver/documentation.md
@@ -22,6 +22,7 @@ These are the metrics available for this scraper.
| **elasticsearch.cluster.state_queue** | Number of cluster states in queue. | 1 | Sum(Int) |
- cluster_state_queue_state
|
| **elasticsearch.cluster.state_update.count** | The number of cluster state update attempts that changed the cluster state since the node started. | 1 | Sum(Int) | - cluster_state_update_state
|
| **elasticsearch.cluster.state_update.time** | The cumulative amount of time updating the cluster state since the node started. | ms | Sum(Int) | - cluster_state_update_state
- cluster_state_update_type
|
+| elasticsearch.index.cache.memory.usage | The size in bytes of the cache for an index. | By | Sum(Int) | - cache_name
- index_aggregation_type
|
| **elasticsearch.index.operations.completed** | The number of operations completed for an index. | {operations} | Sum(Int) | - operation
- index_aggregation_type
|
| elasticsearch.index.operations.merge.docs_count | The total number of documents in merge operations for an index. | {documents} | Sum(Int) | |
| elasticsearch.index.operations.merge.size | The total size of merged segments for an index. | By | Sum(Int) | |
diff --git a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go
index 46d0222bc859..abdb326cdee0 100644
--- a/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go
+++ b/receiver/elasticsearchreceiver/internal/metadata/generated_metrics.go
@@ -51,6 +51,7 @@ type MetricsSettings struct {
ElasticsearchClusterStateQueue MetricSettings `mapstructure:"elasticsearch.cluster.state_queue"`
ElasticsearchClusterStateUpdateCount MetricSettings `mapstructure:"elasticsearch.cluster.state_update.count"`
ElasticsearchClusterStateUpdateTime MetricSettings `mapstructure:"elasticsearch.cluster.state_update.time"`
+ ElasticsearchIndexCacheMemoryUsage MetricSettings `mapstructure:"elasticsearch.index.cache.memory.usage"`
ElasticsearchIndexOperationsCompleted MetricSettings `mapstructure:"elasticsearch.index.operations.completed"`
ElasticsearchIndexOperationsMergeDocsCount MetricSettings `mapstructure:"elasticsearch.index.operations.merge.docs_count"`
ElasticsearchIndexOperationsMergeSize MetricSettings `mapstructure:"elasticsearch.index.operations.merge.size"`
@@ -163,6 +164,9 @@ func DefaultMetricsSettings() MetricsSettings {
ElasticsearchClusterStateUpdateTime: MetricSettings{
Enabled: true,
},
+ ElasticsearchIndexCacheMemoryUsage: MetricSettings{
+ Enabled: false,
+ },
ElasticsearchIndexOperationsCompleted: MetricSettings{
Enabled: true,
},
@@ -1665,6 +1669,60 @@ func newMetricElasticsearchClusterStateUpdateTime(settings MetricSettings) metri
return m
}
+type metricElasticsearchIndexCacheMemoryUsage struct {
+ data pmetric.Metric // data buffer for generated metric.
+ settings MetricSettings // metric settings provided by user.
+ capacity int // max observed number of data points added to the metric.
+}
+
+// init fills elasticsearch.index.cache.memory.usage metric with initial data.
+func (m *metricElasticsearchIndexCacheMemoryUsage) init() {
+ m.data.SetName("elasticsearch.index.cache.memory.usage")
+ m.data.SetDescription("The size in bytes of the cache for an index.")
+ m.data.SetUnit("By")
+ m.data.SetEmptySum()
+ m.data.Sum().SetIsMonotonic(false)
+ m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+ m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
+}
+
+func (m *metricElasticsearchIndexCacheMemoryUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, cacheNameAttributeValue string, indexAggregationTypeAttributeValue string) {
+ if !m.settings.Enabled {
+ return
+ }
+ dp := m.data.Sum().DataPoints().AppendEmpty()
+ dp.SetStartTimestamp(start)
+ dp.SetTimestamp(ts)
+ dp.SetIntValue(val)
+ dp.Attributes().PutStr("cache_name", cacheNameAttributeValue)
+ dp.Attributes().PutStr("aggregation", indexAggregationTypeAttributeValue)
+}
+
+// updateCapacity saves max length of data point slices that will be used for the slice capacity.
+func (m *metricElasticsearchIndexCacheMemoryUsage) updateCapacity() {
+ if m.data.Sum().DataPoints().Len() > m.capacity {
+ m.capacity = m.data.Sum().DataPoints().Len()
+ }
+}
+
+// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
+func (m *metricElasticsearchIndexCacheMemoryUsage) emit(metrics pmetric.MetricSlice) {
+ if m.settings.Enabled && m.data.Sum().DataPoints().Len() > 0 {
+ m.updateCapacity()
+ m.data.MoveTo(metrics.AppendEmpty())
+ m.init()
+ }
+}
+
+func newMetricElasticsearchIndexCacheMemoryUsage(settings MetricSettings) metricElasticsearchIndexCacheMemoryUsage {
+ m := metricElasticsearchIndexCacheMemoryUsage{settings: settings}
+ if settings.Enabled {
+ m.data = pmetric.NewMetric()
+ m.init()
+ }
+ return m
+}
+
type metricElasticsearchIndexOperationsCompleted struct {
data pmetric.Metric // data buffer for generated metric.
settings MetricSettings // metric settings provided by user.
@@ -5088,6 +5146,7 @@ type MetricsBuilder struct {
metricElasticsearchClusterStateQueue metricElasticsearchClusterStateQueue
metricElasticsearchClusterStateUpdateCount metricElasticsearchClusterStateUpdateCount
metricElasticsearchClusterStateUpdateTime metricElasticsearchClusterStateUpdateTime
+ metricElasticsearchIndexCacheMemoryUsage metricElasticsearchIndexCacheMemoryUsage
metricElasticsearchIndexOperationsCompleted metricElasticsearchIndexOperationsCompleted
metricElasticsearchIndexOperationsMergeDocsCount metricElasticsearchIndexOperationsMergeDocsCount
metricElasticsearchIndexOperationsMergeSize metricElasticsearchIndexOperationsMergeSize
@@ -5185,6 +5244,7 @@ func NewMetricsBuilder(settings MetricsSettings, buildInfo component.BuildInfo,
metricElasticsearchClusterStateQueue: newMetricElasticsearchClusterStateQueue(settings.ElasticsearchClusterStateQueue),
metricElasticsearchClusterStateUpdateCount: newMetricElasticsearchClusterStateUpdateCount(settings.ElasticsearchClusterStateUpdateCount),
metricElasticsearchClusterStateUpdateTime: newMetricElasticsearchClusterStateUpdateTime(settings.ElasticsearchClusterStateUpdateTime),
+ metricElasticsearchIndexCacheMemoryUsage: newMetricElasticsearchIndexCacheMemoryUsage(settings.ElasticsearchIndexCacheMemoryUsage),
metricElasticsearchIndexOperationsCompleted: newMetricElasticsearchIndexOperationsCompleted(settings.ElasticsearchIndexOperationsCompleted),
metricElasticsearchIndexOperationsMergeDocsCount: newMetricElasticsearchIndexOperationsMergeDocsCount(settings.ElasticsearchIndexOperationsMergeDocsCount),
metricElasticsearchIndexOperationsMergeSize: newMetricElasticsearchIndexOperationsMergeSize(settings.ElasticsearchIndexOperationsMergeSize),
@@ -5338,6 +5398,7 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) {
mb.metricElasticsearchClusterStateQueue.emit(ils.Metrics())
mb.metricElasticsearchClusterStateUpdateCount.emit(ils.Metrics())
mb.metricElasticsearchClusterStateUpdateTime.emit(ils.Metrics())
+ mb.metricElasticsearchIndexCacheMemoryUsage.emit(ils.Metrics())
mb.metricElasticsearchIndexOperationsCompleted.emit(ils.Metrics())
mb.metricElasticsearchIndexOperationsMergeDocsCount.emit(ils.Metrics())
mb.metricElasticsearchIndexOperationsMergeSize.emit(ils.Metrics())
@@ -5493,6 +5554,11 @@ func (mb *MetricsBuilder) RecordElasticsearchClusterStateUpdateTimeDataPoint(ts
mb.metricElasticsearchClusterStateUpdateTime.recordDataPoint(mb.startTime, ts, val, clusterStateUpdateStateAttributeValue, clusterStateUpdateTypeAttributeValue.String())
}
+// RecordElasticsearchIndexCacheMemoryUsageDataPoint adds a data point to elasticsearch.index.cache.memory.usage metric.
+func (mb *MetricsBuilder) RecordElasticsearchIndexCacheMemoryUsageDataPoint(ts pcommon.Timestamp, val int64, cacheNameAttributeValue AttributeCacheName, indexAggregationTypeAttributeValue AttributeIndexAggregationType) {
+ mb.metricElasticsearchIndexCacheMemoryUsage.recordDataPoint(mb.startTime, ts, val, cacheNameAttributeValue.String(), indexAggregationTypeAttributeValue.String())
+}
+
// RecordElasticsearchIndexOperationsCompletedDataPoint adds a data point to elasticsearch.index.operations.completed metric.
func (mb *MetricsBuilder) RecordElasticsearchIndexOperationsCompletedDataPoint(ts pcommon.Timestamp, val int64, operationAttributeValue AttributeOperation, indexAggregationTypeAttributeValue AttributeIndexAggregationType) {
mb.metricElasticsearchIndexOperationsCompleted.recordDataPoint(mb.startTime, ts, val, operationAttributeValue.String(), indexAggregationTypeAttributeValue.String())
diff --git a/receiver/elasticsearchreceiver/metadata.yaml b/receiver/elasticsearchreceiver/metadata.yaml
index 3cb84fb7cf4d..f411e2d62ed9 100644
--- a/receiver/elasticsearchreceiver/metadata.yaml
+++ b/receiver/elasticsearchreceiver/metadata.yaml
@@ -869,3 +869,12 @@ metrics:
value_type: int
attributes: [index_aggregation_type]
enabled: false
+ elasticsearch.index.cache.memory.usage:
+ description: The size in bytes of the cache for an index.
+ unit: By
+ sum:
+ monotonic: false
+ aggregation: cumulative
+ value_type: int
+ attributes: [cache_name, index_aggregation_type]
+ enabled: false
diff --git a/receiver/elasticsearchreceiver/scraper.go b/receiver/elasticsearchreceiver/scraper.go
index 37b9f0201979..62c09efea0d3 100644
--- a/receiver/elasticsearchreceiver/scraper.go
+++ b/receiver/elasticsearchreceiver/scraper.go
@@ -341,7 +341,7 @@ func (r *elasticsearchScraper) scrapeIndicesMetrics(ctx context.Context, now pco
indexStats, err := r.client.IndexStats(ctx, r.cfg.Indices)
if err != nil {
- errs.AddPartial(16, err)
+ errs.AddPartial(18, err)
return
}
@@ -452,5 +452,12 @@ func (r *elasticsearchScraper) scrapeOneIndexMetrics(now pcommon.Timestamp, name
now, stats.Total.TranslogStats.SizeInBy, metadata.AttributeIndexAggregationTypeTotal,
)
+ r.mb.RecordElasticsearchIndexCacheMemoryUsageDataPoint(
+ now, stats.Primaries.FieldDataCache.MemorySizeInBy, metadata.AttributeCacheNameFielddata, metadata.AttributeIndexAggregationTypePrimaryShards,
+ )
+ r.mb.RecordElasticsearchIndexCacheMemoryUsageDataPoint(
+ now, stats.Total.FieldDataCache.MemorySizeInBy, metadata.AttributeCacheNameFielddata, metadata.AttributeIndexAggregationTypeTotal,
+ )
+
r.mb.EmitForResource(metadata.WithElasticsearchIndexName(name), metadata.WithElasticsearchClusterName(r.clusterName))
}
diff --git a/receiver/elasticsearchreceiver/scraper_test.go b/receiver/elasticsearchreceiver/scraper_test.go
index e430bfa29463..4b32592d352d 100644
--- a/receiver/elasticsearchreceiver/scraper_test.go
+++ b/receiver/elasticsearchreceiver/scraper_test.go
@@ -55,6 +55,7 @@ func TestScraper(t *testing.T) {
config.Metrics.ElasticsearchIndexSegmentsMemory.Enabled = true
config.Metrics.ElasticsearchIndexTranslogOperations.Enabled = true
config.Metrics.ElasticsearchIndexTranslogSize.Enabled = true
+ config.Metrics.ElasticsearchIndexCacheMemoryUsage.Enabled = true
sc := newElasticSearchScraper(componenttest.NewNopReceiverCreateSettings(), config)
diff --git a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json
index 80a3d73c9ed9..ae33c2f227b3 100644
--- a/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json
+++ b/receiver/elasticsearchreceiver/testdata/expected_metrics/full.json
@@ -2958,6 +2958,55 @@
]
},
"unit": "By"
+ },
+ {
+ "description": "The size in bytes of the cache for an index.",
+ "name": "elasticsearch.index.cache.memory.usage",
+ "sum": {
+ "aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
+ "isMonotonic": false,
+ "dataPoints": [
+ {
+ "asInt": "3",
+ "attributes": [
+ {
+ "key": "cache_name",
+ "value": {
+ "stringValue": "fielddata"
+ }
+ },
+ {
+ "key": "aggregation",
+ "value": {
+ "stringValue": "primary_shards"
+ }
+ }
+ ],
+ "startTimeUnixNano": "1661811689941624000",
+ "timeUnixNano": "1661811689943245000"
+ },
+ {
+ "asInt": "3",
+ "attributes": [
+ {
+ "key": "cache_name",
+ "value": {
+ "stringValue": "fielddata"
+ }
+ },
+ {
+ "key": "aggregation",
+ "value": {
+ "stringValue": "total"
+ }
+ }
+ ],
+ "startTimeUnixNano": "1661811689941624000",
+ "timeUnixNano": "1661811689943245000"
+ }
+ ]
+ },
+ "unit": "By"
}
],
"scope": {
@@ -3460,6 +3509,55 @@
]
},
"unit": "By"
+ },
+ {
+ "description": "The size in bytes of the cache for an index.",
+ "name": "elasticsearch.index.cache.memory.usage",
+ "sum": {
+ "aggregationTemporality": "AGGREGATION_TEMPORALITY_CUMULATIVE",
+ "isMonotonic": false,
+ "dataPoints": [
+ {
+ "asInt": "3",
+ "attributes": [
+ {
+ "key": "cache_name",
+ "value": {
+ "stringValue": "fielddata"
+ }
+ },
+ {
+ "key": "aggregation",
+ "value": {
+ "stringValue": "primary_shards"
+ }
+ }
+ ],
+ "startTimeUnixNano": "1661811689941624000",
+ "timeUnixNano": "1661811689943245000"
+ },
+ {
+ "asInt": "3",
+ "attributes": [
+ {
+ "key": "cache_name",
+ "value": {
+ "stringValue": "fielddata"
+ }
+ },
+ {
+ "key": "aggregation",
+ "value": {
+ "stringValue": "total"
+ }
+ }
+ ],
+ "startTimeUnixNano": "1661811689941624000",
+ "timeUnixNano": "1661811689943245000"
+ }
+ ]
+ },
+ "unit": "By"
}
],
"scope": {
diff --git a/receiver/elasticsearchreceiver/testdata/sample_payloads/indices.json b/receiver/elasticsearchreceiver/testdata/sample_payloads/indices.json
index dbb5c0d6ef06..3a03d846e2d6 100644
--- a/receiver/elasticsearchreceiver/testdata/sample_payloads/indices.json
+++ b/receiver/elasticsearchreceiver/testdata/sample_payloads/indices.json
@@ -93,7 +93,7 @@
"evictions" : 0
},
"fielddata" : {
- "memory_size_in_bytes" : 0,
+ "memory_size_in_bytes" : 3,
"evictions" : 0
},
"completion" : {
@@ -221,7 +221,7 @@
"evictions" : 0
},
"fielddata" : {
- "memory_size_in_bytes" : 0,
+ "memory_size_in_bytes" : 3,
"evictions" : 0
},
"completion" : {
@@ -353,7 +353,7 @@
"evictions" : 0
},
"fielddata" : {
- "memory_size_in_bytes" : 0,
+ "memory_size_in_bytes" : 3,
"evictions" : 0
},
"completion" : {
@@ -481,7 +481,7 @@
"evictions" : 0
},
"fielddata" : {
- "memory_size_in_bytes" : 0,
+ "memory_size_in_bytes" : 3,
"evictions" : 0
},
"completion" : {