[servicegraphprocessor/servicegraphconnector] Change metric names to match the spec (#21098)

* Change metrics to match Tempo spec

* Update tests

* Add chlog entry

* Fix duration

* Poke CI

* Add legacy metrics under flag

* Typo
mapno authored Jun 8, 2023
1 parent d0fe5f3 commit f722fe4
Showing 4 changed files with 149 additions and 58 deletions.
18 changes: 18 additions & 0 deletions .chloggen/servicegraph-fix-metric-names.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: servicegraphprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change metric names to match the spec

# One or more tracking issues related to the change
issues: [18743, 16578]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  The latency metric `traces_service_graph_request_duration_seconds` is deprecated in favor of the server and client metrics
  `traces_service_graph_request_server_seconds` and `traces_service_graph_request_client_seconds`,
  respectively. Use the feature gate `processor.servicegraph.legacyLatencyMetricNames` to restore the legacy metric name.
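For operators, collector feature gates are normally flipped on the command line (`--feature-gates=processor.servicegraph.legacyLatencyMetricNames`), but they can also be set programmatically through the global registry. Below is a minimal sketch of the programmatic route, not part of this commit; it assumes the servicegraphprocessor package is importable so that its `init` registers the gate:

```go
package main

import (
	"log"

	"go.opentelemetry.io/collector/featuregate"

	// Blank import runs this package's init(), which registers the gate.
	_ "github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor"
)

func main() {
	// Opt back into the deprecated traces_service_graph_request_duration_seconds name.
	if err := featuregate.GlobalRegistry().Set("processor.servicegraph.legacyLatencyMetricNames", true); err != nil {
		log.Fatalf("enabling legacy servicegraph metric names: %v", err)
	}
}
```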
22 changes: 14 additions & 8 deletions processor/servicegraphprocessor/factory.go
@@ -13,18 +13,18 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/servicegraphprocessor/internal/metadata"
)

const (
// The value of "type" key in configuration.
typeStr = "servicegraph"
// The stability level of the processor.
connectorStability = component.StabilityLevelDevelopment
virtualNodeFeatureGateID = "processor.servicegraph.virtualNode"
connectorStability = component.StabilityLevelDevelopment
virtualNodeFeatureGateID = "processor.servicegraph.virtualNode"
legacyLatencyMetricNamesFeatureGateID = "processor.servicegraph.legacyLatencyMetricNames"
)

var virtualNodeFeatureGate *featuregate.Gate
var virtualNodeFeatureGate, legacyMetricNamesFeatureGate *featuregate.Gate

func init() {
virtualNodeFeatureGate = featuregate.GlobalRegistry().MustRegister(
@@ -33,6 +33,13 @@ func init() {
featuregate.WithRegisterDescription("When enabled, when the edge expires, processor checks if it has peer attributes(`db.name, net.sock.peer.addr, net.peer.name, rpc.service, http.url, http.target`), and then aggregate the metrics with virtual node."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/17196"),
)
// TODO: Remove this feature gate when the legacy metric names are removed.
legacyMetricNamesFeatureGate = featuregate.GlobalRegistry().MustRegister(
legacyLatencyMetricNamesFeatureGateID,
featuregate.StageAlpha, // Alpha because we want it disabled by default.
featuregate.WithRegisterDescription("When enabled, processor uses legacy latency metric names."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18743,https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/16578"),
)
}

// NewFactory creates a factory for the servicegraph processor.
@@ -41,9 +48,9 @@ func NewFactory() processor.Factory {
_ = view.Register(serviceGraphProcessorViews()...)

return processor.NewFactory(
metadata.Type,
typeStr,
createDefaultConfig,
processor.WithTraces(createTracesProcessor, metadata.TracesStability),
processor.WithTraces(createTracesProcessor, connectorStability),
)
}

@@ -52,7 +59,6 @@ func NewConnectorFactoryFunc(cfgType component.Type, tracesToMetricsStability co
return func() connector.Factory {
// TODO: Handle this err
_ = view.Register(serviceGraphProcessorViews()...)

return connector.NewFactory(
cfgType,
createDefaultConfig,
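The registration above follows the collector's usual gate pattern: an Alpha-stage gate is disabled by default, so the spec-compliant names ship out of the box and the legacy names are strictly opt-in. A standalone sketch of that pattern follows; the gate ID `example.legacyNames` is illustrative, while the two metric names mirror this commit:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/featuregate"
)

// Alpha gates default to off, so callers get the new behavior unless an
// operator explicitly enables the gate.
var demoGate = featuregate.GlobalRegistry().MustRegister(
	"example.legacyNames",
	featuregate.StageAlpha,
	featuregate.WithRegisterDescription("When enabled, use legacy metric names."),
)

func latencyMetricName() string {
	if demoGate.IsEnabled() {
		return "traces_service_graph_request_duration_seconds" // legacy
	}
	return "traces_service_graph_request_server_seconds" // spec-compliant
}

func main() {
	fmt.Println(latencyMetricName()) // prints the spec-compliant name by default
}
```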
133 changes: 97 additions & 36 deletions processor/servicegraphprocessor/processor.go
@@ -58,13 +58,16 @@ type serviceGraphProcessor struct {

startTime time.Time

seriesMutex sync.Mutex
reqTotal map[string]int64
reqFailedTotal map[string]int64
reqDurationSecondsSum map[string]float64
reqDurationSecondsCount map[string]uint64
reqDurationBounds []float64
reqDurationSecondsBucketCounts map[string][]uint64
seriesMutex sync.Mutex
reqTotal map[string]int64
reqFailedTotal map[string]int64
reqClientDurationSecondsCount map[string]uint64
reqClientDurationSecondsSum map[string]float64
reqClientDurationSecondsBucketCounts map[string][]uint64
reqServerDurationSecondsCount map[string]uint64
reqServerDurationSecondsSum map[string]float64
reqServerDurationSecondsBucketCounts map[string][]uint64
reqDurationBounds []float64

metricMutex sync.RWMutex
keyToMetric map[string]metricSeries
@@ -93,17 +96,20 @@ func newProcessor(logger *zap.Logger, config component.Config) *serviceGraphProc
}

return &serviceGraphProcessor{
config: pConfig,
logger: logger,
startTime: time.Now(),
reqTotal: make(map[string]int64),
reqFailedTotal: make(map[string]int64),
reqDurationSecondsSum: make(map[string]float64),
reqDurationSecondsCount: make(map[string]uint64),
reqDurationBounds: bounds,
reqDurationSecondsBucketCounts: make(map[string][]uint64),
keyToMetric: make(map[string]metricSeries),
shutdownCh: make(chan interface{}),
config: pConfig,
logger: logger,
startTime: time.Now(),
reqTotal: make(map[string]int64),
reqFailedTotal: make(map[string]int64),
reqClientDurationSecondsCount: make(map[string]uint64),
reqClientDurationSecondsSum: make(map[string]float64),
reqClientDurationSecondsBucketCounts: make(map[string][]uint64),
reqServerDurationSecondsCount: make(map[string]uint64),
reqServerDurationSecondsSum: make(map[string]float64),
reqServerDurationSecondsBucketCounts: make(map[string][]uint64),
reqDurationBounds: bounds,
keyToMetric: make(map[string]metricSeries),
shutdownCh: make(chan interface{}),
}
}

@@ -336,17 +342,14 @@ func (p *serviceGraphProcessor) aggregateMetricsForEdge(e *store.Edge) {
metricKey := p.buildMetricKey(e.ClientService, e.ServerService, string(e.ConnectionType), e.Dimensions)
dimensions := buildDimensions(e)

// TODO: Consider configuring server or client latency
duration := e.ServerLatencySec

p.seriesMutex.Lock()
defer p.seriesMutex.Unlock()
p.updateSeries(metricKey, dimensions)
p.updateCountMetrics(metricKey)
if e.Failed {
p.updateErrorMetrics(metricKey)
}
p.updateDurationMetrics(metricKey, duration)
p.updateDurationMetrics(metricKey, e.ServerLatencySec, e.ClientLatencySec)
}

func (p *serviceGraphProcessor) updateSeries(key string, dimensions pcommon.Map) {
@@ -373,14 +376,29 @@ func (p *serviceGraphProcessor) updateCountMetrics(key string) { p.reqTotal[key]

func (p *serviceGraphProcessor) updateErrorMetrics(key string) { p.reqFailedTotal[key]++ }

func (p *serviceGraphProcessor) updateDurationMetrics(key string, duration float64) {
func (p *serviceGraphProcessor) updateDurationMetrics(key string, serverDuration, clientDuration float64) {
p.updateServerDurationMetrics(key, serverDuration)
p.updateClientDurationMetrics(key, clientDuration)
}

func (p *serviceGraphProcessor) updateServerDurationMetrics(key string, duration float64) {
index := sort.SearchFloat64s(p.reqDurationBounds, duration) // Search bucket index
if _, ok := p.reqDurationSecondsBucketCounts[key]; !ok {
p.reqDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
if _, ok := p.reqServerDurationSecondsBucketCounts[key]; !ok {
p.reqServerDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
}
p.reqDurationSecondsSum[key] += duration
p.reqDurationSecondsCount[key]++
p.reqDurationSecondsBucketCounts[key][index]++
p.reqServerDurationSecondsSum[key] += duration
p.reqServerDurationSecondsCount[key]++
p.reqServerDurationSecondsBucketCounts[key][index]++
}

func (p *serviceGraphProcessor) updateClientDurationMetrics(key string, duration float64) {
index := sort.SearchFloat64s(p.reqDurationBounds, duration) // Search bucket index
if _, ok := p.reqClientDurationSecondsBucketCounts[key]; !ok {
p.reqClientDurationSecondsBucketCounts[key] = make([]uint64, len(p.reqDurationBounds)+1)
}
p.reqClientDurationSecondsSum[key] += duration
p.reqClientDurationSecondsCount[key]++
p.reqClientDurationSecondsBucketCounts[key][index]++
}
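Both update functions above share one bucketing step: `sort.SearchFloat64s` returns the first index whose bound is greater than or equal to the observed duration, and the counts slice carries `len(bounds)+1` entries so durations beyond the last bound land in a final overflow bucket. A runnable sketch with illustrative bounds:

```go
package main

import (
	"fmt"
	"sort"
)

// Minimal sketch of the bucket selection used in
// updateServerDurationMetrics/updateClientDurationMetrics. The bounds here
// are illustrative, not the processor's defaults.
func main() {
	bounds := []float64{0.1, 0.5, 1, 5}
	counts := make([]uint64, len(bounds)+1) // extra slot = overflow (+Inf) bucket

	for _, duration := range []float64{0.05, 0.5, 2.3, 9.9} {
		// SearchFloat64s returns the first index i with bounds[i] >= duration,
		// or len(bounds) when duration exceeds every bound.
		index := sort.SearchFloat64s(bounds, duration)
		counts[index]++
	}
	fmt.Println(counts) // [1 1 0 1 1]
}
```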

func buildDimensions(e *store.Edge) pcommon.Map {
@@ -460,9 +478,22 @@ func (p *serviceGraphProcessor) collectCountMetrics(ilm pmetric.ScopeMetrics) er
}

func (p *serviceGraphProcessor) collectLatencyMetrics(ilm pmetric.ScopeMetrics) error {
for key := range p.reqDurationSecondsCount {
// TODO: Remove this once legacy metric names are removed
if legacyMetricNamesFeatureGate.IsEnabled() {
return p.collectServerLatencyMetrics(ilm, "traces_service_graph_request_duration_seconds")
}

if err := p.collectServerLatencyMetrics(ilm, "traces_service_graph_request_server_seconds"); err != nil {
return err
}

return p.collectClientLatencyMetrics(ilm)
}

func (p *serviceGraphProcessor) collectClientLatencyMetrics(ilm pmetric.ScopeMetrics) error {
for key := range p.reqClientDurationSecondsCount {
mDuration := ilm.Metrics().AppendEmpty()
mDuration.SetName("traces_service_graph_request_duration_seconds")
mDuration.SetName("traces_service_graph_request_client_seconds")
// TODO: Support other aggregation temporalities
mDuration.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

Expand All @@ -472,12 +503,39 @@ func (p *serviceGraphProcessor) collectLatencyMetrics(ilm pmetric.ScopeMetrics)
dpDuration.SetStartTimestamp(pcommon.NewTimestampFromTime(p.startTime))
dpDuration.SetTimestamp(timestamp)
dpDuration.ExplicitBounds().FromRaw(p.reqDurationBounds)
dpDuration.BucketCounts().FromRaw(p.reqDurationSecondsBucketCounts[key])
dpDuration.SetCount(p.reqDurationSecondsCount[key])
dpDuration.SetSum(p.reqDurationSecondsSum[key])
dpDuration.BucketCounts().FromRaw(p.reqClientDurationSecondsBucketCounts[key])
dpDuration.SetCount(p.reqClientDurationSecondsCount[key])
dpDuration.SetSum(p.reqClientDurationSecondsSum[key])

// TODO: Support exemplars
dimensions, ok := p.dimensionsForSeries(key)
if !ok {
return fmt.Errorf("failed to find dimensions for key %s", key)
}

dimensions.CopyTo(dpDuration.Attributes())
}
return nil
}

func (p *serviceGraphProcessor) collectServerLatencyMetrics(ilm pmetric.ScopeMetrics, mName string) error {
for key := range p.reqServerDurationSecondsCount {
mDuration := ilm.Metrics().AppendEmpty()
mDuration.SetName(mName)
// TODO: Support other aggregation temporalities
mDuration.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

timestamp := pcommon.NewTimestampFromTime(time.Now())

dpDuration := mDuration.Histogram().DataPoints().AppendEmpty()
dpDuration.SetStartTimestamp(pcommon.NewTimestampFromTime(p.startTime))
dpDuration.SetTimestamp(timestamp)
dpDuration.ExplicitBounds().FromRaw(p.reqDurationBounds)
dpDuration.BucketCounts().FromRaw(p.reqServerDurationSecondsBucketCounts[key])
dpDuration.SetCount(p.reqServerDurationSecondsCount[key])
dpDuration.SetSum(p.reqServerDurationSecondsSum[key])

// TODO: Support exemplars
dimensions, ok := p.dimensionsForSeries(key)
if !ok {
return fmt.Errorf("failed to find dimensions for key %s", key)
@@ -562,9 +620,12 @@ func (p *serviceGraphProcessor) cleanCache() {
for _, key := range staleSeries {
delete(p.reqTotal, key)
delete(p.reqFailedTotal, key)
delete(p.reqDurationSecondsCount, key)
delete(p.reqDurationSecondsSum, key)
delete(p.reqDurationSecondsBucketCounts, key)
delete(p.reqClientDurationSecondsCount, key)
delete(p.reqClientDurationSecondsSum, key)
delete(p.reqClientDurationSecondsBucketCounts, key)
delete(p.reqServerDurationSecondsCount, key)
delete(p.reqServerDurationSecondsSum, key)
delete(p.reqServerDurationSecondsBucketCounts, key)
}
p.seriesMutex.Unlock()
}
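For reference, each collect*LatencyMetrics function above materializes one cumulative histogram per series from the per-key maps. A self-contained sketch of the same pdata assembly, with illustrative values; the `client`/`server` attribute keys match the dimensions the processor builds, but the rest is made up for the example:

```go
package main

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

func main() {
	md := pmetric.NewMetrics()
	sm := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty()

	m := sm.Metrics().AppendEmpty()
	m.SetName("traces_service_graph_request_server_seconds")
	m.SetEmptyHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

	dp := m.Histogram().DataPoints().AppendEmpty()
	dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(-time.Minute)))
	dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
	dp.ExplicitBounds().FromRaw([]float64{0.1, 0.5, 1, 5})
	dp.BucketCounts().FromRaw([]uint64{1, 1, 0, 1, 1}) // len(bounds)+1 buckets
	dp.SetCount(4)
	dp.SetSum(12.75)
	dp.Attributes().PutStr("client", "frontend")
	dp.Attributes().PutStr("server", "checkout")

	fmt.Println(md.MetricCount()) // 1
}
```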
34 changes: 20 additions & 14 deletions processor/servicegraphprocessor/processor_test.go
@@ -257,7 +257,7 @@ func TestConnectorConsume(t *testing.T) {
}

func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 2, md.MetricCount())
assert.Equal(t, 3, md.MetricCount())

rms := md.ResourceMetrics()
assert.Equal(t, 1, rms.Len())
@@ -266,13 +266,18 @@ func verifyHappyCaseMetrics(t *testing.T, md pmetric.Metrics) {
assert.Equal(t, 1, sms.Len())

ms := sms.At(0).Metrics()
assert.Equal(t, 2, ms.Len())
assert.Equal(t, 3, ms.Len())

mCount := ms.At(0)
verifyCount(t, mCount)

mDuration := ms.At(1)
verifyDuration(t, mDuration)
mServerDuration := ms.At(1)
assert.Equal(t, "traces_service_graph_request_server_seconds", mServerDuration.Name())
verifyDuration(t, mServerDuration)

mClientDuration := ms.At(2)
assert.Equal(t, "traces_service_graph_request_client_seconds", mClientDuration.Name())
verifyDuration(t, mClientDuration)
}

func verifyCount(t *testing.T, m pmetric.Metric) {
@@ -296,8 +301,6 @@ func verifyCount(t *testing.T, m pmetric.Metric) {
}

func verifyDuration(t *testing.T, m pmetric.Metric) {
assert.Equal(t, "traces_service_graph_request_duration_seconds", m.Name())

assert.Equal(t, pmetric.MetricTypeHistogram, m.Type())
dps := m.Histogram().DataPoints()
assert.Equal(t, 1, dps.Len())
@@ -464,13 +467,16 @@ func (m *mockMetricsExporter) ConsumeMetrics(context.Context, pmetric.Metrics) e

func TestUpdateDurationMetrics(t *testing.T) {
p := serviceGraphProcessor{
reqTotal: make(map[string]int64),
reqFailedTotal: make(map[string]int64),
reqDurationSecondsSum: make(map[string]float64),
reqDurationSecondsCount: make(map[string]uint64),
reqDurationBounds: defaultLatencyHistogramBucketsMs,
reqDurationSecondsBucketCounts: make(map[string][]uint64),
keyToMetric: make(map[string]metricSeries),
reqTotal: make(map[string]int64),
reqFailedTotal: make(map[string]int64),
reqServerDurationSecondsSum: make(map[string]float64),
reqServerDurationSecondsCount: make(map[string]uint64),
reqServerDurationSecondsBucketCounts: make(map[string][]uint64),
reqClientDurationSecondsSum: make(map[string]float64),
reqClientDurationSecondsCount: make(map[string]uint64),
reqClientDurationSecondsBucketCounts: make(map[string][]uint64),
reqDurationBounds: defaultLatencyHistogramBucketsMs,
keyToMetric: make(map[string]metricSeries),
config: &Config{
Dimensions: []string{},
},
@@ -497,7 +503,7 @@
}
for _, tc := range testCases {
t.Run(tc.caseStr, func(t *testing.T) {
p.updateDurationMetrics(metricKey, tc.duration)
p.updateDurationMetrics(metricKey, tc.duration, tc.duration)
})
}
}
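Not part of this commit, but a natural follow-up: a package test could exercise the legacy-name branch by flipping the gate and restoring it on cleanup. A sketch, assuming it sits in the servicegraphprocessor package with `testing`, `require`, and `featuregate` imported:

```go
func TestLegacyLatencyMetricNamesGate(t *testing.T) {
	require.NoError(t, featuregate.GlobalRegistry().Set(legacyLatencyMetricNamesFeatureGateID, true))
	t.Cleanup(func() {
		require.NoError(t, featuregate.GlobalRegistry().Set(legacyLatencyMetricNamesFeatureGateID, false))
	})

	// With the gate on, collectLatencyMetrics emits a single histogram named
	// traces_service_graph_request_duration_seconds instead of the
	// server/client pair.
	require.True(t, legacyMetricNamesFeatureGate.IsEnabled())
}
```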
