diff --git a/.chloggen/ottl_change_statements_reflect_context_draft.yaml b/.chloggen/ottl_change_statements_reflect_context_draft.yaml new file mode 100644 index 000000000000..fd2d421b2379 --- /dev/null +++ b/.chloggen/ottl_change_statements_reflect_context_draft.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for statements to express their contexts via path names" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29017] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/ottl/contexts/internal/metric.go b/pkg/ottl/contexts/internal/metric.go index e2944a73df45..fb14229932e3 100644 --- a/pkg/ottl/contexts/internal/metric.go +++ b/pkg/ottl/contexts/internal/metric.go @@ -11,6 +11,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) +const ( + MetricContextName = "Metric" + MetricPathContext = "metric" +) + type MetricContext interface { GetMetric() pmetric.Metric } @@ -47,7 +52,7 @@ func MetricPathGetSetter[K MetricContext](path ottl.Path[K]) (ottl.GetSetter[K], case "data_points": return accessDataPoints[K](), nil default: - return nil, FormatDefaultErrorMessage(path.Name(), path.String(), "Metric", MetricRef) + return nil, FormatDefaultErrorMessage(path.Name(), path.String(), MetricContextName, MetricRef) } } diff --git a/pkg/ottl/contexts/internal/resource.go b/pkg/ottl/contexts/internal/resource.go index 2dfee7fce9f8..f0678d30cdbb 100644 --- a/pkg/ottl/contexts/internal/resource.go +++ b/pkg/ottl/contexts/internal/resource.go @@ -16,6 +16,11 @@ type ResourceContext interface { GetResourceSchemaURLItem() SchemaURLItem } +const ( + ResourceContextName = "Resource" + ResourcePathContext = "resource" +) + func ResourcePathGetSetter[K ResourceContext](path ottl.Path[K]) (ottl.GetSetter[K], error) { if path == nil { return accessResource[K](), nil @@ -31,7 +36,7 @@ func ResourcePathGetSetter[K ResourceContext](path ottl.Path[K]) (ottl.GetSetter case "schema_url": return accessResourceSchemaURLItem[K](), nil default: - return nil, FormatDefaultErrorMessage(path.Name(), path.String(), "Resource", ResourceContextRef) + return nil, FormatDefaultErrorMessage(path.Name(), path.String(), ResourceContextName, ResourceContextRef) } } diff --git a/pkg/ottl/contexts/internal/scope.go b/pkg/ottl/contexts/internal/scope.go index 6bc5d7352005..d7b0e7e8198d 100644 --- a/pkg/ottl/contexts/internal/scope.go +++ b/pkg/ottl/contexts/internal/scope.go @@ -11,6 +11,12 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" ) +const ( + InstrumentationScopeContextName = "Instrumentation Scope" + InstrumentationScopePathContext = "instrumentation_scope" + ScopePathContext = "scope" +) + type InstrumentationScopeContext interface { GetInstrumentationScope() pcommon.InstrumentationScope GetScopeSchemaURLItem() SchemaURLItem @@ -36,7 +42,7 @@ func ScopePathGetSetter[K InstrumentationScopeContext](path ottl.Path[K]) (ottl. case "schema_url": return accessInstrumentationScopeSchemaURLItem[K](), nil default: - return nil, FormatDefaultErrorMessage(path.Name(), path.String(), "Instrumentation Scope", InstrumentationScopeRef) + return nil, FormatDefaultErrorMessage(path.Name(), path.String(), InstrumentationScopeContextName, InstrumentationScopeRef) } } diff --git a/pkg/ottl/contexts/internal/span.go b/pkg/ottl/contexts/internal/span.go index 607cb2e110f9..d2d143ee102e 100644 --- a/pkg/ottl/contexts/internal/span.go +++ b/pkg/ottl/contexts/internal/span.go @@ -19,6 +19,7 @@ import ( const ( SpanContextName = "Span" + SpanPathContext = "span" ) type SpanContext interface { diff --git a/pkg/ottl/contexts/ottldatapoint/datapoint.go b/pkg/ottl/contexts/ottldatapoint/datapoint.go index a8a6f2158045..bf9c40e95273 100644 --- a/pkg/ottl/contexts/ottldatapoint/datapoint.go +++ b/pkg/ottl/contexts/ottldatapoint/datapoint.go @@ -20,7 +20,8 @@ import ( ) const ( - contextName = "DataPoint" + contextName = "DataPoint" + PathContextName = "datapoint" ) var _ internal.ResourceContext = (*TransformContext)(nil) @@ -122,6 +123,17 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.ResourcePathContext, + internal.InstrumentationScopePathContext, + internal.MetricPathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -183,81 +195,93 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) - case "instrumentation_scope": - return internal.ScopePathGetSetter[TransformContext](path.Next()) - case "metric": - return internal.MetricPathGetSetter[TransformContext](path.Next()) - case "attributes": - if path.Keys() == nil { - return accessAttributes(), nil - } - return accessAttributesKey(path.Keys()), nil - case "start_time_unix_nano": - return accessStartTimeUnixNano(), nil - case "time_unix_nano": - return accessTimeUnixNano(), nil - case "start_time": - return accessStartTime(), nil - case "time": - return accessTime(), nil - case "value_double": - return accessDoubleValue(), nil - case "value_int": - return accessIntValue(), nil - case "exemplars": - return accessExemplars(), nil - case "flags": - return accessFlags(), nil - case "count": - return accessCount(), nil - case "sum": - return accessSum(), nil - case "bucket_counts": - return accessBucketCounts(), nil - case "explicit_bounds": - return accessExplicitBounds(), nil - case "scale": - return accessScale(), nil - case "zero_count": - return accessZeroCount(), nil - case "positive": - nextPath := path.Next() - if nextPath != nil { - switch nextPath.Name() { - case "offset": - return accessPositiveOffset(), nil - case "bucket_counts": - return accessPositiveBucketCounts(), nil - default: - return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.DataPointRef) + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + case "attributes": + if path.Keys() == nil { + return accessAttributes(), nil + } + return accessAttributesKey(path.Keys()), nil + case "start_time_unix_nano": + return accessStartTimeUnixNano(), nil + case "time_unix_nano": + return accessTimeUnixNano(), nil + case "start_time": + return accessStartTime(), nil + case "time": + return accessTime(), nil + case "value_double": + return accessDoubleValue(), nil + case "value_int": + return accessIntValue(), nil + case "exemplars": + return accessExemplars(), nil + case "flags": + return accessFlags(), nil + case "count": + return accessCount(), nil + case "sum": + return accessSum(), nil + case "bucket_counts": + return accessBucketCounts(), nil + case "explicit_bounds": + return accessExplicitBounds(), nil + case "scale": + return accessScale(), nil + case "zero_count": + return accessZeroCount(), nil + case "positive": + nextPath := path.Next() + if nextPath != nil { + switch nextPath.Name() { + case "offset": + return accessPositiveOffset(), nil + case "bucket_counts": + return accessPositiveBucketCounts(), nil + default: + return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.DataPointRef) + } } - } - return accessPositive(), nil - case "negative": - nextPath := path.Next() - if nextPath != nil { - switch nextPath.Name() { - case "offset": - return accessNegativeOffset(), nil - case "bucket_counts": - return accessNegativeBucketCounts(), nil - default: - return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.DataPointRef) + return accessPositive(), nil + case "negative": + nextPath := path.Next() + if nextPath != nil { + switch nextPath.Name() { + case "offset": + return accessNegativeOffset(), nil + case "bucket_counts": + return accessNegativeBucketCounts(), nil + default: + return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.DataPointRef) + } } + return accessNegative(), nil + case "quantile_values": + return accessQuantileValues(), nil + default: + return pep.parseLowerContextPath(path.Name(), path.Next()) // BC paths without context } - return accessNegative(), nil - case "quantile_values": - return accessQuantileValues(), nil + } + + return pep.parseLowerContextPath(path.Context(), path) +} + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter(path) + case internal.InstrumentationScopePathContext: + return internal.ScopePathGetSetter(path) + case internal.MetricPathContext: + return internal.MetricPathGetSetter(path) default: - return nil, internal.FormatDefaultErrorMessage(path.Name(), path.String(), contextName, internal.DataPointRef) + return nil, internal.FormatDefaultErrorMessage(context, path.String(), contextName, internal.DataPointRef) } } diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 5ae40916864e..0a88e6e391a4 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -22,7 +22,8 @@ import ( ) const ( - contextName = "Log" + contextName = "Log" + PathContextName = "log" ) var _ internal.ResourceContext = (*TransformContext)(nil) @@ -119,6 +120,16 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.InstrumentationScopePathContext, + internal.ResourcePathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -197,69 +208,81 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) - case "instrumentation_scope": - return internal.ScopePathGetSetter[TransformContext](path.Next()) - case "time_unix_nano": - return accessTimeUnixNano(), nil - case "observed_time_unix_nano": - return accessObservedTimeUnixNano(), nil - case "time": - return accessTime(), nil - case "observed_time": - return accessObservedTime(), nil - case "severity_number": - return accessSeverityNumber(), nil - case "severity_text": - return accessSeverityText(), nil - case "body": - nextPath := path.Next() - if nextPath != nil { - if nextPath.Name() == "string" { - return accessStringBody(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil } - return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), nextPath.String(), contextName, internal.LogRef) - } - if path.Keys() == nil { - return accessBody(), nil - } - return accessBodyKey(path.Keys()), nil - case "attributes": - if path.Keys() == nil { - return accessAttributes(), nil - } - return accessAttributesKey(path.Keys()), nil - case "dropped_attributes_count": - return accessDroppedAttributesCount(), nil - case "flags": - return accessFlags(), nil - case "trace_id": - nextPath := path.Next() - if nextPath != nil { - if nextPath.Name() == "string" { - return accessStringTraceID(), nil + return accessCacheKey(path.Keys()), nil + case "time_unix_nano": + return accessTimeUnixNano(), nil + case "observed_time_unix_nano": + return accessObservedTimeUnixNano(), nil + case "time": + return accessTime(), nil + case "observed_time": + return accessObservedTime(), nil + case "severity_number": + return accessSeverityNumber(), nil + case "severity_text": + return accessSeverityText(), nil + case "body": + nextPath := path.Next() + if nextPath != nil { + if nextPath.Name() == "string" { + return accessStringBody(), nil + } + return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), nextPath.String(), contextName, internal.LogRef) } - return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), nextPath.String(), contextName, internal.LogRef) - } - return accessTraceID(), nil - case "span_id": - nextPath := path.Next() - if nextPath != nil { - if nextPath.Name() == "string" { - return accessStringSpanID(), nil + if path.Keys() == nil { + return accessBody(), nil } - return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.LogRef) + return accessBodyKey(path.Keys()), nil + case "attributes": + if path.Keys() == nil { + return accessAttributes(), nil + } + return accessAttributesKey(path.Keys()), nil + case "dropped_attributes_count": + return accessDroppedAttributesCount(), nil + case "flags": + return accessFlags(), nil + case "trace_id": + nextPath := path.Next() + if nextPath != nil { + if nextPath.Name() == "string" { + return accessStringTraceID(), nil + } + return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), nextPath.String(), contextName, internal.LogRef) + } + return accessTraceID(), nil + case "span_id": + nextPath := path.Next() + if nextPath != nil { + if nextPath.Name() == "string" { + return accessStringSpanID(), nil + } + return nil, internal.FormatDefaultErrorMessage(nextPath.Name(), path.String(), contextName, internal.LogRef) + } + return accessSpanID(), nil + default: + return pep.parseLowerContextPath(path.Name(), path.Next()) // BC paths without context } - return accessSpanID(), nil + } + + return pep.parseLowerContextPath(path.Context(), path) +} + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter(path) + case internal.InstrumentationScopePathContext: + return internal.ScopePathGetSetter(path) default: - return nil, internal.FormatDefaultErrorMessage(path.Name(), path.String(), contextName, internal.LogRef) + return nil, internal.FormatDefaultErrorMessage(context, path.String(), contextName, internal.LogRef) } } diff --git a/pkg/ottl/contexts/ottlmetric/metrics.go b/pkg/ottl/contexts/ottlmetric/metrics.go index ce5ff174ee69..2867d7f5c0d2 100644 --- a/pkg/ottl/contexts/ottlmetric/metrics.go +++ b/pkg/ottl/contexts/ottlmetric/metrics.go @@ -15,6 +15,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal" ) +const ( + PathContextName = internal.MetricPathContext +) + var _ internal.ResourceContext = TransformContext{} var _ internal.InstrumentationScopeContext = TransformContext{} var _ internal.MetricContext = TransformContext{} @@ -88,6 +92,16 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, err } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.InstrumentationScopePathContext, + internal.ResourcePathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -140,18 +154,32 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + case internal.ResourcePathContext, internal.InstrumentationScopePathContext: // BC paths without context + return pep.parseLowerContextPath(path.Name(), path.Next()) + default: + return internal.MetricPathGetSetter[TransformContext](path) } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) - case "instrumentation_scope": - return internal.ScopePathGetSetter[TransformContext](path.Next()) + } + + return pep.parseLowerContextPath(path.Context(), path) +} + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter(path) + case internal.InstrumentationScopePathContext: + return internal.ScopePathGetSetter(path) default: - return internal.MetricPathGetSetter[TransformContext](path) + return nil, internal.FormatDefaultErrorMessage(context, path.String(), internal.MetricContextName, internal.MetricRef) } } diff --git a/pkg/ottl/contexts/ottlresource/resource.go b/pkg/ottl/contexts/ottlresource/resource.go index f7fddd5b7ac4..6b1aea301bf5 100644 --- a/pkg/ottl/contexts/ottlresource/resource.go +++ b/pkg/ottl/contexts/ottlresource/resource.go @@ -17,6 +17,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logging" ) +const ( + PathContextName = internal.ResourcePathContext +) + var _ internal.ResourceContext = (*TransformContext)(nil) var _ zapcore.ObjectMarshaler = (*TransformContext)(nil) @@ -71,6 +75,12 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{PathContextName})(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -115,15 +125,20 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + default: + return internal.ResourcePathGetSetter[TransformContext](path) } - return accessCacheKey(path.Keys()), nil - default: - return internal.ResourcePathGetSetter[TransformContext](path) } + + return nil, internal.FormatDefaultErrorMessage(path.Context(), path.String(), internal.ResourceContextName, internal.ResourceContextRef) } func accessCache() ottl.StandardGetSetter[TransformContext] { diff --git a/pkg/ottl/contexts/ottlscope/scope.go b/pkg/ottl/contexts/ottlscope/scope.go index b55635bdcdb3..b132cbaa0761 100644 --- a/pkg/ottl/contexts/ottlscope/scope.go +++ b/pkg/ottl/contexts/ottlscope/scope.go @@ -17,6 +17,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logging" ) +const ( + PathContextName = internal.ScopePathContext +) + var _ internal.ResourceContext = (*TransformContext)(nil) var _ internal.InstrumentationScopeContext = (*TransformContext)(nil) var _ zapcore.ObjectMarshaler = (*TransformContext)(nil) @@ -83,6 +87,15 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.ResourcePathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -127,16 +140,30 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + case internal.ResourcePathContext: + return pep.parseLowerContextPath(path.Name(), path.Next()) // BC paths without context + default: + return internal.ScopePathGetSetter[TransformContext](path) } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) + } + + return pep.parseLowerContextPath(path.Context(), path) +} + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter[TransformContext](path) default: - return internal.ScopePathGetSetter[TransformContext](path) + return nil, internal.FormatDefaultErrorMessage(context, path.String(), internal.InstrumentationScopeContextName, internal.InstrumentationScopeRef) } } diff --git a/pkg/ottl/contexts/ottlspan/span.go b/pkg/ottl/contexts/ottlspan/span.go index 34fc2c944c1e..ae85a1939e70 100644 --- a/pkg/ottl/contexts/ottlspan/span.go +++ b/pkg/ottl/contexts/ottlspan/span.go @@ -18,6 +18,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logging" ) +const ( + PathContextName = internal.SpanPathContext +) + var _ internal.ResourceContext = (*TransformContext)(nil) var _ internal.InstrumentationScopeContext = (*TransformContext)(nil) var _ zapcore.ObjectMarshaler = (*TransformContext)(nil) @@ -93,6 +97,16 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.ResourcePathContext, + internal.InstrumentationScopePathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -143,18 +157,32 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + case internal.ResourcePathContext, internal.InstrumentationScopePathContext: // BC paths without context + return pep.parseLowerContextPath(path.Name(), path.Next()) + default: + return internal.SpanPathGetSetter[TransformContext](path) } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) - case "instrumentation_scope": - return internal.ScopePathGetSetter[TransformContext](path.Next()) + } + + return pep.parseLowerContextPath(path.Context(), path) +} + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter[TransformContext](path) + case internal.InstrumentationScopePathContext: + return internal.ScopePathGetSetter[TransformContext](path) default: - return internal.SpanPathGetSetter[TransformContext](path) + return nil, internal.FormatDefaultErrorMessage(context, path.String(), internal.SpanContextName, internal.SpanRef) } } diff --git a/pkg/ottl/contexts/ottlspanevent/span_events.go b/pkg/ottl/contexts/ottlspanevent/span_events.go index 8a4de90f15e9..bb0f332b85bf 100644 --- a/pkg/ottl/contexts/ottlspanevent/span_events.go +++ b/pkg/ottl/contexts/ottlspanevent/span_events.go @@ -19,6 +19,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal/logging" ) +const ( + PathContextName = "spanevent" +) + var _ internal.ResourceContext = (*TransformContext)(nil) var _ internal.InstrumentationScopeContext = (*TransformContext)(nil) var _ zapcore.ObjectMarshaler = (*TransformContext)(nil) @@ -101,6 +105,17 @@ func NewParser(functions map[string]ottl.Factory[TransformContext], telemetrySet return p, nil } +func WithPathContextNames() Option { + return func(p *ottl.Parser[TransformContext]) { + ottl.WithPathContextNames[TransformContext]([]string{ + PathContextName, + internal.SpanPathContext, + internal.ResourcePathContext, + internal.InstrumentationScopePathContext, + })(p) + } +} + type StatementSequenceOption func(*ottl.StatementSequence[TransformContext]) func WithStatementSequenceErrorMode(errorMode ottl.ErrorMode) StatementSequenceOption { @@ -151,36 +166,48 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot if path == nil { return nil, fmt.Errorf("path cannot be nil") } - switch path.Name() { - case "cache": - if path.Keys() == nil { - return accessCache(), nil - } - return accessCacheKey(path.Keys()), nil - case "resource": - return internal.ResourcePathGetSetter[TransformContext](path.Next()) - case "instrumentation_scope": - return internal.ScopePathGetSetter[TransformContext](path.Next()) - case "span": - return internal.SpanPathGetSetter[TransformContext](path.Next()) - case "time_unix_nano": - return accessSpanEventTimeUnixNano(), nil - case "time": - return accessSpanEventTime(), nil - case "name": - return accessSpanEventName(), nil - case "attributes": - if path.Keys() == nil { - return accessSpanEventAttributes(), nil + + if path.Context() == PathContextName || path.Context() == "" { + switch path.Name() { + case "cache": + if path.Keys() == nil { + return accessCache(), nil + } + return accessCacheKey(path.Keys()), nil + case "time_unix_nano": + return accessSpanEventTimeUnixNano(), nil + case "time": + return accessSpanEventTime(), nil + case "name": + return accessSpanEventName(), nil + case "attributes": + if path.Keys() == nil { + return accessSpanEventAttributes(), nil + } + return accessSpanEventAttributesKey(path.Keys()), nil + case "dropped_attributes_count": + return accessSpanEventDroppedAttributeCount(), nil + default: + return pep.parseLowerContextPath(path.Name(), path.Next()) // BC paths without context } - return accessSpanEventAttributesKey(path.Keys()), nil - case "dropped_attributes_count": - return accessSpanEventDroppedAttributeCount(), nil - default: - return nil, internal.FormatDefaultErrorMessage(path.Name(), path.String(), "Span Event", internal.SpanEventRef) } + return pep.parseLowerContextPath(path.Context(), path) } + +func (pep *pathExpressionParser) parseLowerContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { + switch context { + case internal.ResourcePathContext: + return internal.ResourcePathGetSetter(path) + case internal.InstrumentationScopePathContext: + return internal.ScopePathGetSetter(path) + case internal.SpanPathContext: + return internal.SpanPathGetSetter(path) + default: + return nil, internal.FormatDefaultErrorMessage(context, path.String(), "Span Event", internal.SpanEventRef) + } +} + func accessCache() ottl.StandardGetSetter[TransformContext] { return ottl.StandardGetSetter[TransformContext]{ Getter: func(_ context.Context, tCtx TransformContext) (any, error) { diff --git a/pkg/ottl/parser_collection.go b/pkg/ottl/parser_collection.go new file mode 100644 index 000000000000..04cf5a73d533 --- /dev/null +++ b/pkg/ottl/parser_collection.go @@ -0,0 +1,289 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottl // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + +import ( + "fmt" + "reflect" + + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" +) + +var _ interface { + ParseStatements(statements []string) ([]*Statement[any], error) +} = (*Parser[any])(nil) + +var _ ParsedStatementConverter[any, StatementsGetter, any] = func( + _ *ParserCollection[StatementsGetter, any], + _ *Parser[any], + _ string, + _ StatementsGetter, + _ []*Statement[any]) (any, error) { + return nil, nil +} + +// StatementsGetter represents a set of statements to be parsed +type StatementsGetter interface { + // GetStatements retrieves the OTTL statements to be parsed + GetStatements() []string +} + +// ottlParserWrapper wraps an ottl.Parser using reflection, so it can invoke exported +// methods without knowing its generic type (transform context). +type ottlParserWrapper[S StatementsGetter] struct { + parser reflect.Value + prependContextToStatementPaths func(context string, statement string) (string, error) +} + +func newParserWrapper[K any, S StatementsGetter](parser *Parser[K]) *ottlParserWrapper[S] { + return &ottlParserWrapper[S]{ + parser: reflect.ValueOf(parser), + prependContextToStatementPaths: parser.prependContextToStatementPaths, + } +} + +func (g *ottlParserWrapper[S]) parseStatements(statements []string) (reflect.Value, error) { + method := g.parser.MethodByName("ParseStatements") + parseStatementsRes := method.Call([]reflect.Value{reflect.ValueOf(statements)}) + err := parseStatementsRes[1] + if !err.IsNil() { + return reflect.Value{}, err.Interface().(error) + } + return parseStatementsRes[0], nil +} + +func (g *ottlParserWrapper[S]) prependContextToStatementsPaths(context string, statements []string) ([]string, error) { + result := make([]string, 0, len(statements)) + for _, s := range statements { + prependedStatement, err := g.prependContextToStatementPaths(context, s) + if err != nil { + return nil, err + } + result = append(result, prependedStatement) + } + return result, nil +} + +// statementsConverterWrapper is reflection-based wrapper to the ParsedStatementConverter function, +// which does not require knowing all generic parameters to be called. +type statementsConverterWrapper[S StatementsGetter] reflect.Value + +func newStatementsConverterWrapper[K any, S StatementsGetter, R any](converter ParsedStatementConverter[K, S, R]) statementsConverterWrapper[S] { + return statementsConverterWrapper[S](reflect.ValueOf(converter)) +} + +func (s statementsConverterWrapper[S]) call( + parserCollection reflect.Value, + ottlParser *ottlParserWrapper[S], + context string, + statements S, + parsedStatements reflect.Value, +) (reflect.Value, error) { + result := reflect.Value(s).Call([]reflect.Value{ + parserCollection, + ottlParser.parser, + reflect.ValueOf(context), + reflect.ValueOf(statements), + parsedStatements, + }) + + resultValue := result[0] + resultError := result[1] + if !resultError.IsNil() { + return reflect.Value{}, resultError.Interface().(error) + } + + return resultValue, nil +} + +// parserCollectionParser holds an ottlParserWrapper and its respectively +// statementsConverter function. +type parserCollectionParser[S StatementsGetter] struct { + ottlParser *ottlParserWrapper[S] + statementsConverter statementsConverterWrapper[S] +} + +// ParserCollection is a configurable set of ottl.Parser that can handle multiple OTTL contexts +// parsings, inferring the context and choosing the right parser for the given statements. +type ParserCollection[S StatementsGetter, R any] struct { + contextParsers map[string]*parserCollectionParser[S] + contextInferrer contextInferrer + modifiedStatementLogging bool + Settings component.TelemetrySettings + ErrorMode ErrorMode +} + +type ParserCollectionOption[S StatementsGetter, R any] func(*ParserCollection[S, R]) error + +func NewParserCollection[S StatementsGetter, R any]( + settings component.TelemetrySettings, + options ...ParserCollectionOption[S, R]) (*ParserCollection[S, R], error) { + + pc := &ParserCollection[S, R]{ + Settings: settings, + contextParsers: map[string]*parserCollectionParser[S]{}, + contextInferrer: defaultPriorityContextInferrer(), + } + + for _, op := range options { + err := op(pc) + if err != nil { + return nil, err + } + } + + return pc, nil +} + +// ParsedStatementConverter is a function that converts the parsed ottl.Statement[K] into +// a common representation to all parser collection contexts WithParserCollectionContext. +// Given each parser has its own transform context type, they must agree on a common type [R] +// so is can be returned by the ParserCollection.ParseStatements and ParserCollection.ParseStatementsWithContext +// functions. +type ParsedStatementConverter[T any, S StatementsGetter, R any] func( + collection *ParserCollection[S, R], + parser *Parser[T], + context string, + statements S, + parsedStatements []*Statement[T], +) (R, error) + +func newNopParsedStatementConverter[T any, S StatementsGetter]() ParsedStatementConverter[T, S, any] { + return func( + _ *ParserCollection[S, any], + _ *Parser[T], + _ string, + _ S, + parsedStatements []*Statement[T]) (any, error) { + return parsedStatements, nil + } +} + +// WithParserCollectionContext configures an ottl.Parser for the given context. +// The provided ottl.Parser must be configured to support the provided context using +// the ottl.WithPathContextNames option. +func WithParserCollectionContext[K any, S StatementsGetter, R any]( + context string, + parser *Parser[K], + converter ParsedStatementConverter[K, S, R], +) ParserCollectionOption[S, R] { + return func(mp *ParserCollection[S, R]) error { + if _, ok := parser.pathContextNames[context]; !ok { + return fmt.Errorf(`context "%s" must be a valid "%T" path context name`, context, parser) + } + mp.contextParsers[context] = &parserCollectionParser[S]{ + ottlParser: newParserWrapper[K, S](parser), + statementsConverter: newStatementsConverterWrapper(converter), + } + return nil + } +} + +// WithParserCollectionErrorMode has no effect on the ParserCollection, but might be used +// by the ParsedStatementConverter functions to handle/create StatementSequence. +func WithParserCollectionErrorMode[S StatementsGetter, R any](errorMode ErrorMode) ParserCollectionOption[S, R] { + return func(tp *ParserCollection[S, R]) error { + tp.ErrorMode = errorMode + return nil + } +} + +// EnableParserCollectionModifiedStatementLogging controls the statements modification logs. +// When enabled, it logs any statements modifications performed by the parsing operations, +// instructing users to rewrite the statements accordingly. +func EnableParserCollectionModifiedStatementLogging[S StatementsGetter, R any](enabled bool) ParserCollectionOption[S, R] { + return func(tp *ParserCollection[S, R]) error { + tp.modifiedStatementLogging = enabled + return nil + } +} + +// ParseStatements parses the given statements into [R] using the configured context's ottl.Parser +// and subsequently calling the ParsedStatementConverter function. +// The statement's context is automatically inferred from the [Path.Context] values, choosing the +// highest priority context found. +// If no contexts are present in the statements, or if the inferred value is not supported by +// the [ParserCollection], it returns an error. +// If parsing the statements fails, it returns the underline [ottl.Parser.ParseStatements] error. +func (pc *ParserCollection[S, R]) ParseStatements(statements S) (R, error) { + statementsValues := statements.GetStatements() + inferredContext, err := pc.contextInferrer.infer(statementsValues) + if err != nil { + return *new(R), err + } + + if inferredContext == "" { + return *new(R), fmt.Errorf("unable to infer context from statements [%v], path's first segment must be a valid context name", statementsValues) + } + + return pc.ParseStatementsWithContext(inferredContext, statements, false) +} + +// ParseStatementsWithContext parses the given statements into [R] using the configured +// context's ottl.Parser and subsequently calling the ParsedStatementConverter function. +// Differently from ParseStatements, it uses the provided context and does not infer it +// automatically. The context valuer must be supported by the [ParserCollection], +// otherwise an error is returned. +// If the statement's Path does not provide their Path.Context value, the prependPathsContext +// argument should be set to true, so it rewrites the statements prepending the missing paths +// contexts. +// If parsing the statements fails, it returns the underline [ottl.Parser.ParseStatements] error. +func (pc *ParserCollection[S, R]) ParseStatementsWithContext(context string, statements S, prependPathsContext bool) (R, error) { + contextParser, ok := pc.contextParsers[context] + if !ok { + return *new(R), fmt.Errorf(`unknown context "%s" for stataments: %v`, context, statements.GetStatements()) + } + + var err error + var parsingStatements []string + if prependPathsContext { + originalStatements := statements.GetStatements() + parsingStatements, err = contextParser.ottlParser.prependContextToStatementsPaths(context, originalStatements) + if err != nil { + return *new(R), err + } + if pc.modifiedStatementLogging { + pc.logModifiedStatements(originalStatements, parsingStatements) + } + } else { + parsingStatements = statements.GetStatements() + } + + parsedStatements, err := contextParser.ottlParser.parseStatements(parsingStatements) + if err != nil { + return *new(R), err + } + + convertedStatements, err := contextParser.statementsConverter.call( + reflect.ValueOf(pc), + contextParser.ottlParser, + context, + statements, + parsedStatements, + ) + + if err != nil { + return *new(R), err + } + + return convertedStatements.Interface().(R), nil +} + +func (pc *ParserCollection[S, R]) logModifiedStatements(originalStatements, modifiedStatements []string) { + var fields []zap.Field + for i, original := range originalStatements { + if modifiedStatements[i] != original { + statementKey := fmt.Sprintf("[%v]", i) + fields = append(fields, zap.Dict( + statementKey, + zap.String("original", original), + zap.String("modified", modifiedStatements[i])), + ) + } + } + if len(fields) > 0 { + pc.Settings.Logger.Info("one or more statements were modified to include their paths context, please rewrite them accordingly", zap.Dict("statements", fields...)) + } +} diff --git a/pkg/ottl/parser_collection_test.go b/pkg/ottl/parser_collection_test.go new file mode 100644 index 000000000000..b5d293f68a77 --- /dev/null +++ b/pkg/ottl/parser_collection_test.go @@ -0,0 +1,354 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ottl + +import ( + "context" + "errors" + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type mockStatementsGetter struct { + values []string +} + +func (s mockStatementsGetter) GetStatements() []string { + return s.values +} + +type mockFailingContextInferrer struct { + err error +} + +func (r *mockFailingContextInferrer) infer(_ []string) (string, error) { + return "", r.err +} + +type mockStaticContextInferrer struct { + value string +} + +func (r *mockStaticContextInferrer) infer(_ []string) (string, error) { + return r.value, nil +} + +type mockSetArguments[K any] struct { + Target Setter[K] + Value Getter[K] +} + +func Test_NewParserCollection(t *testing.T) { + settings := componenttest.NewNopTelemetrySettings() + pc, err := NewParserCollection[mockStatementsGetter, any](settings) + require.NoError(t, err) + + assert.NotNil(t, pc) + assert.NotNil(t, pc.contextParsers) + assert.NotNil(t, pc.contextInferrer) +} + +func Test_NewParserCollection_OptionError(t *testing.T) { + _, err := NewParserCollection[mockStatementsGetter, any]( + componenttest.NewNopTelemetrySettings(), + func(_ *ParserCollection[mockStatementsGetter, any]) error { + return errors.New("option error") + }, + ) + + require.Error(t, err, "option error") +} + +func Test_WithParserCollectionContext(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"testContext"})) + conv := newNopParsedStatementConverter[any, mockStatementsGetter]() + option := WithParserCollectionContext("testContext", ps, conv) + + pc, err := NewParserCollection[mockStatementsGetter, any](componenttest.NewNopTelemetrySettings(), option) + require.NoError(t, err) + + pw, exists := pc.contextParsers["testContext"] + assert.True(t, exists) + assert.NotNil(t, pw) + assert.Equal(t, reflect.ValueOf(ps), pw.ottlParser.parser) + assert.Equal(t, reflect.ValueOf(conv), reflect.Value(pw.statementsConverter)) +} + +func Test_WithParserCollectionContext_UnsupportedContext(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"foo"})) + conv := newNopParsedStatementConverter[any, mockStatementsGetter]() + option := WithParserCollectionContext("bar", ps, conv) + + _, err := NewParserCollection[mockStatementsGetter, any](componenttest.NewNopTelemetrySettings(), option) + + require.ErrorContains(t, err, `context "bar" must be a valid "*ottl.Parser[interface {}]" path context name`) +} + +func Test_WithParserCollectionErrorMode(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any]( + componenttest.NewNopTelemetrySettings(), + WithParserCollectionErrorMode[mockStatementsGetter, any](PropagateError), + ) + + require.NoError(t, err) + require.NotNil(t, pc) + require.Equal(t, PropagateError, pc.ErrorMode) +} + +func Test_EnableParserCollectionModifiedStatementLogging_True(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"dummy"})) + core, observedLogs := observer.New(zap.InfoLevel) + telemetrySettings := componenttest.NewNopTelemetrySettings() + telemetrySettings.Logger = zap.New(core) + + pc, err := NewParserCollection( + telemetrySettings, + WithParserCollectionContext("dummy", ps, newNopParsedStatementConverter[any, mockStatementsGetter]()), + EnableParserCollectionModifiedStatementLogging[mockStatementsGetter, any](true), + ) + require.NoError(t, err) + + originalStatements := []string{ + `set(attributes["foo"], "foo")`, + `set(attributes["bar"], "bar")`, + } + + _, err = pc.ParseStatementsWithContext("dummy", mockStatementsGetter{originalStatements}, true) + require.NoError(t, err) + + logEntries := observedLogs.TakeAll() + require.Len(t, logEntries, 1) + logEntry := logEntries[0] + require.Equal(t, zap.InfoLevel, logEntry.Level) + require.Contains(t, logEntry.Message, "one or more statements were modified") + logEntryStatements := logEntry.ContextMap()["statements"] + require.NotNil(t, logEntryStatements) + + for i, originalStatement := range originalStatements { + k := fmt.Sprintf("[%d]", i) + logEntryStatementContext := logEntryStatements.(map[string]any)[k] + require.Equal(t, logEntryStatementContext.(map[string]any)["original"], originalStatement) + modifiedStatement, err := ps.prependContextToStatementPaths("dummy", originalStatement) + require.NoError(t, err) + require.Equal(t, logEntryStatementContext.(map[string]any)["modified"], modifiedStatement) + } +} + +func Test_EnableParserCollectionModifiedStatementLogging_False(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"dummy"})) + core, observedLogs := observer.New(zap.InfoLevel) + telemetrySettings := componenttest.NewNopTelemetrySettings() + telemetrySettings.Logger = zap.New(core) + + pc, err := NewParserCollection( + telemetrySettings, + WithParserCollectionContext("dummy", ps, newNopParsedStatementConverter[any, mockStatementsGetter]()), + EnableParserCollectionModifiedStatementLogging[mockStatementsGetter, any](false), + ) + require.NoError(t, err) + + _, err = pc.ParseStatementsWithContext("dummy", mockStatementsGetter{[]string{`set(attributes["foo"], "foo")`}}, true) + require.NoError(t, err) + require.Empty(t, observedLogs.TakeAll()) +} + +func Test_NopParsedStatementConverter(t *testing.T) { + type dummyContext struct{} + + noop := newNopParsedStatementConverter[dummyContext, mockStatementsGetter]() + parsedStatements := []*Statement[dummyContext]{{}} + convertedStatements, err := noop(nil, nil, "", mockStatementsGetter{values: []string{}}, parsedStatements) + + require.NoError(t, err) + require.NotNil(t, convertedStatements) + assert.Equal(t, parsedStatements, convertedStatements) +} + +func Test_NewParserCollection_DefaultContextInferrer(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any](componenttest.NewNopTelemetrySettings()) + require.NoError(t, err) + require.NotNil(t, pc) + require.NotNil(t, pc.contextInferrer) +} + +func Test_ParseStatements_Success(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"foo"})) + + pc, err := NewParserCollection( + component.TelemetrySettings{}, + WithParserCollectionContext("foo", ps, newNopParsedStatementConverter[any, mockStatementsGetter]()), + ) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{"foo"} + + statements := mockStatementsGetter{values: []string{`set(foo.attributes["bar"], "foo")`, `set(foo.attributes["bar"], "bar")`}} + result, err := pc.ParseStatements(statements) + require.NoError(t, err) + + assert.IsType(t, []*Statement[any]{}, result) + assert.Len(t, result.([]*Statement[any]), 2) + assert.NotNil(t, result) +} + +func Test_ParseStatements_MultipleContexts_Success(t *testing.T) { + fooParser := mockParser(t, WithPathContextNames[any]([]string{"foo"})) + barParser := mockParser(t, WithPathContextNames[any]([]string{"bar"})) + failingConverter := func( + _ *ParserCollection[mockStatementsGetter, any], + _ *Parser[any], + _ string, + _ mockStatementsGetter, + _ []*Statement[any]) (any, error) { + return nil, errors.New("failing converter") + } + + pc, err := NewParserCollection( + component.TelemetrySettings{}, + WithParserCollectionContext("foo", fooParser, failingConverter), + WithParserCollectionContext("bar", barParser, newNopParsedStatementConverter[any, mockStatementsGetter]()), + ) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{"bar"} + + statements := mockStatementsGetter{values: []string{`set(bar.attributes["bar"], "foo")`, `set(bar.attributes["bar"], "bar")`}} + result, err := pc.ParseStatements(statements) + require.NoError(t, err) + + assert.IsType(t, []*Statement[any]{}, result) + assert.Len(t, result.([]*Statement[any]), 2) + assert.NotNil(t, result) +} + +func Test_ParseStatements_NoContextInferredError(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any](component.TelemetrySettings{}) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{""} + + statements := mockStatementsGetter{values: []string{`set(bar.attributes["bar"], "foo")`}} + _, err = pc.ParseStatements(statements) + + assert.ErrorContains(t, err, "unable to infer context from statements") +} + +func Test_ParseStatements_ContextInferenceError(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any](component.TelemetrySettings{}) + require.NoError(t, err) + pc.contextInferrer = &mockFailingContextInferrer{err: errors.New("inference error")} + + statements := mockStatementsGetter{values: []string{`set(bar.attributes["bar"], "foo")`}} + _, err = pc.ParseStatements(statements) + + assert.EqualError(t, err, "inference error") +} + +func Test_ParseStatements_UnknownContextError(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any](component.TelemetrySettings{}) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{"foo"} + + statements := mockStatementsGetter{values: []string{`set(foo.attributes["bar"], "foo")`}} + _, err = pc.ParseStatements(statements) + + assert.ErrorContains(t, err, `unknown context "foo"`) +} + +func Test_ParseStatements_ParseStatementsError(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"foo"})) + ps.pathParser = func(_ Path[any]) (GetSetter[any], error) { + return nil, errors.New("parse statements error") + } + + pc, err := NewParserCollection( + component.TelemetrySettings{}, + WithParserCollectionContext("foo", ps, newNopParsedStatementConverter[any, mockStatementsGetter]()), + ) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{"foo"} + + statements := mockStatementsGetter{values: []string{`set(foo.attributes["bar"], "foo")`}} + _, err = pc.ParseStatements(statements) + assert.ErrorContains(t, err, "parse statements error") +} + +func Test_ParseStatements_ConverterError(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"dummy"})) + conv := func(_ *ParserCollection[mockStatementsGetter, any], _ *Parser[any], _ string, _ mockStatementsGetter, _ []*Statement[any]) (any, error) { + return nil, errors.New("converter error") + } + + pc, err := NewParserCollection( + component.TelemetrySettings{}, + WithParserCollectionContext("dummy", ps, conv), + ) + require.NoError(t, err) + pc.contextInferrer = &mockStaticContextInferrer{"dummy"} + + statements := mockStatementsGetter{values: []string{`set(dummy.attributes["bar"], "foo")`}} + _, err = pc.ParseStatements(statements) + + assert.EqualError(t, err, "converter error") +} + +func Test_ParseStatementsWithContext_UnknownContextError(t *testing.T) { + pc, err := NewParserCollection[mockStatementsGetter, any](component.TelemetrySettings{}) + require.NoError(t, err) + + statements := mockStatementsGetter{[]string{`set(attributes["bar"], "bar")`}} + _, err = pc.ParseStatementsWithContext("bar", statements, false) + + assert.ErrorContains(t, err, `unknown context "bar"`) +} + +func Test_ParseStatementsWithContext_PrependPathContext(t *testing.T) { + ps := mockParser(t, WithPathContextNames[any]([]string{"dummy"})) + pc, err := NewParserCollection( + component.TelemetrySettings{}, + WithParserCollectionContext("dummy", ps, newNopParsedStatementConverter[any, mockStatementsGetter]()), + ) + require.NoError(t, err) + + result, err := pc.ParseStatementsWithContext( + "dummy", + mockStatementsGetter{[]string{ + `set(attributes["foo"], "foo")`, + `set(attributes["bar"], "bar")`, + }}, + true, + ) + + require.NoError(t, err) + require.Len(t, result, 2) + parsedStatements := result.([]*Statement[any]) + assert.Equal(t, `set(dummy.attributes["foo"], "foo")`, parsedStatements[0].origText) + assert.Equal(t, `set(dummy.attributes["bar"], "bar")`, parsedStatements[1].origText) +} + +func mockParser(t *testing.T, options ...Option[any]) *Parser[any] { + mockSetFactory := NewFactory("set", &mockSetArguments[any]{}, + func(_ FunctionContext, _ Arguments) (ExprFunc[any], error) { + return func(_ context.Context, _ any) (any, error) { + return nil, nil + }, nil + }) + + ps, err := NewParser( + CreateFactoryMap[any](mockSetFactory), + testParsePath[any], + componenttest.NewNopTelemetrySettings(), + append([]Option[any]{ + WithEnumParser[any](testParseEnum), + }, options...)..., + ) + + require.NoError(t, err) + return &ps +} diff --git a/pkg/ottl/paths.go b/pkg/ottl/paths.go index dbb66ee7c994..82f149f32a64 100644 --- a/pkg/ottl/paths.go +++ b/pkg/ottl/paths.go @@ -2,6 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 package ottl // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +import ( + "fmt" + "strings" +) // grammarPathVisitor is used to extract all path from a parsedStatement or booleanExpression type grammarPathVisitor struct { @@ -30,3 +34,48 @@ func getBooleanExpressionPaths(be *booleanExpression) []path { be.accept(visitor) return visitor.paths } + +// AppendStatementPathsContext changes the given statement adding the missing path's context prefix. +// Path's witch context value matches any WithPathContextNames value are not modified. +// It returns an error if the context argument is not a valid WithPathContextNames value. +func (p *Parser[K]) AppendStatementPathsContext(context string, statement string) (string, error) { + if _, ok := p.pathContextNames[context]; !ok { + return statement, fmt.Errorf(`unknown context "%s" for parser %T, valid options are: %s`, context, p, p.buildPathContextNamesText("")) + } + parsed, err := parseStatement(statement) + if err != nil { + return "", err + } + paths := getParsedStatementPaths(parsed) + if len(paths) == 0 { + return statement, nil + } + + var missingContextOffsets []int + for _, it := range paths { + if _, ok := p.pathContextNames[it.Context]; !ok { + missingContextOffsets = append(missingContextOffsets, it.Pos.Offset) + } + } + + return writeStatementWithPathsContext(context, statement, missingContextOffsets), nil +} + +func writeStatementWithPathsContext(context string, statement string, offsets []int) string { + if len(offsets) == 0 { + return statement + } + contextPrefix := context + "." + sb := strings.Builder{} + left := 0 + for i, offset := range offsets { + sb.WriteString(statement[left:offset]) + sb.WriteString(contextPrefix) + if i+1 >= len(offsets) { + sb.WriteString(statement[offset:]) + } else { + left = offset + } + } + return sb.String() +} diff --git a/pkg/ottl/paths_test.go b/pkg/ottl/paths_test.go index 9f31dda15718..749f50f6cb8f 100644 --- a/pkg/ottl/paths_test.go +++ b/pkg/ottl/paths_test.go @@ -4,10 +4,13 @@ package ottl import ( + "context" "testing" "github.com/alecthomas/participle/v2/lexer" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottltest" ) @@ -448,3 +451,124 @@ func Test_getBooleanExpressionPaths(t *testing.T) { paths := getBooleanExpressionPaths(c) require.Equal(t, expected, paths) } + +func Test_AppendStatementPathsContext_InvalidStatement(t *testing.T) { + ps, err := NewParser( + CreateFactoryMap[any](), + testParsePath[any], + componenttest.NewNopTelemetrySettings(), + WithEnumParser[any](testParseEnum), + WithPathContextNames[any]([]string{"foo", "bar"}), + ) + require.NoError(t, err) + _, err = ps.AppendStatementPathsContext("foo", "this is invalid") + require.ErrorContains(t, err, `statement has invalid syntax`) +} + +func Test_AppendStatementPathsContext_InvalidContext(t *testing.T) { + ps, err := NewParser( + CreateFactoryMap[any](), + testParsePath[any], + componenttest.NewNopTelemetrySettings(), + WithEnumParser[any](testParseEnum), + WithPathContextNames[any]([]string{"foo", "bar"}), + ) + require.NoError(t, err) + _, err = ps.AppendStatementPathsContext("foobar", "set(foo, 1)") + require.ErrorContains(t, err, `unknown context "foobar" for parser`) +} + +func Test_AppendStatementPathsContext_Success(t *testing.T) { + tests := []struct { + name string + statement string + context string + pathContextNames []string + expected string + }{ + { + name: "no paths", + statement: `set("foo", 1)`, + context: "bar", + }, + { + name: "no modification needed", + statement: `set(span.value, 1)`, + context: "span", + }, + { + name: "single modifiable path", + statement: "set(value, 1)", + expected: "set(span.value, 1)", + context: "span", + }, + { + name: "single path with valid context", + statement: "set(span.value, 1)", + expected: "set(span.value, 1)", + pathContextNames: []string{"spanevent", "span"}, + context: "spanevent", + }, + { + name: "multiple paths with contexts", + statement: `set(span.value, 1) where span.attributes["foo"] == "foo" and span.id == 1`, + pathContextNames: []string{"another", "span"}, + context: "another", + }, + { + name: "multiple paths with and without contexts", + statement: `set(value, 1) where span.attributes["foo"] == "foo" and id == 1`, + expected: "set(spanevent.value, 1) where span.attributes[\"foo\"] == \"foo\" and spanevent.id == 1", + pathContextNames: []string{"spanevent", "span"}, + context: "spanevent", + }, + { + name: "multiple modifiable paths", + statement: `set(value, 1) where name == attributes["foo.name"]`, + expected: `set(span.value, 1) where span.name == span.attributes["foo.name"]`, + context: "span", + }, + { + name: "where with function path parameter", + statement: `set(attributes["test"], "pass") where IsMatch(name, "operation[AC]")`, + context: "log", + expected: `set(log.attributes["test"], "pass") where IsMatch(log.name, "operation[AC]")`, + }, + } + + mockSetFactory := NewFactory("set", &mockSetArguments[any]{}, + func(_ FunctionContext, _ Arguments) (ExprFunc[any], error) { + return func(_ context.Context, _ any) (any, error) { + return nil, nil + }, nil + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if len(tt.pathContextNames) == 0 { + tt.pathContextNames = append(tt.pathContextNames, tt.context) + } + + ps, err := NewParser( + CreateFactoryMap[any](mockSetFactory), + testParsePath[any], + componenttest.NewNopTelemetrySettings(), + WithEnumParser[any](testParseEnum), + WithPathContextNames[any](tt.pathContextNames), + ) + + require.NoError(t, err) + + var expected string + if tt.expected != "" { + expected = tt.expected + } else { + expected = tt.statement + } + + result, err := ps.AppendStatementPathsContext(tt.context, tt.statement) + require.NoError(t, err) + assert.Equal(t, expected, result) + }) + } +} diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 2eaeb094d5e5..0f350b57ca3e 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -5,8 +5,11 @@ package transformprocessor // import "github.com/open-telemetry/opentelemetry-co import ( "errors" + "fmt" + "reflect" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/featuregate" "go.uber.org/multierr" "go.uber.org/zap" @@ -24,7 +27,8 @@ var ( featuregate.WithRegisterFromVersion("v0.103.0"), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"), ) - errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") + errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") + configContextStatementsFields = []string{"trace_statements", "metric_statements", "log_statements"} ) // Config defines the configuration for the processor. @@ -44,6 +48,49 @@ type Config struct { logger *zap.Logger } +func (c *Config) Unmarshal(component *confmap.Conf) error { + if component == nil { + return nil + } + + contextStatementsPatch := map[string]any{} + for _, fieldName := range configContextStatementsFields { + if !component.IsSet(fieldName) { + continue + } + + rawVal := component.Get(fieldName) + values, ok := rawVal.([]any) + if !ok { + return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal) + } + + if len(values) == 0 { + continue + } + + stmts := make([]any, 0, len(values)) + for _, value := range values { + if reflect.TypeOf(value).Kind() == reflect.String { + stmts = append(stmts, map[string]any{"statements": []any{value}}) + } else { + stmts = append(stmts, value) + } + } + + contextStatementsPatch[fieldName] = stmts + } + + if len(contextStatementsPatch) > 0 { + err := component.Merge(confmap.NewFromStringMap(contextStatementsPatch)) + if err != nil { + return err + } + } + + return component.Unmarshal(c) +} + var _ component.Config = (*Config)(nil) func (c *Config) Validate() error { diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index c0f293457329..1964ca4e97fe 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -36,3 +36,7 @@ type ContextStatements struct { Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` } + +func (c ContextStatements) GetStatements() []string { + return c.Statements +} diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 4d9726c38260..1e9d636b3168 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -14,8 +14,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) var _ consumer.Logs = &logStatements{} @@ -55,76 +53,56 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -type LogParserCollection struct { - parserCollection - logParser ottl.Parser[ottllog.TransformContext] -} +type LogParserCollection ottl.ParserCollection[ContextStatements, consumer.Logs] -type LogParserCollectionOption func(*LogParserCollection) error +type LogParserCollectionOption ottl.ParserCollectionOption[ContextStatements, consumer.Logs] func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) LogParserCollectionOption { - return func(lp *LogParserCollection) error { - logParser, err := ottllog.NewParser(functions, lp.settings) + return func(pc *ottl.ParserCollection[ContextStatements, consumer.Logs]) error { + logParser, err := ottllog.NewParser(functions, pc.Settings, ottllog.WithPathContextNames()) if err != nil { return err } - lp.logParser = logParser - return nil + return ottl.WithParserCollectionContext(ottllog.PathContextName, &logParser, convertLogStatements)(pc) } } -func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { - return func(lp *LogParserCollection) error { - lp.errorMode = errorMode - return nil +func convertLogStatements(pc *ottl.ParserCollection[ContextStatements, consumer.Logs], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ContextStatements, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (consumer.Logs, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, statements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode)) + return logStatements{lStatements, globalExpr}, nil +} + +func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { + return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[ContextStatements, consumer.Logs](errorMode)) } func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) - if err != nil { - return nil, err - } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) - if err != nil { - return nil, err + pcOptions := []ottl.ParserCollectionOption[ContextStatements, consumer.Logs]{ + withCommonContextParsers[consumer.Logs](), + ottl.EnableParserCollectionModifiedStatementLogging[ContextStatements, consumer.Logs](true), } - lpc := &LogParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + + for _, option := range options { + pcOptions = append(pcOptions, ottl.ParserCollectionOption[ContextStatements, consumer.Logs](option)) } - for _, op := range options { - err := op(lpc) - if err != nil { - return nil, err - } + pc, err := ottl.NewParserCollection(settings, pcOptions...) + if err != nil { + return nil, err } - return lpc, nil + lpc := LogParserCollection(*pc) + return &lpc, nil } -func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { - switch contextStatements.Context { - case Log: - parsedStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.parserCollection, filterottl.StandardLogFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - lStatements := ottllog.NewStatementSequence(parsedStatements, pc.settings, ottllog.WithStatementSequenceErrorMode(pc.errorMode)) - return logStatements{lStatements, globalExpr}, nil - default: - statements, err := pc.parseCommonContextStatements(contextStatements) - if err != nil { - return nil, err - } - return statements, nil +func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { + pc := ottl.ParserCollection[ContextStatements, consumer.Logs](*lpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } + return pc.ParseStatements(contextStatements) } diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index 3ae07920ca2c..9d5695dafa36 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -16,8 +16,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) var _ consumer.Metrics = &metricStatements{} @@ -169,99 +167,76 @@ func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pm return nil } -type MetricParserCollection struct { - parserCollection - metricParser ottl.Parser[ottlmetric.TransformContext] - dataPointParser ottl.Parser[ottldatapoint.TransformContext] -} +type MetricParserCollection ottl.ParserCollection[ContextStatements, consumer.Metrics] -type MetricParserCollectionOption func(*MetricParserCollection) error +type MetricParserCollectionOption ottl.ParserCollectionOption[ContextStatements, consumer.Metrics] func WithMetricParser(functions map[string]ottl.Factory[ottlmetric.TransformContext]) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - metricParser, err := ottlmetric.NewParser(functions, mp.settings) + return func(pc *ottl.ParserCollection[ContextStatements, consumer.Metrics]) error { + metricParser, err := ottlmetric.NewParser(functions, pc.Settings, ottlmetric.WithPathContextNames()) if err != nil { return err } - mp.metricParser = metricParser - return nil + opt := ottl.WithParserCollectionContext(ottlmetric.PathContextName, &metricParser, convertMetricStatements) + return opt(pc) + } +} + +func convertMetricStatements(pc *ottl.ParserCollection[ContextStatements, consumer.Metrics], _ *ottl.Parser[ottlmetric.TransformContext], _ string, statements ContextStatements, parsedStatements []*ottl.Statement[ottlmetric.TransformContext]) (consumer.Metrics, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, statements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardMetricFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(pc.ErrorMode)) + return metricStatements{mStatements, globalExpr}, nil } func WithDataPointParser(functions map[string]ottl.Factory[ottldatapoint.TransformContext]) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - dataPointParser, err := ottldatapoint.NewParser(functions, mp.settings) + return func(pc *ottl.ParserCollection[ContextStatements, consumer.Metrics]) error { + dataPointParser, err := ottldatapoint.NewParser(functions, pc.Settings, ottldatapoint.WithPathContextNames()) if err != nil { return err } - mp.dataPointParser = dataPointParser - return nil + return ottl.WithParserCollectionContext(ottldatapoint.PathContextName, &dataPointParser, convertDataPointStatements)(pc) } } -func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption { - return func(mp *MetricParserCollection) error { - mp.errorMode = errorMode - return nil +func convertDataPointStatements(pc *ottl.ParserCollection[ContextStatements, consumer.Metrics], _ *ottl.Parser[ottldatapoint.TransformContext], _ string, statements ContextStatements, parsedStatements []*ottl.Statement[ottldatapoint.TransformContext]) (consumer.Metrics, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, statements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardDataPointFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(pc.ErrorMode)) + return dataPointStatements{dpStatements, globalExpr}, nil +} + +func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption { + return MetricParserCollectionOption(ottl.WithParserCollectionErrorMode[ContextStatements, consumer.Metrics](errorMode)) } func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) - if err != nil { - return nil, err - } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) - if err != nil { - return nil, err + pcOptions := []ottl.ParserCollectionOption[ContextStatements, consumer.Metrics]{ + withCommonContextParsers[consumer.Metrics](), + ottl.EnableParserCollectionModifiedStatementLogging[ContextStatements, consumer.Metrics](true), } - mpc := &MetricParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + + for _, option := range options { + pcOptions = append(pcOptions, ottl.ParserCollectionOption[ContextStatements, consumer.Metrics](option)) } - for _, op := range options { - err := op(mpc) - if err != nil { - return nil, err - } + pc, err := ottl.NewParserCollection(settings, pcOptions...) + if err != nil { + return nil, err } - return mpc, nil + mpc := MetricParserCollection(*pc) + return &mpc, nil } -func (pc MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { - switch contextStatements.Context { - case Metric: - parseStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.parserCollection, filterottl.StandardMetricFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - mStatements := ottlmetric.NewStatementSequence(parseStatements, pc.settings, ottlmetric.WithStatementSequenceErrorMode(pc.errorMode)) - return metricStatements{mStatements, globalExpr}, nil - case DataPoint: - parsedStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.parserCollection, filterottl.StandardDataPointFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.settings, ottldatapoint.WithStatementSequenceErrorMode(pc.errorMode)) - return dataPointStatements{dpStatements, globalExpr}, nil - default: - statements, err := pc.parseCommonContextStatements(contextStatements) - if err != nil { - return nil, err - } - return statements, nil +func (mpc *MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { + pc := ottl.ParserCollection[ContextStatements, consumer.Metrics](*mpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } + return pc.ParseStatements(contextStatements) } diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index 137cac8ffeac..bf7857590859 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -5,7 +5,6 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" - "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -169,56 +168,78 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -type parserCollection struct { - settings component.TelemetrySettings - resourceParser ottl.Parser[ottlresource.TransformContext] - scopeParser ottl.Parser[ottlscope.TransformContext] - errorMode ottl.ErrorMode -} - type baseContext interface { consumer.Traces consumer.Metrics consumer.Logs } -func (pc parserCollection) parseCommonContextStatements(contextStatement ContextStatements) (baseContext, error) { - switch contextStatement.Context { - case Resource: - parsedStatements, err := pc.resourceParser.ParseStatements(contextStatement.Statements) +func withCommonContextParsers[R any]() ottl.ParserCollectionOption[ContextStatements, R] { + return func(pc *ottl.ParserCollection[ContextStatements, R]) error { + rp, err := ottlresource.NewParser(ResourceFunctions(), pc.Settings, ottlresource.WithPathContextNames()) if err != nil { - return nil, err + return err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatement.Conditions, pc, filterottl.StandardResourceFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr + sp, err := ottlscope.NewParser(ScopeFunctions(), pc.Settings, ottlscope.WithPathContextNames()) + if err != nil { + return err } - rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.settings, ottlresource.WithStatementSequenceErrorMode(pc.errorMode)) - return resourceStatements{rStatements, globalExpr}, nil - case Scope: - parsedStatements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) + + err = ottl.WithParserCollectionContext[ottlresource.TransformContext, ContextStatements, R](ottlresource.PathContextName, &rp, parseResourceContextStatements)(pc) if err != nil { - return nil, err + return err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatement.Conditions, pc, filterottl.StandardScopeFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr + + err = ottl.WithParserCollectionContext[ottlscope.TransformContext, ContextStatements, R](ottlscope.PathContextName, &sp, parseScopeContextStatements)(pc) + if err != nil { + return err } - sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.settings, ottlscope.WithStatementSequenceErrorMode(pc.errorMode)) - return scopeStatements{sStatements, globalExpr}, nil - default: - return nil, fmt.Errorf("unknown context %v", contextStatement.Context) + + return nil + } +} + +func parseResourceContextStatements[R any]( + collection *ottl.ParserCollection[ContextStatements, R], + _ *ottl.Parser[ottlresource.TransformContext], + _ string, + statements ContextStatements, + parsedStatements []*ottl.Statement[ottlresource.TransformContext], +) (R, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, statements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardResourceFuncs()) + if errGlobalBoolExpr != nil { + return *new(R), errGlobalBoolExpr + } + rStatements := ottlresource.NewStatementSequence(parsedStatements, collection.Settings, ottlresource.WithStatementSequenceErrorMode(collection.ErrorMode)) + result := (baseContext)(resourceStatements{rStatements, globalExpr}) + return result.(R), nil +} + +func parseScopeContextStatements[R any]( + collection *ottl.ParserCollection[ContextStatements, R], + _ *ottl.Parser[ottlscope.TransformContext], + _ string, + statements ContextStatements, + parsedStatements []*ottl.Statement[ottlscope.TransformContext], +) (R, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, statements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardScopeFuncs()) + if errGlobalBoolExpr != nil { + return *new(R), errGlobalBoolExpr } + sStatements := ottlscope.NewStatementSequence(parsedStatements, collection.Settings, ottlscope.WithStatementSequenceErrorMode(collection.ErrorMode)) + result := (baseContext)(scopeStatements{sStatements, globalExpr}) + return result.(R), nil } func parseGlobalExpr[K any]( boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings) (expr.BoolExpr[K], error), conditions []string, - pc parserCollection, + errorMode ottl.ErrorMode, + settings component.TelemetrySettings, standardFuncs map[string]ottl.Factory[K]) (expr.BoolExpr[K], error) { if len(conditions) > 0 { - return boolExprFunc(conditions, standardFuncs, pc.errorMode, pc.settings) + return boolExprFunc(conditions, standardFuncs, errorMode, settings) } // By default, set the global expression to always true unless conditions are specified. return expr.AlwaysTrue[K](), nil diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index de03b8afe917..f2f396ccb7ce 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -13,8 +13,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) @@ -97,95 +95,75 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces return nil } -type TraceParserCollection struct { - parserCollection - spanParser ottl.Parser[ottlspan.TransformContext] - spanEventParser ottl.Parser[ottlspanevent.TransformContext] -} +type TraceParserCollection ottl.ParserCollection[ContextStatements, consumer.Traces] -type TraceParserCollectionOption func(*TraceParserCollection) error +type TraceParserCollectionOption ottl.ParserCollectionOption[ContextStatements, consumer.Traces] func WithSpanParser(functions map[string]ottl.Factory[ottlspan.TransformContext]) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - spanParser, err := ottlspan.NewParser(functions, tp.settings) + return func(pc *ottl.ParserCollection[ContextStatements, consumer.Traces]) error { + parser, err := ottlspan.NewParser(functions, pc.Settings, ottlspan.WithPathContextNames()) if err != nil { return err } - tp.spanParser = spanParser - return nil + return ottl.WithParserCollectionContext(ottlspan.PathContextName, &parser, convertSpanStatements)(pc) + } +} + +func convertSpanStatements(collection *ottl.ParserCollection[ContextStatements, consumer.Traces], _ *ottl.Parser[ottlspan.TransformContext], _ string, statements ContextStatements, parsedStatements []*ottl.Statement[ottlspan.TransformContext]) (consumer.Traces, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, statements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardSpanFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + sStatements := ottlspan.NewStatementSequence(parsedStatements, collection.Settings, ottlspan.WithStatementSequenceErrorMode(collection.ErrorMode)) + return traceStatements{sStatements, globalExpr}, nil } func WithSpanEventParser(functions map[string]ottl.Factory[ottlspanevent.TransformContext]) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - spanEventParser, err := ottlspanevent.NewParser(functions, tp.settings) + return func(pc *ottl.ParserCollection[ContextStatements, consumer.Traces]) error { + parser, err := ottlspanevent.NewParser(functions, pc.Settings, ottlspanevent.WithPathContextNames()) if err != nil { return err } - tp.spanEventParser = spanEventParser - return nil + return ottl.WithParserCollectionContext(ottlspanevent.PathContextName, &parser, convertSpanEventStatements)(pc) } } -func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { - return func(tp *TraceParserCollection) error { - tp.errorMode = errorMode - return nil +func convertSpanEventStatements(collection *ottl.ParserCollection[ContextStatements, consumer.Traces], _ *ottl.Parser[ottlspanevent.TransformContext], _ string, statements ContextStatements, parsedStatements []*ottl.Statement[ottlspanevent.TransformContext]) (consumer.Traces, error) { + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, statements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardSpanEventFuncs()) + if errGlobalBoolExpr != nil { + return nil, errGlobalBoolExpr } + seStatements := ottlspanevent.NewStatementSequence(parsedStatements, collection.Settings, ottlspanevent.WithStatementSequenceErrorMode(collection.ErrorMode)) + return spanEventStatements{seStatements, globalExpr}, nil +} + +func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { + return TraceParserCollectionOption(ottl.WithParserCollectionErrorMode[ContextStatements, consumer.Traces](errorMode)) } func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { - rp, err := ottlresource.NewParser(ResourceFunctions(), settings) - if err != nil { - return nil, err - } - sp, err := ottlscope.NewParser(ScopeFunctions(), settings) - if err != nil { - return nil, err + pcOptions := []ottl.ParserCollectionOption[ContextStatements, consumer.Traces]{ + withCommonContextParsers[consumer.Traces](), + ottl.EnableParserCollectionModifiedStatementLogging[ContextStatements, consumer.Traces](true), } - tpc := &TraceParserCollection{ - parserCollection: parserCollection{ - settings: settings, - resourceParser: rp, - scopeParser: sp, - }, + + for _, option := range options { + pcOptions = append(pcOptions, ottl.ParserCollectionOption[ContextStatements, consumer.Traces](option)) } - for _, op := range options { - err := op(tpc) - if err != nil { - return nil, err - } + pc, err := ottl.NewParserCollection(settings, pcOptions...) + if err != nil { + return nil, err } - return tpc, nil + tpc := TraceParserCollection(*pc) + return &tpc, nil } -func (pc TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { - switch contextStatements.Context { - case Span: - parsedStatements, err := pc.spanParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.settings, ottlspan.WithStatementSequenceErrorMode(pc.errorMode)) - return traceStatements{sStatements, globalExpr}, nil - case SpanEvent: - parsedStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) - if err != nil { - return nil, err - } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, pc.parserCollection, filterottl.StandardSpanEventFuncs()) - if errGlobalBoolExpr != nil { - return nil, errGlobalBoolExpr - } - seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.settings, ottlspanevent.WithStatementSequenceErrorMode(pc.errorMode)) - return spanEventStatements{seStatements, globalExpr}, nil - default: - return pc.parseCommonContextStatements(contextStatements) +func (tpc *TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { + pc := ottl.ParserCollection[ContextStatements, consumer.Traces](*tpc) + if contextStatements.Context != "" { + return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } + return pc.ParseStatements(contextStatements) }