From f722fe4c82426e2048a931ca98e02de50647388c Mon Sep 17 00:00:00 2001
From: Mario
Date: Thu, 8 Jun 2023 17:53:21 +0200
Subject: [PATCH] [servicegraphprocessor/servicegraphconnector] Change metric
 names to match the spec (#21098)

* Change metrics to match Tempo spec

* Update tests

* Add chlog entry

* Fix duration

* Poke CI

* Add legacy metrics under flag

* Typo
---
 .chloggen/servicegraph-fix-metric-names.yaml |  18 +++
 processor/servicegraphprocessor/factory.go   |  22 +--
 processor/servicegraphprocessor/processor.go | 133 +++++++++++++-----
 .../servicegraphprocessor/processor_test.go  |  34 +++--
 4 files changed, 149 insertions(+), 58 deletions(-)
 create mode 100755 .chloggen/servicegraph-fix-metric-names.yaml

diff --git a/.chloggen/servicegraph-fix-metric-names.yaml b/.chloggen/servicegraph-fix-metric-names.yaml
new file mode 100755
index 000000000000..25afd8fabca7
--- /dev/null
+++ b/.chloggen/servicegraph-fix-metric-names.yaml
@@ -0,0 +1,18 @@
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: breaking
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: servicegraphprocessor
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Change metric names to match the spec
+
+# One or more tracking issues related to the change
+issues: [18743, 16578]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: The latency metric `traces_service_graph_request_duration_seconds` is deprecated in favor of the server and client metrics
+  `traces_service_graph_request_server_seconds` and `traces_service_graph_request_client_seconds`,
+  respectively. Use the feature gate `processor.servicegraph.legacyLatencyMetricNames` to enable the old metric name.
diff --git a/processor/servicegraphprocessor/factory.go b/processor/servicegraphprocessor/factory.go
index 5166d0650ff5..df0a664e0277 100644
--- a/processor/servicegraphprocessor/factory.go
+++ b/processor/servicegraphprocessor/factory.go
@@ -13,18 +13,18 @@ import (
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/featuregate"
 	"go.opentelemetry.io/collector/processor"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/metadata"
 )
 
 const (
 	// The value of "type" key in configuration.
+	typeStr = "servicegraph"
 	// The stability level of the processor.
-	connectorStability       = component.StabilityLevelDevelopment
-	virtualNodeFeatureGateID = "processor.servicegraph.virtualNode"
+	connectorStability                    = component.StabilityLevelDevelopment
+	virtualNodeFeatureGateID              = "processor.servicegraph.virtualNode"
+	legacyLatencyMetricNamesFeatureGateID = "processor.servicegraph.legacyLatencyMetricNames"
 )
 
-var virtualNodeFeatureGate *featuregate.Gate
+var virtualNodeFeatureGate, legacyMetricNamesFeatureGate *featuregate.Gate
 
 func init() {
 	virtualNodeFeatureGate = featuregate.GlobalRegistry().MustRegister(
@@ -33,6 +33,13 @@ func init() {
 		featuregate.WithRegisterDescription("When enabled, when the edge expires, processor checks if it has peer attributes(`db.name, net.sock.peer.addr, net.peer.name, rpc.service, http.url, http.target`), and then aggregate the metrics with virtual node."),
 		featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17196"),
 	)
+	// TODO: Remove this feature gate when the legacy metric names are removed.
+	legacyMetricNamesFeatureGate = featuregate.GlobalRegistry().MustRegister(
+		legacyLatencyMetricNamesFeatureGateID,
+		featuregate.StageAlpha, // Alpha because we want it disabled by default.
+		featuregate.WithRegisterDescription("When enabled, processor uses legacy latency metric names."),
+		featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18743,https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16578"),
+	)
 }
 
 // NewFactory creates a factory for the servicegraph processor.
@@ -41,9 +48,9 @@ func NewFactory() processor.Factory {
 	_ = view.Register(serviceGraphProcessorViews()...)
 
 	return processor.NewFactory(
-		metadata.Type,
+		typeStr,
 		createDefaultConfig,
-		processor.WithTraces(createTracesProcessor, metadata.TracesStability),
+		processor.WithTraces(createTracesProcessor, connectorStability),
 	)
 }
 
@@ -52,7 +59,6 @@ func NewConnectorFactoryFunc(cfgType component.Type, tracesToMetricsStability co
 	return func() connector.Factory {
 		// TODO: Handle this err
 		_ = view.Register(serviceGraphProcessorViews()...)
-
 		return connector.NewFactory(
 			cfgType,
 			createDefaultConfig,
diff --git a/processor/servicegraphprocessor/processor.go b/processor/servicegraphprocessor/processor.go
index 72c551b31948..07f01f93606d 100644
--- a/processor/servicegraphprocessor/processor.go
+++ b/processor/servicegraphprocessor/processor.go
@@ -58,13 +58,16 @@ type serviceGraphProcessor struct {
 
 	startTime time.Time
 
-	seriesMutex                    sync.Mutex
-	reqTotal                       map[string]int64
-	reqFailedTotal                 map[string]int64
-	reqDurationSecondsSum          map[string]float64
-	reqDurationSecondsCount        map[string]uint64
-	reqDurationBounds              []float64
-	reqDurationSecondsBucketCounts map[string][]uint64
+	seriesMutex                          sync.Mutex
+	reqTotal                             map[string]int64
+	reqFailedTotal                       map[string]int64
+	reqClientDurationSecondsCount        map[string]uint64
+	reqClientDurationSecondsSum          map[string]float64
+	reqClientDurationSecondsBucketCounts map[string][]uint64
+	reqServerDurationSecondsCount        map[string]uint64
+	reqServerDurationSecondsSum          map[string]float64
+	reqServerDurationSecondsBucketCounts map[string][]uint64
+	reqDurationBounds                    []float64
 
 	metricMutex sync.RWMutex
 	keyToMetric map[string]metricSeries
@@ -93,17 +96,20 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc
 	}
 
 	return &serviceGraphProcessor{
-		config:                         pConfig,
-		logger:                         logger,
-		startTime:                      time.Now(),
-		reqTotal:                       make(map[string]int64),
-		reqFailedTotal:                 make(map[string]int64),
-		reqDurationSecondsSum:          make(map[string]float64),
-		reqDurationSecondsCount:        make(map[string]uint64),
-		reqDurationBounds:              bounds,
-		reqDurationSecondsBucketCounts: make(map[string][]uint64),
-		keyToMetric:                    make(map[string]metricSeries),
-		shutdownCh:                     make(chan interface{}),
+		config:                               pConfig,
+		logger:                               logger,
+		startTime:                            time.Now(),
+		reqTotal:                             make(map[string]int64),
+		reqFailedTotal:                       make(map[string]int64),
+		reqClientDurationSecondsCount:        make(map[string]uint64),
+		reqClientDurationSecondsSum:          make(map[string]float64),
+		reqClientDurationSecondsBucketCounts: make(map[string][]uint64),
+		reqServerDurationSecondsCount:        make(map[string]uint64),
+		reqServerDurationSecondsSum:          make(map[string]float64),
+		reqServerDurationSecondsBucketCounts: make(map[string][]uint64),
+		reqDurationBounds:                    bounds,
+		keyToMetric:                          make(map[string]metricSeries),
+		shutdownCh:                           make(chan interface{}),
 	}
 }
 
@@ -336,9 +342,6 @@ func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) {
 	metricKey := p.buildMetricKey(e.ClientService, e.ServerService, string(e.ConnectionType), e.Dimensions)
 	dimensions := buildDimensions(e)
 
-	// TODO: Consider configuring server or client latency
-	duration := e.ServerLatencySec
-
 	p.seriesMutex.Lock()
 	defer p.seriesMutex.Unlock()
 	p.updateSeries(metricKey, dimensions)
@@ -346,7 +349,7 @@ func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) {
 	if e.Failed {
 		p.updateErrorMetrics(metricKey)
 	}
-	p.updateDurationMetrics(metricKey, duration)
+	p.updateDurationMetrics(metricKey, e.ServerLatencySec, e.ClientLatencySec)
 }
 
 func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map) {
@@ -373,14 +376,29 @@ func (p *serviceGraphProcessor) updateCountMetrics(key string) { p.reqTotal[key]
 
 func (p *serviceGraphProcessor) updateErrorMetrics(key string) { p.reqFailedTotal[key]++ }
 
-func (p *serviceGraphProcessor) updateDurationMetrics(key string, duration float64) {
+func (p *serviceGraphProcessor) updateDurationMetrics(key string, serverDuration, clientDuration float64) {
+	p.updateServerDurationMetrics(key, serverDuration)
+	p.updateClientDurationMetrics(key, clientDuration)
+}
+
+func (p *serviceGraphProcessor) updateServerDurationMetrics(key string, duration float64) {
 	index := sort.SearchFloat64s(p.reqDurationBounds, duration) // Search bucket index
-	if _, ok := p.reqDurationSecondsBucketCounts[key]; !ok {
-		p.reqDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
+	if _, ok := p.reqServerDurationSecondsBucketCounts[key]; !ok {
+		p.reqServerDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
 	}
-	p.reqDurationSecondsSum[key] += duration
-	p.reqDurationSecondsCount[key]++
-	p.reqDurationSecondsBucketCounts[key][index]++
+	p.reqServerDurationSecondsSum[key] += duration
+	p.reqServerDurationSecondsCount[key]++
+	p.reqServerDurationSecondsBucketCounts[key][index]++
+}
+
+func (p *serviceGraphProcessor) updateClientDurationMetrics(key string, duration float64) {
+	index := sort.SearchFloat64s(p.reqDurationBounds, duration) // Search bucket index
+	if _, ok := p.reqClientDurationSecondsBucketCounts[key]; !ok {
+		p.reqClientDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
+	}
+	p.reqClientDurationSecondsSum[key] += duration
+	p.reqClientDurationSecondsCount[key]++
+	p.reqClientDurationSecondsBucketCounts[key][index]++
 }
 
 func buildDimensions(e *store.Edge) pcommon.Map {
@@ -460,9 +478,22 @@ func (p *serviceGraphProcessor) collectCountMetrics(ilm pmetric.ScopeMetrics) er
 }
 
 func (p *serviceGraphProcessor) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {
-	for key := range p.reqDurationSecondsCount {
+	// TODO: Remove this once legacy metric names are removed
+	if legacyMetricNamesFeatureGate.IsEnabled() {
+		return p.collectServerLatencyMetrics(ilm, "traces_service_graph_request_duration_seconds")
+	}
+
+	if err := p.collectServerLatencyMetrics(ilm, "traces_service_graph_request_server_seconds"); err != nil {
+		return err
+	}
+
+	return p.collectClientLatencyMetrics(ilm)
+}
+
+func (p *serviceGraphProcessor) collectClientLatencyMetrics(ilm pmetric.ScopeMetrics) error {
+	for key := range p.reqClientDurationSecondsCount {
 		mDuration := ilm.Metrics().AppendEmpty()
-		mDuration.SetName("traces_service_graph_request_duration_seconds")
+		mDuration.SetName("traces_service_graph_request_client_seconds")
 		// TODO: Support other aggregation temporalities
 		mDuration.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
 
@@ -472,12 +503,39 @@ func (p *serviceGraphProcessor) collectLatencyMetrics(ilm pmetric.ScopeMetrics)
 		dpDuration.SetStartTimestamp(pcommon.NewTimestampFromTime(p.startTime))
 		dpDuration.SetTimestamp(timestamp)
 		dpDuration.ExplicitBounds().FromRaw(p.reqDurationBounds)
-		dpDuration.BucketCounts().FromRaw(p.reqDurationSecondsBucketCounts[key])
-		dpDuration.SetCount(p.reqDurationSecondsCount[key])
-		dpDuration.SetSum(p.reqDurationSecondsSum[key])
+		dpDuration.BucketCounts().FromRaw(p.reqClientDurationSecondsBucketCounts[key])
+		dpDuration.SetCount(p.reqClientDurationSecondsCount[key])
+		dpDuration.SetSum(p.reqClientDurationSecondsSum[key])
 
 		// TODO: Support exemplars
+		dimensions, ok := p.dimensionsForSeries(key)
+		if !ok {
+			return fmt.Errorf("failed to find dimensions for key %s", key)
+		}
+		dimensions.CopyTo(dpDuration.Attributes())
+	}
+	return nil
+}
+
+func (p *serviceGraphProcessor) collectServerLatencyMetrics(ilm pmetric.ScopeMetrics, mName string) error {
+	for key := range p.reqServerDurationSecondsCount {
+		mDuration := ilm.Metrics().AppendEmpty()
+		mDuration.SetName(mName)
+		// TODO: Support other aggregation temporalities
+		mDuration.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+
+		timestamp := pcommon.NewTimestampFromTime(time.Now())
+
+		dpDuration := mDuration.Histogram().DataPoints().AppendEmpty()
+		dpDuration.SetStartTimestamp(pcommon.NewTimestampFromTime(p.startTime))
+		dpDuration.SetTimestamp(timestamp)
+		dpDuration.ExplicitBounds().FromRaw(p.reqDurationBounds)
+		dpDuration.BucketCounts().FromRaw(p.reqServerDurationSecondsBucketCounts[key])
+		dpDuration.SetCount(p.reqServerDurationSecondsCount[key])
+		dpDuration.SetSum(p.reqServerDurationSecondsSum[key])
+
+		// TODO: Support exemplars
 		dimensions, ok := p.dimensionsForSeries(key)
 		if !ok {
 			return fmt.Errorf("failed to find dimensions for key %s", key)
@@ -562,9 +620,12 @@ func (p *serviceGraphProcessor) cleanCache() {
 	for _, key := range staleSeries {
 		delete(p.reqTotal, key)
 		delete(p.reqFailedTotal, key)
-		delete(p.reqDurationSecondsCount, key)
-		delete(p.reqDurationSecondsSum, key)
-		delete(p.reqDurationSecondsBucketCounts, key)
+		delete(p.reqClientDurationSecondsCount, key)
+		delete(p.reqClientDurationSecondsSum, key)
+		delete(p.reqClientDurationSecondsBucketCounts, key)
+		delete(p.reqServerDurationSecondsCount, key)
+		delete(p.reqServerDurationSecondsSum, key)
+		delete(p.reqServerDurationSecondsBucketCounts, key)
 	}
 	p.seriesMutex.Unlock()
 }
diff --git a/processor/servicegraphprocessor/processor_test.go b/processor/servicegraphprocessor/processor_test.go
index 1c3fae89b4cc..3f5cb7c71550 100644
--- a/processor/servicegraphprocessor/processor_test.go
+++ b/processor/servicegraphprocessor/processor_test.go
@@ -257,7 +257,7 @@ func TestConnectorConsume(t *testing.T) {
 }
 
 func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
-	assert.Equal(t, 2, md.MetricCount())
+	assert.Equal(t, 3, md.MetricCount())
 
 	rms := md.ResourceMetrics()
 	assert.Equal(t, 1, rms.Len())
@@ -266,13 +266,18 @@ func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
 	assert.Equal(t, 1, sms.Len())
 
 	ms := sms.At(0).Metrics()
-	assert.Equal(t, 2, ms.Len())
+	assert.Equal(t, 3, ms.Len())
 
 	mCount := ms.At(0)
 	verifyCount(t, mCount)
 
-	mDuration := ms.At(1)
-	verifyDuration(t, mDuration)
+	mServerDuration := ms.At(1)
+	assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name())
+	verifyDuration(t, mServerDuration)
+
+	mClientDuration := ms.At(2)
+	assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name())
+	verifyDuration(t, mClientDuration)
 }
 
 func verifyCount(t *testing.T, m pmetric.Metric) {
@@ -296,8 +301,6 @@ func verifyCount(t *testing.T, m pmetric.Metric) {
 }
 
 func verifyDuration(t *testing.T, m pmetric.Metric) {
-	assert.Equal(t, "traces_service_graph_request_duration_seconds", m.Name())
-
 	assert.Equal(t, pmetric.MetricTypeHistogram, m.Type())
 	dps := m.Histogram().DataPoints()
 	assert.Equal(t, 1, dps.Len())
@@ -464,13 +467,16 @@ func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pmetric.Metrics) e
 
 func TestUpdateDurationMetrics(t *testing.T) {
 	p := serviceGraphProcessor{
-		reqTotal:                       make(map[string]int64),
-		reqFailedTotal:                 make(map[string]int64),
-		reqDurationSecondsSum:          make(map[string]float64),
-		reqDurationSecondsCount:        make(map[string]uint64),
-		reqDurationBounds:              defaultLatencyHistogramBucketsMs,
-		reqDurationSecondsBucketCounts: make(map[string][]uint64),
-		keyToMetric:                    make(map[string]metricSeries),
+		reqTotal:                             make(map[string]int64),
+		reqFailedTotal:                       make(map[string]int64),
+		reqServerDurationSecondsSum:          make(map[string]float64),
+		reqServerDurationSecondsCount:        make(map[string]uint64),
+		reqServerDurationSecondsBucketCounts: make(map[string][]uint64),
+		reqClientDurationSecondsSum:          make(map[string]float64),
+		reqClientDurationSecondsCount:        make(map[string]uint64),
+		reqClientDurationSecondsBucketCounts: make(map[string][]uint64),
+		reqDurationBounds:                    defaultLatencyHistogramBucketsMs,
+		keyToMetric:                          make(map[string]metricSeries),
 		config: &Config{
 			Dimensions: []string{},
 		},
@@ -497,7 +503,7 @@ func TestUpdateDurationMetrics(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.caseStr, func(t *testing.T) {
-			p.updateDurationMetrics(metricKey, tc.duration)
+			p.updateDurationMetrics(metricKey, tc.duration, tc.duration)
 		})
 	}
 }
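
Usage note (not part of the patch itself): after this change the connector emits `traces_service_graph_request_server_seconds` and `traces_service_graph_request_client_seconds`, and the old `traces_service_graph_request_duration_seconds` histogram is only produced when the legacy gate is turned on. A minimal sketch of enabling that gate through the collector's standard --feature-gates flag; the binary name `otelcol-contrib` and the file `config.yaml` are illustrative placeholders, only the gate identifier comes from this patch:

    # Hypothetical invocation: re-enable the deprecated
    # traces_service_graph_request_duration_seconds metric name.
    otelcol-contrib --config config.yaml \
        --feature-gates=processor.servicegraph.legacyLatencyMetricNames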