Skip to content

Commit

Permalink
Merge pull request #1 from jcountsNR/test
Browse files Browse the repository at this point in the history
feat: update kafka connect for NR
  • Loading branch information
jcountsNR authored Jul 13, 2023
2 parents 343fbff + 9b7332f commit 1212594
Show file tree
Hide file tree
Showing 9 changed files with 1,349 additions and 74 deletions.
16 changes: 16 additions & 0 deletions .chloggen/expand-kafka-broker-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkametricsreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "expanding the broker metrics that are scraped."

# One or more tracking issues related to the change
issues: [14166]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
82 changes: 81 additions & 1 deletion receiver/kafkametricsreceiver/broker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata"
)
type saramaMetrics map[string]map[string]interface{} // saramaMetrics is a map of metric name to tags


type brokerScraper struct {
client sarama.Client
Expand All @@ -26,6 +28,18 @@ type brokerScraper struct {
mb *metadata.MetricsBuilder
}

var nrMetricsPrefix = [...]string{
"consumer-fetch-rate-for-broker-",
"incoming-byte-rate-for-broker-",
"outgoing-byte-rate-for-broker-",
"request-rate-for-broker-",
"response-rate-for-broker-",
"response-size-for-broker-",
"request-size-for-broker-",
"requests-in-flight-for-broker-",
"request-latency-in-ms-for-broker-",
}

func (s *brokerScraper) Name() string {
return brokersScraperName
}
Expand All @@ -42,6 +56,53 @@ func (s *brokerScraper) shutdown(context.Context) error {
return nil
}

func (s *brokerScraper) scrapeMetric(now pcommon.Timestamp, allMetrics saramaMetrics, brokerID int64, prefix string) {
key := fmt.Sprint(prefix, brokerID)

if metric, ok := allMetrics[key]; ok {
switch prefix {
case "consumer-fetch-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordKafkaBrokersConsumerFetchRateDataPoint(now, v, brokerID)
}
case "incoming-byte-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordKafkaBrokersIncomingByteRateDataPoint(now, v, brokerID)
}
case "outgoing-byte-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordKafkaBrokersOutgoingByteRateDataPoint(now, v, brokerID)
}
case "request-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordKafkaBrokersRequestRateDataPoint(now, v, brokerID)
}
case "response-rate-for-broker-":
if v, ok := metric["mean.rate"].(float64); ok {
s.mb.RecordKafkaBrokersResponseRateDataPoint(now, v, brokerID)
}
case "response-size-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordKafkaBrokersResponseSizeDataPoint(now, v, brokerID)
}
case "request-size-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordKafkaBrokersRequestSizeDataPoint(now, v, brokerID)
}
case "requests-in-flight-for-broker-":
if v, ok := metric["count"].(int64); ok {
s.mb.RecordKafkaBrokersRequestsInFlightDataPoint(now, v, brokerID)
}
case "request-latency-in-ms-for-broker-":
if v, ok := metric["mean"].(float64); ok {
s.mb.RecordKafkaBrokersRequestLatencyDataPoint(now, v, brokerID)
}
default:
fmt.Printf("undefined for prefix %s\n", prefix)
}
}
}

func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {
if s.client == nil {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
Expand All @@ -53,7 +114,26 @@ func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) {

brokers := s.client.Brokers()

s.mb.RecordKafkaBrokersDataPoint(pcommon.NewTimestampFromTime(time.Now()), int64(len(brokers)))
allMetrics := make(map[string]map[string]interface{})

if s.saramaConfig != nil {
allMetrics = s.saramaConfig.MetricRegistry.GetAll()
}

now := pcommon.NewTimestampFromTime(time.Now())
for _, broker := range brokers {
brokerID := int64(broker.ID())
for _, prefix := range nrMetricsPrefix {
s.scrapeMetric(now, allMetrics, brokerID, prefix)
}
}

brokerCount := int64(len(brokers))
// kafka.brokers is deprecated. This should be removed in a future release.
s.mb.RecordKafkaBrokersDataPoint(now, brokerCount)

// kafka.brokers.count should replace kafka.brokers.
s.mb.RecordKafkaBrokersCountDataPoint(now, brokerCount)

return s.mb.Emit(), nil
}
Expand Down
153 changes: 149 additions & 4 deletions receiver/kafkametricsreceiver/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ metrics:
### kafka.brokers
Number of brokers in the cluster.
[DEPRECATED] Number of brokers in the cluster.
| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {brokers} | Sum | Int | Cumulative | false |
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {brokers} | Gauge | Int |
### kafka.consumer_group.lag
Expand Down Expand Up @@ -169,3 +169,148 @@ Number of partitions in topic.
| Name | Description | Values |
| ---- | ----------- | ------ |
| topic | The ID (integer) of a topic | Any Str |
## Optional Metrics
The following metrics are not emitted by default. Each of them can be enabled by applying the following configuration:
```yaml
metrics:
<metric_name>:
enabled: true
```
### kafka.brokers.consumer_fetch_rate
Average consumer fetch Rate
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {fetches}/s | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.count
Number of brokers in the cluster.
| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic |
| ---- | ----------- | ---------- | ----------------------- | --------- |
| {brokers} | Sum | Int | Cumulative | false |
### kafka.brokers.incoming_byte_rate
Average tncoming Byte Rate in bytes/second
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.outgoing_byte_rate
Average outgoing Byte Rate in bytes/second.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| 1 | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.request_latency
Average request latency in ms
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| ms | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.request_rate
Average request rate per second.
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests}/s | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.request_size
Average request size in bytes
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.requests_in_flight
Requests in flight
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {requests} | Gauge | Int |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.response_rate
Average response rate per second
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {response}/s | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
### kafka.brokers.response_size
Average response size in bytes
| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Gauge | Double |
#### Attributes
| Name | Description | Values |
| ---- | ----------- | ------ |
| broker | The ID (integer) of a broker | Any Int |
62 changes: 51 additions & 11 deletions receiver/kafkametricsreceiver/internal/metadata/generated_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 1212594

Please sign in to comment.