Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/transform] Instrument the transform processor to emit traces #33508

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions pkg/ottl/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/alecthomas/participle/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
)

const (
logAttributeTraceID = "trace_id"
logAttributeSpanID = "span_id"
)

// Statement holds a top level Statement for processing telemetry data. A Statement is a combination of a function
// invocation and the boolean expression to match telemetry for invoking the function.
type Statement[K any] struct {
Expand Down Expand Up @@ -233,6 +241,8 @@ type StatementSequence[K any] struct {
statements []*Statement[K]
errorMode ErrorMode
telemetrySettings component.TelemetrySettings
tracer trace.Tracer
tracerOnce sync.Once
}

type StatementSequenceOption[K any] func(*StatementSequence[K])
Expand All @@ -252,6 +262,10 @@ func NewStatementSequence[K any](statements []*Statement[K], telemetrySettings c
statements: statements,
errorMode: PropagateError,
telemetrySettings: telemetrySettings,
tracer: &noop.Tracer{},
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
}
if telemetrySettings.TracerProvider != nil {
s.tracer = telemetrySettings.TracerProvider.Tracer("ottl")
}
for _, op := range options {
op(&s)
Expand All @@ -264,12 +278,16 @@ func NewStatementSequence[K any](statements []*Statement[K], telemetrySettings c
// When the ErrorMode of the StatementSequence is `ignore`, errors are logged and execution continues to the next statement.
// When the ErrorMode of the StatementSequence is `silent`, errors are not logged and execution continues to the next statement.
func (s *StatementSequence[K]) Execute(ctx context.Context, tCtx K) error {
tracer := s.telemetrySettings.TracerProvider.Tracer("ottl")
ctx, sequenceSpan := tracer.Start(ctx, "ottl/StatementSequenceExecution")
ctx, sequenceSpan := s.getTracer().Start(ctx, "ottl/StatementSequenceExecution")
defer sequenceSpan.End()
s.telemetrySettings.Logger.Debug("initial TransformContext", zap.Any("TransformContext", tCtx))
s.telemetrySettings.Logger.Debug(
"initial TransformContext",
zap.Any("TransformContext", tCtx),
zap.String(logAttributeTraceID, sequenceSpan.SpanContext().TraceID().String()),
zap.String(logAttributeSpanID, sequenceSpan.SpanContext().SpanID().String()),
)
for _, statement := range s.statements {
statementCtx, statementSpan := tracer.Start(ctx, "ottl/StatementExecution")
statementCtx, statementSpan := s.getTracer().Start(ctx, "ottl/StatementExecution")
statementSpan.SetAttributes(
attribute.KeyValue{
Key: "statement",
Expand All @@ -283,7 +301,14 @@ func (s *StatementSequence[K]) Execute(ctx context.Context, tCtx K) error {
Value: attribute.BoolValue(condition),
},
)
s.telemetrySettings.Logger.Debug("TransformContext after statement execution", zap.String("statement", statement.origText), zap.Bool("condition matched", condition), zap.Any("TransformContext", tCtx))
s.telemetrySettings.Logger.Debug(
"TransformContext after statement execution",
zap.String("statement", statement.origText),
zap.Bool("condition matched", condition),
zap.Any("TransformContext", tCtx),
zap.String(logAttributeTraceID, statementSpan.SpanContext().TraceID().String()),
zap.String(logAttributeSpanID, statementSpan.SpanContext().SpanID().String()),
)
if err != nil {
statementSpan.RecordError(err)
errMsg := fmt.Sprintf("failed to execute statement '%s': %v", statement.origText, err)
Expand All @@ -295,7 +320,13 @@ func (s *StatementSequence[K]) Execute(ctx context.Context, tCtx K) error {
return err
}
if s.errorMode == IgnoreError {
s.telemetrySettings.Logger.Warn("failed to execute statement", zap.Error(err), zap.String("statement", statement.origText))
s.telemetrySettings.Logger.Warn(
"failed to execute statement",
zap.Error(err),
zap.String("statement", statement.origText),
zap.String(logAttributeTraceID, statementSpan.SpanContext().TraceID().String()),
zap.String(logAttributeSpanID, statementSpan.SpanContext().SpanID().String()),
)
}
} else {
statementSpan.SetStatus(codes.Ok, "statement executed successfully")
Expand All @@ -306,6 +337,19 @@ func (s *StatementSequence[K]) Execute(ctx context.Context, tCtx K) error {
return nil
}

func (s *StatementSequence[K]) getTracer() trace.Tracer {
s.tracerOnce.Do(func() {
TylerHelmuth marked this conversation as resolved.
Show resolved Hide resolved
if s.tracer == nil {
if s.telemetrySettings.TracerProvider != nil {
s.tracer = s.telemetrySettings.TracerProvider.Tracer("ottl")
} else {
s.tracer = &noop.Tracer{}
}
}
})
return s.tracer
}

// ConditionSequence represents a list of Conditions that will be evaluated sequentially for a TransformContext
// and will handle errors returned by conditions based on an ErrorMode.
// By default, the conditions are ORed together, but they can be ANDed together using the WithLogicOperation option.
Expand Down
Loading