Feat: expand kafka broker metrics #24259

Closed. Wants to merge 56 commits.

Commits (all by jcountsNR):
- 9b7332f feat: update kafka connect for NR (Jul 13, 2023)
- 1212594 Merge pull request #1 from jcountsNR/test (Jul 13, 2023)
- 21b3de0 Merge branch 'main' into main (Jul 25, 2023)
- f671fec Merge branch 'main' into main (Jul 25, 2023)
- fdd6ac6 Merge branch 'main' into main (Jul 25, 2023)
- aa373ce Merge branch 'open-telemetry:main' into main (Jul 26, 2023)
- ffcda1d Merge branch 'open-telemetry:main' into main (Jul 26, 2023)
- 277a38d chore: attempt to fix sorting order 1 (Jul 26, 2023)
- 863cfd6 chore: enable brokers.count and disable brokers (Jul 26, 2023)
- 12b6be5 chore: fix unit test issues (Jul 27, 2023)
- 7f3dfa4 Merge branch 'main' into main (Jul 27, 2023)
- 44a7d7d fix (Jul 27, 2023)
- bac4529 fix test (Jul 27, 2023)
- b1f7f4f Merge branch 'main' into main (Jul 27, 2023)
- 69fa596 Merge branch 'open-telemetry:main' into main (Jul 31, 2023)
- 9b5ce27 chore: generate run (Jul 31, 2023)
- a85f2c3 fix: fix linting issue (Jul 31, 2023)
- 3d24044 chore: updated to messaging.kafka (Jul 31, 2023)
- f5ab9a2 Merge branch 'main' into main (Jul 31, 2023)
- 075d149 fix: ran make generate again (Jul 31, 2023)
- 7d92fea Merge branch 'main' into main (Aug 1, 2023)
- 25d0e71 chore: add messaging to scraper (Aug 1, 2023)
- 06048d4 Merge branch 'main' into main (Aug 1, 2023)
- 4a3a149 chore: run make gen (Aug 2, 2023)
- 2616287 Merge branch 'main' into main (Aug 3, 2023)
- 46cc504 chore: resolve merge conflict/description (Aug 3, 2023)
- 0c09dd1 chore: make updates based on semantic convention (Aug 4, 2023)
- 12f7942 Merge branch 'main' into main (Aug 4, 2023)
- 172e172 Merge branch 'open-telemetry:main' into main (Aug 7, 2023)
- 2b12c34 Merge branch 'open-telemetry:main' into old-state (Aug 7, 2023)
- 0702096 Merge branch 'main' into old-state (Aug 7, 2023)
- 42c74cd Merge pull request #2 from jcountsNR/old-state (Aug 7, 2023)
- e137f3d chore: revert to metrics based on sarama (Aug 7, 2023)
- 380b044 chore: updates per request (Sep 5, 2023)
- a255ec1 Merge branch 'main' into main (Sep 5, 2023)
- 1cf6c15 chore: fix description issue (Sep 7, 2023)
- 65862db chore: update expected to sum (Sep 7, 2023)
- ad34e0b TEST MUST REVERT (Sep 7, 2023)
- a894856 test2 (Sep 7, 2023)
- 3391a70 test3 (Sep 7, 2023)
- b5b8bc1 chore: run write (Sep 7, 2023)
- ec75617 chore update (Sep 21, 2023)
- dafb75d update write expected (Sep 21, 2023)
- e8a0cb5 fix (Sep 21, 2023)
- 051fa90 activate metrics (Sep 21, 2023)
- 1a4993b update to string instead of map (Sep 21, 2023)
- 5085e98 setting by field (Sep 21, 2023)
- 1fb7d9b chore: remove enabled setbyuser (Sep 21, 2023)
- afea5c1 adding all metrics enabled (Sep 21, 2023)
- 1009d64 chore - add sink (Sep 21, 2023)
- c5c8c3b fix write expected (Sep 21, 2023)
- 4b80265 chore enable all metrics to try and run write() (Sep 21, 2023)
- 3fc9412 disable again (Sep 21, 2023)
- 4298096 remove test config (Sep 21, 2023)
- 2368562 Merge branch 'main' into main (Sep 21, 2023)
- 38ed735 remove writeexpected (Oct 3, 2023)
16 changes: 16 additions & 0 deletions .chloggen/expand-kafka-broker-metrics.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkametricsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Expand the set of broker metrics scraped by the receiver."

# One or more tracking issues related to the change
issues: [14166]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
82 changes: 81 additions & 1 deletion receiver/kafkametricsreceiver/broker_scraper.go
@@ -26,6 +26,20 @@ type brokerScraper struct {
mb *metadata.MetricsBuilder
}

type saramaMetrics map[string]map[string]interface{} // saramaMetrics maps a sarama metric name to its snapshot fields (e.g. "mean.rate", "count")

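// nrMetricsPrefix lists the per-broker sarama metric name prefixes; the
// numeric broker ID is appended to each prefix at scrape time.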
var nrMetricsPrefix = [...]string{
"consumer-fetch-rate-for-broker-",
"incoming-byte-rate-for-broker-",
"outgoing-byte-rate-for-broker-",
"request-rate-for-broker-",
"response-rate-for-broker-",
"response-size-for-broker-",
"request-size-for-broker-",
"requests-in-flight-for-broker-",
"request-latency-in-ms-for-broker-",
}

func (s *brokerScraper) Name() string {
return brokersScraperName
}
@@ -42,6 +56,53 @@ func (s *brokerScraper) shutdown(context.Context) error {
return nil
}

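// scrapeMetric looks up a single per-broker metric (prefix + broker ID) in the
// sarama registry snapshot and records the corresponding data point, reading
// whichever snapshot field ("mean.rate", "mean", or "count") that metric exposes.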
func (s *brokerScraper) scrapeMetric(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64, prefix string) {
key := fmt.Sprint(prefix, brokerID)

if metric, ok := allMetrics[key]; ok {
switch prefix {
case "consumer-fetch-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerConsumerFetchRateDataPoint(now, v, brokerID)
}
case "incoming-byte-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerIncomingByteRateDataPoint(now, v, brokerID)
}
case "outgoing-byte-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerOutgoingByteRateDataPoint(now, v, brokerID)
}
case "request-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerRequestRateDataPoint(now, v, brokerID)
}
case "response-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerResponseRateDataPoint(now, v, brokerID)
}
case "response-size-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerResponseSizeDataPoint(now, v, brokerID)
}
case "request-size-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerRequestSizeDataPoint(now, v, brokerID)
}
case "requests-in-flight-for-broker-":
if v, ok := metric["count"].(int64); ok {
s.mb.RecordMessagingKafkaBrokerRequestsInFlightDataPoint(now, v, brokerID)
}
case "request-latency-in-ms-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordMessagingKafkaBrokerRequestLatencyDataPoint(now, v, brokerID)
}
default:
fmt.Printf("undefined for prefix %s\n", prefix)
}
}
}

func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
@@ -53,7 +114,26 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {

brokers := s.client.Brokers()

s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
allMetrics := make(map[string]map[string]interface{})

if s.saramaConfig != nil {
allMetrics = s.saramaConfig.MetricRegistry.GetAll()
}

now := pcommon.NewTimestampFromTime(time.Now())
for _, broker := range brokers {
brokerID := int64(broker.ID())
for _, prefix := range nrMetricsPrefix {
s.scrapeMetric(now, allMetrics, brokerID, prefix)
}
}

brokerCount := int64(len(brokers))
// kafka.brokers is deprecated. This should be removed in a future release.
s.mb.RecordKafkaBrokersDataPoint(now, brokerCount)

// messaging.kafka.broker.count should replace kafka.brokers.
s.mb.RecordMessagingKafkaBrokerCountDataPoint(now, brokerCount)

return s.mb.Emit(), nil
}
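For context on where these values come from: sarama registers a go-metrics instrument per broker (meters for the rate metrics, histograms for sizes and latency), and `MetricRegistry.GetAll()` flattens each instrument into a map of snapshot fields, which is what `scrapeMetric` indexes into. A minimal, standalone sketch of reading that registry directly (assuming the `github.com/IBM/sarama` import path; older versions use `github.com/Shopify/sarama`):

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// GetAll returns map[metricName]map[fieldName]interface{}; meters expose
	// "mean.rate" (float64), histograms expose "mean" (float64) and "count" (int64).
	for name, fields := range cfg.MetricRegistry.GetAll() {
		if rate, ok := fields["mean.rate"].(float64); ok {
			fmt.Printf("%s: mean rate %.2f\n", name, rate)
		}
	}
}
```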
36 changes: 32 additions & 4 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -104,10 +104,38 @@ func TestBrokerScraper_scrape(t *testing.T) {
require.NoError(t, bs.start(context.Background(), componenttest.NewNopHost()))
md, err := bs.scrape(context.Background())
assert.NoError(t, err)
expectedDp := int64(len(testBrokers))
receivedMetrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0)
receivedDp := receivedMetrics.Sum().DataPoints().At(0).IntValue()
assert.Equal(t, expectedDp, receivedDp)
require.Equal(t, 1, md.ResourceMetrics().Len())
require.Equal(t, 1, md.ResourceMetrics().At(0).ScopeMetrics().Len())
ms := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
for i := 0; i < ms.Len(); i++ {
	m := ms.At(i)
	switch m.Name() {
	case "kafka.brokers":
		assert.Equal(t, int64(len(testBrokers)), m.Sum().DataPoints().At(0).IntValue())
	case "messaging.kafka.broker.count":
		assert.Equal(t, int64(len(testBrokers)), m.Sum().DataPoints().At(0).IntValue())
	case "messaging.kafka.broker.consumer_fetch_rate":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.incoming_byte_rate":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.outgoing_byte_rate":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.request_latency":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.response_rate":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.response_size":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.request_rate":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.request_size":
		assert.Equal(t, float64(len(testBrokers)), m.Gauge().DataPoints().At(0).DoubleValue())
	case "messaging.kafka.broker.requests_in_flight":
		assert.Equal(t, int64(len(testBrokers)), m.Gauge().DataPoints().At(0).IntValue())
	case "messaging.kafka.broker.consumer_fetch_count":
		assert.Equal(t, int64(len(testBrokers)), m.Sum().DataPoints().At(0).IntValue())
	}
}
}

func TestBrokersScraper_createBrokerScraper(t *testing.T) {
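A note on the assertion fix above: testify's `assert.Equal` uses type-sensitive equality, so a gauge's `DoubleValue()` (a `float64`) never equals an `int64` expectation even when the numbers match; the expected values must be converted to `float64`. A standalone illustration (hypothetical, not from the PR):

```go
package main

import (
	"fmt"

	"github.com/stretchr/testify/assert"
)

// fakeT satisfies assert.TestingT so the example runs outside `go test`.
type fakeT struct{}

func (fakeT) Errorf(format string, args ...interface{}) { fmt.Printf(format+"\n", args...) }

func main() {
	var got float64 = 2
	var want int64 = 2
	fmt.Println(assert.Equal(fakeT{}, want, got))          // false: int64 vs float64
	fmt.Println(assert.Equal(fakeT{}, float64(want), got)) // true after conversion
}
```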
162 changes: 160 additions & 2 deletions receiver/kafkametricsreceiver/documentation.md
@@ -14,11 +14,11 @@ metrics:

### kafka.brokers

Number of brokers in the cluster.
[DEPRECATED] Number of brokers in the cluster. Deprecated in favor of messaging.kafka.broker.count.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {brokers} | Sum | Int | Cumulative | false |
| {broker} | Sum | Int | Cumulative | false |

### kafka.consumer_group.lag

@@ -169,3 +169,161 @@ Number of partitions in topic.
| Name | Description | Values |
| ---- | ----------- | ------ |
| topic | The ID (integer) of a topic | Any Str |

## Optional Metrics

The following metrics are not emitted by default. Each of them can be enabled by applying the following configuration:

```yaml
metrics:
<metric_name>:
enabled: true
```
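For instance, a collector config that turns on two of the broker metrics added here might look like this (the receiver's top-level keys are assumed from the standard kafkametrics configuration, not shown in this diff):

```yaml
receivers:
  kafkametrics:
    brokers: ["localhost:9092"]
    scrapers: [brokers]
    metrics:
      messaging.kafka.broker.count:
        enabled: true
      messaging.kafka.broker.request_rate:
        enabled: true
```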

### messaging.kafka.broker.consumer_fetch_count

Count of consumer fetches

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {fetches} | Sum | Int | Cumulative | false |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.consumer_fetch_rate

Average consumer fetch rate

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {fetches}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.count

Number of brokers in the cluster.

| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {broker} | Sum | Int | Cumulative | false |

### messaging.kafka.broker.incoming_byte_rate

Average incoming byte rate in bytes/second

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.outgoing_byte_rate

Average outgoing byte rate in bytes/second.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.request_latency

Average request latency in milliseconds

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| ms | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.request_rate

Average request rate per second.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.request_size

Average request size in bytes

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.requests_in_flight

Number of requests currently in flight to the broker

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests} | Gauge | Int |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.response_rate

Average response rate per second

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {responses}/s | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |

### messaging.kafka.broker.response_size

Average response size in bytes

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |

#### Attributes

| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
30 changes: 29 additions & 1 deletion receiver/kafkametricsreceiver/integration_test.go
@@ -67,13 +67,41 @@ func TestIntegration(t *testing.T) {
rCfg.Brokers = []string{fmt.Sprintf("%s:%s",
ci.HostForNamedContainer(t, "kafka"),
ci.MappedPortForNamedContainer(t, "kafka", kafkaPort))}
rCfg.Scrapers = []string{"brokers", "consumers", "topics"}
rCfg.Scrapers = []string{
"brokers",
"consumers",
"topics",
}
rCfg.Metrics.KafkaBrokers.Enabled = false
rCfg.Metrics.KafkaConsumerGroupLag.Enabled = true
rCfg.Metrics.KafkaConsumerGroupLagSum.Enabled = true
rCfg.Metrics.KafkaConsumerGroupMembers.Enabled = true
rCfg.Metrics.KafkaConsumerGroupOffset.Enabled = true
rCfg.Metrics.KafkaConsumerGroupOffsetSum.Enabled = true
rCfg.Metrics.KafkaPartitionCurrentOffset.Enabled = true
rCfg.Metrics.KafkaPartitionOldestOffset.Enabled = true
rCfg.Metrics.KafkaPartitionReplicas.Enabled = true
rCfg.Metrics.KafkaPartitionReplicasInSync.Enabled = true
rCfg.Metrics.KafkaTopicPartitions.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerConsumerFetchCount.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerConsumerFetchRate.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerCount.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerIncomingByteRate.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerOutgoingByteRate.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerRequestLatency.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerRequestRate.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerRequestSize.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerRequestsInFlight.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerResponseRate.Enabled = true
rCfg.Metrics.MessagingKafkaBrokerResponseSize.Enabled = true
Comment on lines +76 to +96

Member: We have all the metrics enabled, but the expected.yaml output doesn't have anything new. That means the integration test isn't working for some reason.

Author: Hmm, I never got it to write anything to expected.yaml, but I was getting metrics from it when I ran the collector. That does seem to indicate it's an issue in the integration test, but the only thing I've changed here is enabling the metrics.

}),

// scraperinttest.WriteExpected(), // TODO remove
scraperinttest.WithCompareOptions(
// pmetrictest.IgnoreMetricValues(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
),
scraperinttest.WriteExpected(),
Member: Please remove this. This line is intended to be executed once locally to produce the expected.yaml file.

).Run(t)
}
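For reference, per the reviewer's comment, `scraperinttest.WriteExpected()` is meant to run once locally to (re)generate expected.yaml and then be deleted. A sketch of that one-off workflow (import paths and the `NewFactory()` wiring are assumptions based on the contrib repo layout; the elided options mirror TestIntegration above):

```go
package kafkametricsreceiver

import (
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func TestRegenerateExpected(t *testing.T) {
	t.Skip("run manually to regenerate expected.yaml, then delete the WriteExpected option")
	scraperinttest.NewIntegrationTest(
		NewFactory(),
		// ... the same container request and custom config as TestIntegration ...
		scraperinttest.WriteExpected(), // writes expected.yaml on this run only
		scraperinttest.WithCompareOptions(
			pmetrictest.IgnoreStartTimestamp(),
			pmetrictest.IgnoreTimestamp(),
		),
	).Run(t)
}
```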