diff --git a/processor/spanmetricsprocessor/processor.go b/processor/spanmetricsprocessor/processor.go index 747cf0de4e76..3faa6299dc1e 100644 --- a/processor/spanmetricsprocessor/processor.go +++ b/processor/spanmetricsprocessor/processor.go @@ -375,7 +375,13 @@ func (p *processorImp) aggregateMetricsForServiceSpans(rspans ptrace.ResourceSpa } func (p *processorImp) aggregateMetricsForSpan(serviceName string, span ptrace.Span, resourceAttr pcommon.Map) { - latencyInMilliseconds := float64(span.EndTimestamp()-span.StartTimestamp()) / float64(time.Millisecond.Nanoseconds()) + // Protect against end timestamps before start timestamps. Assume 0 duration. + latencyInMilliseconds := float64(0) + startTime := span.StartTimestamp() + endTime := span.EndTimestamp() + if endTime > startTime { + latencyInMilliseconds = float64(endTime-startTime) / float64(time.Millisecond.Nanoseconds()) + } // Binary search to find the latencyInMilliseconds bucket index. index := sort.SearchFloat64s(p.latencyBounds, latencyInMilliseconds) diff --git a/processor/spanmetricsprocessor/processor_test.go b/processor/spanmetricsprocessor/processor_test.go index 5d640d88089f..bb88936e3267 100644 --- a/processor/spanmetricsprocessor/processor_test.go +++ b/processor/spanmetricsprocessor/processor_test.go @@ -270,6 +270,13 @@ func TestProcessorConsumeTraces(t *testing.T) { verifier: verifyConsumeMetricsInputDelta, traces: []ptrace.Traces{buildSampleTrace(), buildSampleTrace()}, }, + { + // Consumptions with improper timestamps + name: "Test bad consumptions (Delta).", + aggregationTemporality: cumulative, + verifier: verifyBadMetricsOkay, + traces: []ptrace.Traces{buildBadSampleTrace()}, + }, } for _, tc := range testcases { @@ -404,6 +411,10 @@ func verifyConsumeMetricsInputCumulative(t testing.TB, input pmetric.Metrics) bo return verifyConsumeMetricsInput(t, input, pmetric.MetricAggregationTemporalityCumulative, 1) } +func verifyBadMetricsOkay(t testing.TB, input pmetric.Metrics) bool { + return true // Validating no exception +} + // verifyConsumeMetricsInputDelta expects one accumulation of metrics, and marked as delta func verifyConsumeMetricsInputDelta(t testing.TB, input pmetric.Metrics) bool { return verifyConsumeMetricsInput(t, input, pmetric.MetricAggregationTemporalityDelta, 1) @@ -539,6 +550,16 @@ func verifyMetricLabels(dp metricDataPoint, t testing.TB, seenMetricIDs map[metr seenMetricIDs[mID] = true } +func buildBadSampleTrace() ptrace.Traces { + badTrace := buildSampleTrace() + span := badTrace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + now := time.Now() + // Flipping timestamp for a bad duration + span.SetEndTimestamp(pcommon.NewTimestampFromTime(now)) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(now.Add(sampleLatencyDuration))) + return badTrace +} + // buildSampleTrace builds the following trace: // service-a/ping (server) -> // service-a/ping (client) -> diff --git a/unreleased/spanmetrics_7250.yaml b/unreleased/spanmetrics_7250.yaml new file mode 100644 index 000000000000..71feaa40d3ad --- /dev/null +++ b/unreleased/spanmetrics_7250.yaml @@ -0,0 +1,17 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: spanmetricsprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Modifies spanmetrics processor to handle negative durations without crashing. Related to open-telemetry/opentelemetry-js-contrib#1013 + + +# One or more tracking issues related to the change +issues: [7250] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Sets negative durations to count towards the smallest histogram bucket.