Skip to content

Commit

Permalink
kotel: add messaging.kafka.fetch_records.count and messaging.kafka.pr…
Browse files Browse the repository at this point in the history
…oduce_records.count metrics
  • Loading branch information
sbuliarca committed May 31, 2023
1 parent 8406043 commit 8efdb35
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
2 changes: 2 additions & 0 deletions plugin/kotel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ messaging.kafka.write_bytes{node_id = "#{node}"}
messaging.kafka.read_errors.count{node_id = "#{node}"}
messaging.kafka.read_bytes.count{node_id = "#{node}"}
messaging.kafka.produce_bytes.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.produce_records.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.fetch_bytes.count{node_id = "#{node}", topic = "#{topic}"}
messaging.kafka.fetch_records.count{node_id = "#{node}", topic = "#{topic}"}
```

### Getting started
Expand Down
40 changes: 36 additions & 4 deletions plugin/kotel/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ type instruments struct {
readErrs metric.Int64Counter
readBytes metric.Int64Counter

produceBytes metric.Int64Counter
fetchBytes metric.Int64Counter
produceBytes metric.Int64Counter
produceRecords metric.Int64Counter
fetchBytes metric.Int64Counter
fetchRecords metric.Int64Counter
}

func (m *Meter) newInstruments() instruments {
Expand Down Expand Up @@ -172,6 +174,15 @@ func (m *Meter) newInstruments() instruments {
log.Printf("failed to create produceBytes instrument, %v", err)
}

produceRecords, err := m.meter.Int64Counter(
"messaging.kafka.produce_records.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of produced records, by broker and topic"),
)
if err != nil {
log.Printf("failed to create produceRecords instrument, %v", err)
}

fetchBytes, err := m.meter.Int64Counter(
"messaging.kafka.fetch_bytes.count",
metric.WithUnit(bytes),
Expand All @@ -181,6 +192,15 @@ func (m *Meter) newInstruments() instruments {
log.Printf("failed to create fetchBytes instrument, %v", err)
}

fetchRecords, err := m.meter.Int64Counter(
"messaging.kafka.fetch_records.count",
metric.WithUnit(dimensionless),
metric.WithDescription("Total number of fetched records, by broker and topic"),
)
if err != nil {
log.Printf("failed to create fetchRecords instrument, %v", err)
}

return instruments{
connects: connects,
connectErrs: connectErrs,
Expand All @@ -192,8 +212,10 @@ func (m *Meter) newInstruments() instruments {
readErrs: readErrs,
readBytes: readBytes,

produceBytes: produceBytes,
fetchBytes: fetchBytes,
produceBytes: produceBytes,
produceRecords: produceRecords,
fetchBytes: fetchBytes,
fetchRecords: fetchRecords,
}
}

Expand Down Expand Up @@ -283,6 +305,11 @@ func (m *Meter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ i
int64(pbm.UncompressedBytes),
metric.WithAttributeSet(attributes),
)
m.instruments.produceRecords.Add(
context.Background(),
int64(pbm.NumRecords),
metric.WithAttributeSet(attributes),
)
}

func (m *Meter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, fbm kgo.FetchBatchMetrics) {
Expand All @@ -296,4 +323,9 @@ func (m *Meter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32,
int64(fbm.UncompressedBytes),
metric.WithAttributeSet(attributes),
)
m.instruments.fetchRecords.Add(
context.Background(),
int64(fbm.NumRecords),
metric.WithAttributeSet(attributes),
)
}

0 comments on commit 8efdb35

Please sign in to comment.