Skip to content

Commit

Permalink
kafka receiver exposes some metrics by partition (fixes open-telemetry#30177) (open-telemetry#30268)

Browse files Browse the repository at this point in the history

**Description:**

Fixes [open-telemetry#30177](open-telemetry#30177).
The Kafka receiver now exposes the following metrics per partition.
Previously, if a collector was consuming from 10 partitions, only one
instance of each metric would be rendered, with its value fluctuating
according to the state of each partition's consumer. Now the metrics
endpoint will expose 10 sets of metrics, each with a `partition` tag.

* kafka_receiver_messages
* kafka_receiver_current_offset
* kafka_receiver_offset_lag

**Testing:**
* Unit tests were run
* Stats endpoint observed manually for correctness
* Scraped stats charted in Prometheus to ensure stability

Example output:
```
otelcol_kafka_receiver_messages{name="",partition="0",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 29
otelcol_kafka_receiver_messages{name="",partition="1",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 32
otelcol_kafka_receiver_messages{name="",partition="10",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 32
otelcol_kafka_receiver_messages{name="",partition="11",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 28
otelcol_kafka_receiver_messages{name="",partition="12",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 36
otelcol_kafka_receiver_messages{name="",partition="13",service_instance_id="db3bae97-1856-4b49-85cb-8559db48f345",service_name="otelcontribcol",service_version="0.91.0-dev"} 38
```

**Documentation:**
None added

Co-authored-by: Sean Marciniak <[email protected]>
  • Loading branch information
2 people authored and mfyuce committed Jan 18, 2024
1 parent d41b0cf commit 7f7fecf
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 23 deletions.
31 changes: 31 additions & 0 deletions .chloggen/fix_kafka_receiver_metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: The Kafka receiver now exports some partition-specific metrics per-partition, with a `partition` tag

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30177]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following metrics now render per partition:
- kafka_receiver_messages
- kafka_receiver_current_offset
- kafka_receiver_offset_lag
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 14 additions & 3 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -446,7 +447,10 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
}

ctx := c.obsrecv.StartTracesOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
Expand Down Expand Up @@ -526,7 +530,10 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS
}

ctx := c.obsrecv.StartMetricsOp(session.Context())
statsTags := []tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())}
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
Expand Down Expand Up @@ -610,9 +617,13 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
}

ctx := c.obsrecv.StartLogsOp(session.Context())
statsTags := []tag.Mutator{
tag.Upsert(tagInstanceName, c.id.String()),
tag.Upsert(tagPartition, strconv.Itoa(int(claim.Partition()))),
}
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(tagInstanceName, c.id.String())},
statsTags,
statMessageCount.M(1),
statMessageOffset.M(message.Offset),
statMessageOffsetLag.M(claim.HighWaterMarkOffset()-message.Offset-1))
Expand Down
20 changes: 11 additions & 9 deletions receiver/kafkareceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

var (
tagInstanceName, _ = tag.NewKey("name")
tagPartition, _ = tag.NewKey("partition")

statMessageCount = stats.Int64("kafka_receiver_messages", "Number of received messages", stats.UnitDimensionless)
statMessageOffset = stats.Int64("kafka_receiver_current_offset", "Current message offset", stats.UnitDimensionless)
Expand All @@ -26,69 +27,70 @@ var (

// metricViews return metric views for Kafka receiver.
func metricViews() []*view.View {
tagKeys := []tag.Key{tagInstanceName}
partitionAgnosticTagKeys := []tag.Key{tagInstanceName}
partitionSpecificTagKeys := []tag.Key{tagInstanceName, tagPartition}

countMessages := &view.View{
Name: statMessageCount.Name(),
Measure: statMessageCount,
Description: statMessageCount.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.Sum(),
}

lastValueOffset := &view.View{
Name: statMessageOffset.Name(),
Measure: statMessageOffset,
Description: statMessageOffset.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.LastValue(),
}

lastValueOffsetLag := &view.View{
Name: statMessageOffsetLag.Name(),
Measure: statMessageOffsetLag,
Description: statMessageOffsetLag.Description(),
TagKeys: tagKeys,
TagKeys: partitionSpecificTagKeys,
Aggregation: view.LastValue(),
}

countPartitionStart := &view.View{
Name: statPartitionStart.Name(),
Measure: statPartitionStart,
Description: statPartitionStart.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countPartitionClose := &view.View{
Name: statPartitionClose.Name(),
Measure: statPartitionClose,
Description: statPartitionClose.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedMetricPoints := &view.View{
Name: statUnmarshalFailedMetricPoints.Name(),
Measure: statUnmarshalFailedMetricPoints,
Description: statUnmarshalFailedMetricPoints.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedLogRecords := &view.View{
Name: statUnmarshalFailedLogRecords.Name(),
Measure: statUnmarshalFailedLogRecords,
Description: statUnmarshalFailedLogRecords.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

countUnmarshalFailedSpans := &view.View{
Name: statUnmarshalFailedSpans.Name(),
Measure: statUnmarshalFailedSpans,
Description: statUnmarshalFailedSpans.Description(),
TagKeys: tagKeys,
TagKeys: partitionAgnosticTagKeys,
Aggregation: view.Sum(),
}

Expand Down
29 changes: 18 additions & 11 deletions receiver/kafkareceiver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@ import (
"github.com/stretchr/testify/assert"
)

type expectedView struct {
name string
tagCount int
}

func TestMetrics(t *testing.T) {
metricViews := metricViews()
viewNames := []string{
"kafka_receiver_messages",
"kafka_receiver_current_offset",
"kafka_receiver_offset_lag",
"kafka_receiver_partition_start",
"kafka_receiver_partition_close",
"kafka_receiver_unmarshal_failed_metric_points",
"kafka_receiver_unmarshal_failed_log_records",
"kafka_receiver_unmarshal_failed_spans",
viewNames := []expectedView{
{name: "kafka_receiver_messages", tagCount: 2},
{name: "kafka_receiver_current_offset", tagCount: 2},
{name: "kafka_receiver_offset_lag", tagCount: 2},
{name: "kafka_receiver_partition_start", tagCount: 1},
{name: "kafka_receiver_partition_close", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_metric_points", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_log_records", tagCount: 1},
{name: "kafka_receiver_unmarshal_failed_spans", tagCount: 1},
}
for i, viewName := range viewNames {
assert.Equal(t, viewName, metricViews[i].Name)

for i, expectedView := range viewNames {
assert.Equal(t, expectedView.name, metricViews[i].Name)
assert.Equal(t, expectedView.tagCount, len(metricViews[i].TagKeys))
}
}

0 comments on commit 7f7fecf

Please sign in to comment.