Fix panic caused by race condition when accessing span attributes #12661
Conversation
Signed-off-by: albertteoh <[email protected]>
@@ -293,6 +299,8 @@ func TestProcessorConsumeTraces(t *testing.T) {
	err := p.ConsumeTraces(ctx, traces)

	// Verify
	<-done // Wait till ConsumeMetrics verification is done.
This fixes a "test bug" I discovered while trying to reproduce the panic. As we now compute metrics in a separate goroutine, the execution of p.ConsumeTraces(ctx, traces)
completes immediately, not allowing the metrics aggregation goroutine to complete and terminating it prematurely, that could lead to false positives.
This change ensures that the verification will only be executed once metrics aggregation is complete (i.e. when the "next.ConsumeMetrics" is invoked).
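In other words, the test relies on roughly this synchronization pattern (a simplified sketch, not the actual test code; newTestProcessor and the mock wiring are hypothetical stand-ins, and ctx, traces, and t come from the surrounding test):

// The mocked metrics consumer closes `done` once it has received the
// aggregated metrics, so the test cannot return before the async
// aggregation goroutine has actually run.
done := make(chan struct{})

mockConsumeMetrics := func(_ context.Context, md pmetric.Metrics) error {
	// ... verify the aggregated metrics here ...
	close(done)
	return nil
}

p := newTestProcessor(t, mockConsumeMetrics) // hypothetical helper wiring the mock metrics exporter

err := p.ConsumeTraces(ctx, traces)
assert.NoError(t, err)

<-done // wait till the ConsumeMetrics-side verification is done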
Would the test in its current form cause the panic when the main code does not have the fix?
No, the test in its current form won't cause the panic.
Signed-off-by: albertteoh <[email protected]>
@@ -228,6 +228,7 @@ func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces)
	// that should not interfere with the flow of trace data because
	// it is an orthogonal concern to the trace flow (it should not impact
	// upstream or downstream pipeline trace components).
	tracesClone := traces.Clone()
This sounds like the correct fix in this design, but it smells to me like a bad design that we have to do this at all. The "pipelines" are correctly constructed and passed the right clone of the data by the collector (using capabilities, etc.); somehow this breaks that assumption and adds considerably more CPU compared to simply executing the code on the critical path.
Thanks for the feedback @bogdandrutu. I'm definitely open to ideas to improve the design. 😄
The original intent with the design change was that we're effectively creating a new stream of metrics data coming out of traces and, in my mind, the propagation of trace data is an orthogonal concern to metrics. That is, the trace pipeline should not depend on, or be impacted by, other components of a downstream metrics pipeline, nor any errors and latency introduced by them. In short, I feel that spanmetricsprocessor should just be a passthrough operation for traces.
Are there existing components that translate from one telemetry type to another, whose design pattern we could follow for this processor?
The "pipelines" are correctly constructed and passed the right clone of the data by the collector (using capabilities, etc.), and somehow this breaks that assumption and considerably adds more CPU than simply executing the code on the critical path.
I'm not very familiar with some concepts around the collector. For my learning, could you please elaborate more around what is meant by the "right clone" and "capabilities" and how this design adds considerably more CPU?
This also raises an existential question of whether this component should exist at all if it breaks some fundamental assumptions of the collector.
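For context, "capabilities" here presumably refers to the consumer.Capabilities a processor reports (from go.opentelemetry.io/collector/consumer); the collector uses it when wiring pipelines to decide whether a component can be handed shared data or needs its own copy. A minimal sketch under that assumption:

// If a processor declares that it mutates data, the collector's fan-out gives
// it a clone rather than the instance shared with other consumers.
func (p *processorImp) Capabilities() consumer.Capabilities {
	return consumer.Capabilities{MutatesData: true}
}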
I think this is the only processor housed in contrib that generates telemetry of a different signal type than the signal of the pipeline the processor runs in, but it is not the only processor that "short-circuits" data immediately to an exporter (the routingprocessor also does that). The need to branch between signal types has been brought up before.
Since this processor has already been accepted and is widely used, I don't think now is the time to discuss whether or not it should've been rejected due to the way it interacts with multiple signals. Feels like a good discussion for the SIG meeting if it even needs to happen at all.
For this issue, I think cloning the traces payload is a reasonable solution to allow downstream trace processing to continue while this processor generates and forwards new metrics from its trace payload.
If cloning the traces payload has a substantial performance impact, then it could be noted in this processor's README as a Warning, similar to the warning sections of the cumulativetodelta processor or transform processor. It would probably be appropriate to add a "Performance Impact" entry to the Collector's standard warnings.
I think I am proposing actually executing the logic on the critical path, instead of cloning and executing the metrics logic asynchronously. I am not suggesting removing the processor.
Okay, I can do this.
As for error handling, I'm proposing to return errors caused by any logic in this processor, but only log errors from the metrics pipeline (i.e. ConsumeMetrics). That is:
func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	if err := p.tracesToMetrics(ctx, traces); err != nil {
		return err
	}
	// Forward trace data unmodified.
	return p.nextConsumer.ConsumeTraces(ctx, traces)
}

func (p *processorImp) tracesToMetrics(ctx context.Context, traces ptrace.Traces) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.aggregateMetrics(traces)
	m, err := p.buildMetrics()
	if err != nil {
		return err
	}

	if err = p.metricsExporter.ConsumeMetrics(ctx, *m); err != nil {
		// Avoid penalising trace processing caused by metrics processing.
		p.logger.Error(err.Error())
	}

	p.resetExemplarData()
	return nil
}
Thoughts?
I would do multierr.Combine(p.tracesToMetrics(ctx, traces), p.nextConsumer.ConsumeTraces(ctx, traces)), since this processor fans out to two components; I would call both independently.
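Concretely, that suggestion would look something like this sketch (reusing the tracesToMetrics helper from the snippet above; requires go.uber.org/multierr):

func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	// Fan out to two independent components: generate metrics and forward
	// traces. Both calls are made regardless of whether the other fails,
	// and any errors are combined into one.
	return multierr.Combine(
		p.tracesToMetrics(ctx, traces),
		p.nextConsumer.ConsumeTraces(ctx, traces),
	)
}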
Good suggestion. Updated in: 9858d30
Can this be merged please? The issue is blocking my environment. Thank you in advance.
Signed-off-by: albertteoh <[email protected]>
Signed-off-by: albertteoh <[email protected]>
Signed-off-by: albertteoh <[email protected]>
Hi, this will get merged once it gets all the approvals. From what I can see, it's currently pending on @bogdandrutu's review.
Left one nit, otherwise LGTM.
@@ -228,28 +229,33 @@ func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces)
	// that should not interfere with the flow of trace data because
	// it is an orthogonal concern to the trace flow (it should not impact
	// upstream or downstream pipeline trace components).
Parts of this comment are no longer accurate. Can you update it to reflect the new process?
Thanks, updated comment in: af3dc8f.
It's mostly deleting since I don't think that line of code is too controversial.
Signed-off-by: albertteoh <[email protected]>
Signed-off-by: albertteoh [email protected]
Description:
There is a race condition in Map.Get, exposed by spanmetricsprocessor, that can trigger an index out of range [0] with length 0 error: one element is detected in the slice and the loop is entered, but while attempting to access the first element, the slice is found to be empty. This is because the metrics are computed in a separate goroutine from the trace "stream".
The fix is to Clone() the traces and use this clone within the traces->metrics aggregation goroutine (a rough sketch follows).
Link to tracking Issue: Fixes #12644
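A sketch of the shape of that fix, borrowing the helper names used in the review thread above (the discussion later moved the metrics work onto the critical path, so this only illustrates the clone idea, not the final code):

func (p *processorImp) ConsumeTraces(ctx context.Context, traces ptrace.Traces) error {
	// Deep-copy the payload so the aggregation goroutine never shares
	// pdata structures with downstream trace components.
	tracesClone := traces.Clone()

	go func() {
		p.lock.Lock()
		defer p.lock.Unlock()
		// Aggregate and export metrics from the clone only.
		p.aggregateMetrics(tracesClone)
		// ... build metrics and hand them to the metrics exporter ...
	}()

	// Forward the original trace data unmodified.
	return p.nextConsumer.ConsumeTraces(ctx, traces)
}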
Testing:
The race condition was reproduced via unit tests, with some hacks:
1. Hack spanmetricsprocessor.ConsumeTraces to sleep for a short period of time (1 ms) after starting the goroutine, then clear every span's attributes. Why the short sleep? Because we want the spanmetrics processor to think that nothing has changed right up until it enters the loop in Map.Get (see next step).
2. Hack Map.Get to sleep slightly longer than step 1 (2 ms) within the for loop, to allow step 1 to finish emptying all the attributes, so that by the time we invoke akv := &(*m.orig)[i], m.orig will be empty. This produces the panic.
Applying the fix described above (cloning the traces) passes the unit test. A standalone illustration of the same race is sketched below.
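Since the hack patches library code, it cannot be checked in as-is; purely as an illustration, a minimal standalone program exhibiting the same kind of unsynchronized access (which go run -race flags) could look like this:

// A deliberately racy toy program: one goroutine reads a pcommon.Map while
// another empties it, mirroring the aggregation goroutine racing against
// downstream mutation of span attributes.
package main

import (
	"sync"
	"time"

	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	attrs := pcommon.NewMap()
	attrs.PutStr("http.method", "GET")

	var wg sync.WaitGroup
	wg.Add(2)

	// Reader: stands in for the metrics goroutine calling Map.Get.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			attrs.Get("http.method")
		}
	}()

	// Writer: stands in for a downstream component clearing attributes.
	go func() {
		defer wg.Done()
		time.Sleep(time.Millisecond) // rough analogue of the 1 ms sleep above
		attrs.RemoveIf(func(string, pcommon.Value) bool { return true })
	}()

	wg.Wait()
}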
Unfortunately, there does not appear to be a feasible means to add a unit test to guard against this race condition because the problem is triggered within an external library.
At the very least, it should be possible to assert that the metrics are aggregated on a copy of traces and that the nextConsumer's ConsumeTraces invocation uses the original. As such, I propose to do this in another PR, because it would involve a fairly large code refactor and I would not want this to: 1. block the fix from being released, or 2. create too much noise in this PR.
Open to suggestions!