From 8efdb3559a0a169eb1e8423353fa976e78a28d88 Mon Sep 17 00:00:00 2001 From: Sorin Buliarca Date: Wed, 31 May 2023 11:04:02 +0300 Subject: [PATCH] kotel: add messaging.kafka.fetch_records.count and messaging.kafka.produce_records.count metrics --- plugin/kotel/README.md | 2 ++ plugin/kotel/meter.go | 40 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/plugin/kotel/README.md b/plugin/kotel/README.md index 10bc515d..4ed1b5ae 100644 --- a/plugin/kotel/README.md +++ b/plugin/kotel/README.md @@ -152,7 +152,9 @@ messaging.kafka.write_bytes{node_id = "#{node}"} messaging.kafka.read_errors.count{node_id = "#{node}"} messaging.kafka.read_bytes.count{node_id = "#{node}"} messaging.kafka.produce_bytes.count{node_id = "#{node}", topic = "#{topic}"} +messaging.kafka.produce_records.count{node_id = "#{node}", topic = "#{topic}"} messaging.kafka.fetch_bytes.count{node_id = "#{node}", topic = "#{topic}"} +messaging.kafka.fetch_records.count{node_id = "#{node}", topic = "#{topic}"} ``` ### Getting started diff --git a/plugin/kotel/meter.go b/plugin/kotel/meter.go index 4480b70f..7ac92abb 100644 --- a/plugin/kotel/meter.go +++ b/plugin/kotel/meter.go @@ -88,8 +88,10 @@ type instruments struct { readErrs metric.Int64Counter readBytes metric.Int64Counter - produceBytes metric.Int64Counter - fetchBytes metric.Int64Counter + produceBytes metric.Int64Counter + produceRecords metric.Int64Counter + fetchBytes metric.Int64Counter + fetchRecords metric.Int64Counter } func (m *Meter) newInstruments() instruments { @@ -172,6 +174,15 @@ func (m *Meter) newInstruments() instruments { log.Printf("failed to create produceBytes instrument, %v", err) } + produceRecords, err := m.meter.Int64Counter( + "messaging.kafka.produce_records.count", + metric.WithUnit(dimensionless), + metric.WithDescription("Total number of produced records, by broker and topic"), + ) + if err != nil { + log.Printf("failed to create produceRecords instrument, %v", err) + } + fetchBytes, err := m.meter.Int64Counter( "messaging.kafka.fetch_bytes.count", metric.WithUnit(bytes), @@ -181,6 +192,15 @@ func (m *Meter) newInstruments() instruments { log.Printf("failed to create fetchBytes instrument, %v", err) } + fetchRecords, err := m.meter.Int64Counter( + "messaging.kafka.fetch_records.count", + metric.WithUnit(dimensionless), + metric.WithDescription("Total number of fetched records, by broker and topic"), + ) + if err != nil { + log.Printf("failed to create fetchRecords instrument, %v", err) + } + return instruments{ connects: connects, connectErrs: connectErrs, @@ -192,8 +212,10 @@ func (m *Meter) newInstruments() instruments { readErrs: readErrs, readBytes: readBytes, - produceBytes: produceBytes, - fetchBytes: fetchBytes, + produceBytes: produceBytes, + produceRecords: produceRecords, + fetchBytes: fetchBytes, + fetchRecords: fetchRecords, } } @@ -283,6 +305,11 @@ func (m *Meter) OnProduceBatchWritten(meta kgo.BrokerMetadata, topic string, _ i int64(pbm.UncompressedBytes), metric.WithAttributeSet(attributes), ) + m.instruments.produceRecords.Add( + context.Background(), + int64(pbm.NumRecords), + metric.WithAttributeSet(attributes), + ) } func (m *Meter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, fbm kgo.FetchBatchMetrics) { @@ -296,4 +323,9 @@ func (m *Meter) OnFetchBatchRead(meta kgo.BrokerMetadata, topic string, _ int32, int64(fbm.UncompressedBytes), metric.WithAttributeSet(attributes), ) + m.instruments.fetchRecords.Add( + context.Background(), + int64(fbm.NumRecords), + metric.WithAttributeSet(attributes), + ) }