diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go
index 040ab4c60a3d..b546d6c41dda 100644
--- a/receiver/kafkametricsreceiver/broker_scraper_test.go
+++ b/receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -104,10 +104,38 @@ func TestBrokerScraper_scrape(t *testing.T) {
 	require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost()))
 	md, err := bs.scrape(context.Background())
 	assert.NoError(t, err)
-	expectedDp := int64(len(testBrokers))
-	receivedMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
-	receivedDp := receivedMetrics.Gauge().DataPoints().At(0).IntValue()
-	assert.Equal(t, expectedDp, receivedDp)
+	require.Equal(t, 1, md.ResourceMetrics().Len())
+	require.Equal(t, 1, md.ResourceMetrics().At(0).ScopeMetrics().Len())
+	ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
+	for i := 0; i < ms.Len(); i++ {
+		m := ms.At(i)
+		switch m.Name() {
+		case "kafka.brokers":
+			assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testBrokers)))
+		case "messaging.kafka.broker.count":
+			assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testBrokers)))
+		case "messaging.kafka.broker.consumer_fetch_rate":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.incoming_byte_rate":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.outgoing_byte_rate":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.request_latency":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.response_rate":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.response_size":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.request_rate":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.request_size":
+			assert.Equal(t, m.Gauge().DataPoints().At(0).DoubleValue(), float64(len(testBrokers)))
+		case "messaging.kafka.broker.requests_in_flight":
+			assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testBrokers)))
+		case "messaging.kafka.broker.consumer_fetch_count":
+			assert.Equal(t, m.Sum().DataPoints().At(0).IntValue(), int64(len(testBrokers)))
+		}
+	}
 }
 
 func TestBrokersScraper_createBrokerScraper(t *testing.T) {
diff --git a/receiver/kafkametricsreceiver/documentation.md b/receiver/kafkametricsreceiver/documentation.md
index e23dacb86563..32d02f558606 100644
--- a/receiver/kafkametricsreceiver/documentation.md
+++ b/receiver/kafkametricsreceiver/documentation.md
@@ -14,11 +14,11 @@ metrics:
 
 ### kafka.brokers
 
-[DEPRECATED] Number of brokers in the cluster.
+[deprecated] Number of brokers in the cluster.
 
-| Unit | Metric Type | Value Type |
-| ---- | ----------- | ---------- |
-| {brokers} | Gauge | Int |
+| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
+| ---- | ----------- | ---------- | ----------------------- | --------- |
+| {broker} | Sum | Int | Cumulative | false |
 
 ### kafka.consumer_group.lag
 
@@ -186,7 +186,7 @@ Count of consumer fetches
 
 | Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
 | ---- | ----------- | ---------- | ----------------------- | --------- |
-| {fetches} | Sum | Double | Cumulative | false |
+| {fetches} | Sum | Int | Cumulative | false |
 
 #### Attributes
 
@@ -214,15 +214,15 @@ Number of brokers in the cluster.
 
 | Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
 | ---- | ----------- | ---------- | ----------------------- | --------- |
-| {brokers} | Sum | Int | Cumulative | false |
+| {broker} | Sum | Int | Cumulative | false |
 
 ### messaging.kafka.broker.incoming_byte_rate
 
-Average tncoming Byte Rate in bytes/second
+Average incoming Byte Rate in bytes/second
 
 | Unit | Metric Type | Value Type |
 | ---- | ----------- | ---------- |
-| 1 | Gauge | Double |
+| By/s | Gauge | Double |
 
 #### Attributes
 
@@ -246,11 +246,11 @@ Average outgoing Byte Rate in bytes/second.
 
 ### messaging.kafka.broker.request_latency
 
-Average request latency in ms
+Average request latency in seconds
 
 | Unit | Metric Type | Value Type |
 | ---- | ----------- | ---------- |
-| ms | Gauge | Double |
+| s | Gauge | Double |
 
 #### Attributes
 
diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
index cdec46b37f4e..b262442eb4c4 100644
--- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
+++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics.go
@@ -20,16 +20,18 @@ type metricKafkaBrokers struct {
 // init fills kafka.brokers metric with initial data.
 func (m *metricKafkaBrokers) init() {
 	m.data.SetName("kafka.brokers")
-	m.data.SetDescription("[DEPRECATED] Number of brokers in the cluster.")
-	m.data.SetUnit("{brokers}")
-	m.data.SetEmptyGauge()
+	m.data.SetDescription("[deprecated] Number of brokers in the cluster.")
+	m.data.SetUnit("{broker}")
+	m.data.SetEmptySum()
+	m.data.Sum().SetIsMonotonic(false)
+	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
 }
 
 func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
 	if !m.config.Enabled {
 		return
 	}
-	dp := m.data.Gauge().DataPoints().AppendEmpty()
+	dp := m.data.Sum().DataPoints().AppendEmpty()
 	dp.SetStartTimestamp(start)
 	dp.SetTimestamp(ts)
 	dp.SetIntValue(val)
@@ -37,14 +39,14 @@ func (m *metricKafkaBrokers) recordDataPoint(start pcommon.Timestamp, ts pcommon
 
 // updateCapacity saves max length of data point slices that will be used for the slice capacity.
 func (m *metricKafkaBrokers) updateCapacity() {
-	if m.data.Gauge().DataPoints().Len() > m.capacity {
-		m.capacity = m.data.Gauge().DataPoints().Len()
+	if m.data.Sum().DataPoints().Len() > m.capacity {
+		m.capacity = m.data.Sum().DataPoints().Len()
 	}
 }
 
 // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
 func (m *metricKafkaBrokers) emit(metrics pmetric.MetricSlice) {
-	if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
+	if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
 		m.updateCapacity()
 		m.data.MoveTo(metrics.AppendEmpty())
 		m.init()
@@ -605,14 +607,14 @@ func (m *metricMessagingKafkaBrokerConsumerFetchCount) init() {
 	m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
 }
 
-func (m *metricMessagingKafkaBrokerConsumerFetchCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, brokerAttributeValue int64) {
+func (m *metricMessagingKafkaBrokerConsumerFetchCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, brokerAttributeValue int64) {
 	if !m.config.Enabled {
 		return
 	}
 	dp := m.data.Sum().DataPoints().AppendEmpty()
 	dp.SetStartTimestamp(start)
 	dp.SetTimestamp(ts)
-	dp.SetDoubleValue(val)
+	dp.SetIntValue(val)
 	dp.Attributes().PutInt("broker", brokerAttributeValue)
 }
 
@@ -702,7 +704,7 @@ type metricMessagingKafkaBrokerCount struct {
 func (m *metricMessagingKafkaBrokerCount) init() {
 	m.data.SetName("messaging.kafka.broker.count")
 	m.data.SetDescription("Number of brokers in the cluster.")
-	m.data.SetUnit("{brokers}")
+	m.data.SetUnit("{broker}")
 	m.data.SetEmptySum()
 	m.data.Sum().SetIsMonotonic(false)
 	m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
@@ -752,8 +754,8 @@ type metricMessagingKafkaBrokerIncomingByteRate struct {
 // init fills messaging.kafka.broker.incoming_byte_rate metric with initial data.
 func (m *metricMessagingKafkaBrokerIncomingByteRate) init() {
 	m.data.SetName("messaging.kafka.broker.incoming_byte_rate")
-	m.data.SetDescription("Average tncoming Byte Rate in bytes/second")
-	m.data.SetUnit("1")
+	m.data.SetDescription("Average incoming Byte Rate in bytes/second")
+	m.data.SetUnit("By/s")
 	m.data.SetEmptyGauge()
 	m.data.Gauge().DataPoints().EnsureCapacity(m.capacity)
 }
@@ -854,8 +856,8 @@ type metricMessagingKafkaBrokerRequestLatency struct {
 // init fills messaging.kafka.broker.request_latency metric with initial data.
 func (m *metricMessagingKafkaBrokerRequestLatency) init() {
 	m.data.SetName("messaging.kafka.broker.request_latency")
-	m.data.SetDescription("Average request latency in ms")
-	m.data.SetUnit("ms")
+	m.data.SetDescription("Average request latency in seconds")
+	m.data.SetUnit("s")
 	m.data.SetEmptyGauge()
 	m.data.Gauge().DataPoints().EnsureCapacity(m.capacity)
 }
@@ -1411,7 +1413,7 @@ func (mb *MetricsBuilder) RecordKafkaTopicPartitionsDataPoint(ts pcommon.Timesta
 }
 
 // RecordMessagingKafkaBrokerConsumerFetchCountDataPoint adds a data point to messaging.kafka.broker.consumer_fetch_count metric.
-func (mb *MetricsBuilder) RecordMessagingKafkaBrokerConsumerFetchCountDataPoint(ts pcommon.Timestamp, val float64, brokerAttributeValue int64) {
+func (mb *MetricsBuilder) RecordMessagingKafkaBrokerConsumerFetchCountDataPoint(ts pcommon.Timestamp, val int64, brokerAttributeValue int64) {
 	mb.metricMessagingKafkaBrokerConsumerFetchCount.recordDataPoint(mb.startTime, ts, val, brokerAttributeValue)
 }
 
diff --git a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
index dcb02051c03f..a017059fb83a 100644
--- a/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
+++ b/receiver/kafkametricsreceiver/internal/metadata/generated_metrics_test.go
@@ -204,11 +204,13 @@ func TestMetricsBuilder(t *testing.T) {
 			case "kafka.brokers":
 				assert.False(t, validatedMetrics["kafka.brokers"], "Found a duplicate in the metrics slice: kafka.brokers")
 				validatedMetrics["kafka.brokers"] = true
-				assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type())
-				assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len())
-				assert.Equal(t, "[DEPRECATED] Number of brokers in the cluster.", ms.At(i).Description())
-				assert.Equal(t, "{brokers}", ms.At(i).Unit())
-				dp := ms.At(i).Gauge().DataPoints().At(0)
+				assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type())
+				assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len())
+				assert.Equal(t, "[deprecated] Number of brokers in the cluster.", ms.At(i).Description())
+				assert.Equal(t, "{broker}", ms.At(i).Unit())
+				assert.Equal(t, false, ms.At(i).Sum().IsMonotonic())
+				assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality())
+				dp := ms.At(i).Sum().DataPoints().At(0)
 				assert.Equal(t, start, dp.StartTimestamp())
 				assert.Equal(t, ts, dp.Timestamp())
 				assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
@@ -413,8 +415,8 @@ func TestMetricsBuilder(t *testing.T) {
 				dp := ms.At(i).Sum().DataPoints().At(0)
 				assert.Equal(t, start, dp.StartTimestamp())
 				assert.Equal(t, ts, dp.Timestamp())
-				assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType())
-				assert.Equal(t, float64(1), dp.DoubleValue())
+				assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType())
+				assert.Equal(t, int64(1), dp.IntValue())
 				attrVal, ok := dp.Attributes().Get("broker")
 				assert.True(t, ok)
 				assert.EqualValues(t, 6, attrVal.Int())
@@ -439,7 +441,7 @@ func TestMetricsBuilder(t *testing.T) {
 				assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type())
 				assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len())
 				assert.Equal(t, "Number of brokers in the cluster.", ms.At(i).Description())
-				assert.Equal(t, "{brokers}", ms.At(i).Unit())
+				assert.Equal(t, "{broker}", ms.At(i).Unit())
 				assert.Equal(t, false, ms.At(i).Sum().IsMonotonic())
 				assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality())
 				dp := ms.At(i).Sum().DataPoints().At(0)
@@ -452,8 +454,8 @@ func TestMetricsBuilder(t *testing.T) {
 				validatedMetrics["messaging.kafka.broker.incoming_byte_rate"] = true
 				assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type())
 				assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len())
-				assert.Equal(t, "Average tncoming Byte Rate in bytes/second", ms.At(i).Description())
-				assert.Equal(t, "1", ms.At(i).Unit())
+				assert.Equal(t, "Average incoming Byte Rate in bytes/second", ms.At(i).Description())
+				assert.Equal(t, "By/s", ms.At(i).Unit())
 				dp := ms.At(i).Gauge().DataPoints().At(0)
 				assert.Equal(t, start, dp.StartTimestamp())
 				assert.Equal(t, ts, dp.Timestamp())
@@ -482,8 +484,8 @@ func TestMetricsBuilder(t *testing.T) {
 				validatedMetrics["messaging.kafka.broker.request_latency"] = true
 				assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type())
 				assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len())
-				assert.Equal(t, "Average request latency in ms", ms.At(i).Description())
-				assert.Equal(t, "ms", ms.At(i).Unit())
+				assert.Equal(t, "Average request latency in seconds", ms.At(i).Description())
+				assert.Equal(t, "s", ms.At(i).Unit())
 				dp := ms.At(i).Gauge().DataPoints().At(0)
 				assert.Equal(t, start, dp.StartTimestamp())
 				assert.Equal(t, ts, dp.Timestamp())
diff --git a/receiver/kafkametricsreceiver/metadata.yaml b/receiver/kafkametricsreceiver/metadata.yaml
index 61e68271ef6e..812738a99b4c 100644
--- a/receiver/kafkametricsreceiver/metadata.yaml
+++ b/receiver/kafkametricsreceiver/metadata.yaml
@@ -27,17 +27,19 @@ metrics:
   # brokers scraper
   kafka.brokers:
     enabled: true
-    description: "[DEPRECATED] Number of brokers in the cluster."
-    unit: "{brokers}"
-    gauge:
+    description: '[deprecated] Number of brokers in the cluster.'
+    unit: "{broker}"
+    sum:
+      monotonic: false
       value_type: int
+      aggregation_temporality: cumulative
     warnings:
       if_enabled: The metric is deprecated and will be removed. Use `messaging.kafka.broker.count`
 
   messaging.kafka.broker.count:
     enabled: false
     description: Number of brokers in the cluster.
-    unit: "{brokers}"
+    unit: "{broker}"
     sum:
       monotonic: false
       value_type: int
@@ -55,8 +57,8 @@ metrics:
       if_enabled_not_set: This metric will be enabled by default in the next versions.
   messaging.kafka.broker.incoming_byte_rate:
     enabled: false
-    description: Average tncoming Byte Rate in bytes/second
-    unit: 1
+    description: Average incoming Byte Rate in bytes/second
+    unit: By/s
     gauge:
       value_type: double
     attributes: [broker]
@@ -73,8 +75,8 @@ metrics:
      if_enabled_not_set: This metric will be enabled by default in the next versions.
   messaging.kafka.broker.request_latency:
     enabled: false
-    description: Average request latency in ms
-    unit: "ms"
+    description: Average request latency in seconds
+    unit: "s"
    gauge:
      value_type: double
    attributes: [broker]
@@ -130,7 +132,7 @@ metrics:
     description: Count of consumer fetches
     unit: "{fetches}"
     sum:
-      value_type: double
+      value_type: int
       aggregation_temporality: cumulative
     attributes: [broker]
     warnings:
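
Reviewer note: the sketch below is not part of the diff; it only illustrates what the `kafka.brokers` type change (Gauge to non-monotonic cumulative Sum) means for code that reads the scraped payload. The helper name `brokerCount` and the dual Gauge/Sum handling are assumptions made for illustration; the pdata accessors are the same ones used in the tests above.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/pmetric"
)

// brokerCount is a hypothetical helper (not part of this change): it looks up
// the kafka.brokers metric in a scraped payload and returns its value,
// accepting either the old Gauge shape or the non-monotonic cumulative Sum
// shape introduced by this diff.
func brokerCount(md pmetric.Metrics) (int64, bool) {
	rms := md.ResourceMetrics()
	for i := 0; i < rms.Len(); i++ {
		sms := rms.At(i).ScopeMetrics()
		for j := 0; j < sms.Len(); j++ {
			ms := sms.At(j).Metrics()
			for k := 0; k < ms.Len(); k++ {
				m := ms.At(k)
				if m.Name() != "kafka.brokers" {
					continue
				}
				switch m.Type() {
				case pmetric.MetricTypeSum: // shape after this change
					if m.Sum().DataPoints().Len() > 0 {
						return m.Sum().DataPoints().At(0).IntValue(), true
					}
				case pmetric.MetricTypeGauge: // shape before this change
					if m.Gauge().DataPoints().Len() > 0 {
						return m.Gauge().DataPoints().At(0).IntValue(), true
					}
				}
			}
		}
	}
	return 0, false
}

func main() {
	// Build a payload in the new shape to exercise the helper.
	md := pmetric.NewMetrics()
	m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	m.SetName("kafka.brokers")
	s := m.SetEmptySum()
	s.SetIsMonotonic(false)
	s.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
	s.DataPoints().AppendEmpty().SetIntValue(3)

	if n, ok := brokerCount(md); ok {
		fmt.Println("brokers:", n) // brokers: 3
	}
}
```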