Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic caused by race condition when accessing span attributes #12661

Merged
merged 6 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processor/spanmetricsprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
go.opentelemetry.io/collector v0.56.0
go.opentelemetry.io/collector/pdata v0.56.0
go.opentelemetry.io/collector/semconv v0.56.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.48.0
)
Expand Down Expand Up @@ -80,7 +81,6 @@ require (
go.opentelemetry.io/otel/metric v0.31.0 // indirect
go.opentelemetry.io/otel/trace v1.8.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
51 changes: 24 additions & 27 deletions processor/spanmetricsprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -220,36 +221,32 @@ func (p *processorImp) Capabilities() consumer.Capabilities {
// It aggregates the trace data to generate metrics, forwarding these metrics to the discovered metrics exporter.
// The original input trace data will be forwarded to the next consumer, unmodified.
//
// The returned error is the multierr.Combine of the error from p.tracesToMetrics and the
// error from p.nextConsumer.ConsumeTraces. Both calls are arguments of the same expression,
// so the traces are forwarded to the next consumer even when tracesToMetrics fails.
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
// NOTE(review): this listing looks like a diff view without +/- markers. The
// goroutine-based code from here down to p.buildMetrics() is presumably the
// previous implementation that the final return statement replaces — confirm
// against the merged file.
//
// Execute trace to metrics aggregation as a goroutine and only log errors instead
// of failing the entire pipeline to prioritize the propagation of trace data,
// regardless of error.
//
// This processor should be treated as a branched, out-of-band process
// that should not interfere with the flow of trace data because
// it is an orthogonal concern to the trace flow (it should not impact
// upstream or downstream pipeline trace components).
go func() {
// Since this is in a goroutine, the entire func can be locked without
// impacting trace processing performance. This also significantly
// reduces the number of locks/unlocks to manage, reducing the
// concurrency-bug surface area.
p.lock.Lock()
defer p.lock.Unlock()

p.aggregateMetrics(traces)
m, err := p.buildMetrics()
// Forward trace data unmodified and propagate both metrics and trace pipeline errors, if any.
return multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces))
}

if err != nil {
p.logger.Error(err.Error())
} else if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
p.logger.Error(err.Error())
}
// tracesToMetrics aggregates the given traces into metrics while holding p.lock,
// builds the metrics object and passes it to p.metricsExporter.ConsumeMetrics.
// It returns the error from p.buildMetrics or from ConsumeMetrics, or nil when
// both succeed.
func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
p.lock.Lock()

p.aggregateMetrics(traces)
m, err := p.buildMetrics()

// Exemplars are only relevant to this batch of traces, so they must be cleared within the lock,
// regardless of error while building metrics, before the next batch of spans is received.
p.resetExemplarData()

// This component no longer needs to read the metrics once built, so it is safe to unlock.
p.lock.Unlock()

// NOTE(review): the next two lines, and the "Forward trace data unmodified" return
// further down, look like removed lines from the previous goroutine-based
// implementation as rendered by the diff view — confirm against the merged file.
p.resetExemplarData()
}()
if err != nil {
return err
}

// Forward trace data unmodified.
return p.nextConsumer.ConsumeTraces(ctx, traces)
// The exporter is called after p.lock has been released above.
if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
return err
}

return nil
}

// buildMetrics collects the computed raw metrics data, builds the metrics object and
Expand Down
37 changes: 21 additions & 16 deletions processor/spanmetricsprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.uber.org/zap/zaptest/observer"
"google.golang.org/grpc/metadata"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/spanmetricsprocessor/internal/cache"
Expand Down Expand Up @@ -189,18 +188,22 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
consumeTracesErr error
}{
{
name: "metricsExporter error",
consumeMetricsErr: fmt.Errorf("metricsExporter error"),
name: "ConsumeMetrics error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
},
{
name: "nextConsumer error",
consumeTracesErr: fmt.Errorf("nextConsumer error"),
name: "ConsumeTraces error",
consumeTracesErr: fmt.Errorf("consume traces error"),
},
{
name: "ConsumeMetrics and ConsumeTraces error",
consumeMetricsErr: fmt.Errorf("consume metrics error"),
consumeTracesErr: fmt.Errorf("consume traces error"),
},
} {
t.Run(tc.name, func(t *testing.T) {
// Prepare
obs, logs := observer.New(zap.ErrorLevel)
logger := zap.New(obs)
logger := zap.NewNop()

mexp := &mocks.MetricsExporter{}
mexp.On("ConsumeMetrics", mock.Anything, mock.Anything).Return(tc.consumeMetricsErr)
Expand All @@ -215,17 +218,19 @@ func TestProcessorConsumeTracesErrors(t *testing.T) {
// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.ConsumeTraces(ctx, traces)
if tc.consumeTracesErr != nil {
require.Error(t, err)
assert.EqualError(t, err, tc.consumeTracesErr.Error())
return
}

// Verify
require.NoError(t, err)
assert.Eventually(t, func() bool {
return logs.FilterMessage(tc.consumeMetricsErr.Error()).Len() > 0
}, 10*time.Second, time.Millisecond*100)
require.Error(t, err)
switch {
case tc.consumeMetricsErr != nil && tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error()+"; "+tc.consumeTracesErr.Error())
case tc.consumeMetricsErr != nil:
assert.EqualError(t, err, tc.consumeMetricsErr.Error())
case tc.consumeTracesErr != nil:
assert.EqualError(t, err, tc.consumeTracesErr.Error())
default:
assert.Fail(t, "expected at least one error")
}
})
}
}
Expand Down
16 changes: 16 additions & 0 deletions unreleased/12644-fix-panic-race.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: spanmetricsprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic caused by race condition when accessing span attributes.

# One or more tracking issues related to the change
issues: [12644]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: