From 1df24f83c6f84e94f6f007758dad6d88f0eb6661 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Mon, 13 Mar 2023 14:07:07 -0600 Subject: [PATCH 1/3] Fix issue where ErrorMode wasn't used. --- processor/transformprocessor/factory.go | 6 ++--- processor/transformprocessor/factory_test.go | 25 +++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/processor/transformprocessor/factory.go b/processor/transformprocessor/factory.go index 18a09177c037..9b4b0b331173 100644 --- a/processor/transformprocessor/factory.go +++ b/processor/transformprocessor/factory.go @@ -64,7 +64,7 @@ func createLogsProcessor( ) (processor.Logs, error) { oCfg := cfg.(*Config) - proc, err := logs.NewProcessor(oCfg.LogStatements, ottl.PropagateError, set.TelemetrySettings) + proc, err := logs.NewProcessor(oCfg.LogStatements, oCfg.ErrorMode, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -85,7 +85,7 @@ func createTracesProcessor( ) (processor.Traces, error) { oCfg := cfg.(*Config) - proc, err := traces.NewProcessor(oCfg.TraceStatements, ottl.PropagateError, set.TelemetrySettings) + proc, err := traces.NewProcessor(oCfg.TraceStatements, oCfg.ErrorMode, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } @@ -106,7 +106,7 @@ func createMetricsProcessor( ) (processor.Metrics, error) { oCfg := cfg.(*Config) - proc, err := metrics.NewProcessor(oCfg.MetricStatements, ottl.PropagateError, set.TelemetrySettings) + proc, err := metrics.NewProcessor(oCfg.MetricStatements, oCfg.ErrorMode, set.TelemetrySettings) if err != nil { return nil, fmt.Errorf("invalid config for \"transform\" processor %w", err) } diff --git a/processor/transformprocessor/factory_test.go b/processor/transformprocessor/factory_test.go index c7bb55f56325..0d7de1e0c2af 100644 --- a/processor/transformprocessor/factory_test.go +++ b/processor/transformprocessor/factory_test.go @@ -74,10 +74,14 @@ func TestFactoryCreateTracesProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError oCfg.TraceStatements = []common.ContextStatements{ { - Context: "span", - Statements: []string{`set(attributes["test"], "pass") where name == "operationA"`}, + Context: "span", + Statements: []string{ + `set(attributes["test"], "pass") where name == "operationA"`, + `set(attributes["test error mode"], ParseJSON(1)) where name == "operationA"`, + }, }, } tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) @@ -103,6 +107,7 @@ func TestFactoryCreateMetricsProcessor_InvalidActions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError oCfg.MetricStatements = []common.ContextStatements{ { Context: "datapoint", @@ -118,10 +123,14 @@ func TestFactoryCreateMetricsProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError oCfg.MetricStatements = []common.ContextStatements{ { - Context: "datapoint", - Statements: []string{`set(attributes["test"], "pass") where metric.name == "operationA"`}, + Context: "datapoint", + Statements: []string{ + `set(attributes["test"], "pass") where metric.name == "operationA"`, + `set(attributes["test error mode"], ParseJSON(1)) where metric.name == "operationA"`, + }, }, } metricsProcessor, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) @@ -147,10 +156,14 @@ func TestFactoryCreateLogsProcessor(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() oCfg := cfg.(*Config) + oCfg.ErrorMode = ottl.IgnoreError oCfg.LogStatements = []common.ContextStatements{ { - Context: "log", - Statements: []string{`set(attributes["test"], "pass") where body == "operationA"`}, + Context: "log", + Statements: []string{ + `set(attributes["test"], "pass") where body == "operationA"`, + `set(attributes["test error mode"], ParseJSON(1)) where body == "operationA"`, + }, }, } lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) From 8aec8cb4060f856d5e3af314c2473db3028e5da3 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Mon, 13 Mar 2023 14:11:05 -0600 Subject: [PATCH 2/3] add changelog --- .chloggen/tp-fix-error-mode-usage.yaml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100755 .chloggen/tp-fix-error-mode-usage.yaml diff --git a/.chloggen/tp-fix-error-mode-usage.yaml b/.chloggen/tp-fix-error-mode-usage.yaml new file mode 100755 index 000000000000..5828ce3ea048 --- /dev/null +++ b/.chloggen/tp-fix-error-mode-usage.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fixes bug where the value for `error_mode` was ignored. + +# One or more tracking issues related to the change +issues: [19629] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From 285b1724bd9fa18eecffab2a1a06c4e09219f228 Mon Sep 17 00:00:00 2001 From: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com> Date: Mon, 13 Mar 2023 14:34:18 -0600 Subject: [PATCH 3/3] Add more tests --- .../internal/common/logs.go | 1 - .../internal/common/traces.go | 1 - .../internal/logs/processor_test.go | 36 +++++++++++++-- .../internal/metrics/processor_test.go | 43 ++++++++++++++++-- .../internal/traces/processor_test.go | 45 ++++++++++++++++--- 5 files changed, 109 insertions(+), 17 deletions(-) diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 629ad6adbba9..2b9ab45dfd9e 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -60,7 +60,6 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { type LogParserCollection struct { parserCollection logParser ottl.Parser[ottllog.TransformContext] - errorMode ottl.ErrorMode } type LogParserCollectionOption func(*LogParserCollection) error diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 4f66dbe3ca66..73a7f03b685c 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -96,7 +96,6 @@ type TraceParserCollection struct { parserCollection spanParser ottl.Parser[ottlspan.TransformContext] spanEventParser ottl.Parser[ottlspanevent.TransformContext] - errorMode ottl.ErrorMode } type TraceParserCollectionOption func(*TraceParserCollection) error diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index 2ee8a6e6f3a7..6ed1d6ffa276 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -60,7 +60,7 @@ func Test_ProcessLogs_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -95,7 +95,7 @@ func Test_ProcessLogs_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -333,7 +333,7 @@ func Test_ProcessLogs_LogContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "log", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -450,7 +450,7 @@ func Test_ProcessLogs_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructLogs() - processor, err := NewProcessor(tt.contextStatments, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessLogs(context.Background(), td) @@ -464,6 +464,34 @@ func Test_ProcessLogs_MixContext(t *testing.T) { } } +func Test_ProcessTraces_Error(t *testing.T) { + tests := []struct { + statement string + context common.ContextID + }{ + { + context: "resource", + }, + { + context: "scope", + }, + { + context: "log", + }, + } + + for _, tt := range tests { + t.Run(string(tt.context), func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessLogs(context.Background(), td) + assert.Error(t, err) + }) + } +} + func constructLogs() plog.Logs { td := plog.NewLogs() rs0 := td.ResourceLogs().AppendEmpty() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 62f5c2faf255..33a5eb2ab355 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -56,7 +56,7 @@ func Test_ProcessMetrics_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -91,7 +91,7 @@ func Test_ProcessMetrics_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -510,7 +510,7 @@ func Test_ProcessMetrics_DataPointContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statements[0], func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor([]common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "datapoint", Statements: tt.statements}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -642,7 +642,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructMetrics() - processor, err := NewProcessor(tt.contextStatments, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessMetrics(context.Background(), td) @@ -656,6 +656,41 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { } } +func Test_ProcessMetrics_Error(t *testing.T) { + tests := []struct { + statement string + context common.ContextID + }{ + { + statement: `set(attributes["test"], ParseJSON(1))`, + context: "resource", + }, + { + statement: `set(attributes["test"], ParseJSON(1))`, + context: "scope", + }, + { + statement: `set(name, ParseJSON(1))`, + context: "metric", + }, + { + statement: `set(attributes["test"], ParseJSON(1))`, + context: "datapoint", + }, + } + + for _, tt := range tests { + t.Run(tt.statement, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessMetrics(context.Background(), td) + assert.Error(t, err) + }) + } +} + func constructMetrics() pmetric.Metrics { td := pmetric.NewMetrics() rm0 := td.ResourceMetrics().AppendEmpty() diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index c31c6709d534..ba384c37ee34 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -61,7 +61,7 @@ func Test_ProcessTraces_ResourceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "resource", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -96,7 +96,7 @@ func Test_ProcessTraces_ScopeContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "scope", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -378,7 +378,7 @@ func Test_ProcessTraces_TraceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -408,7 +408,7 @@ func Test_ProcessTraces_SpanEventContext(t *testing.T) { for _, tt := range tests { t.Run(tt.statement, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor([]common.ContextStatements{{Context: "spanevent", Statements: []string{tt.statement}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "spanevent", Statements: []string{tt.statement}}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -525,7 +525,7 @@ func Test_ProcessTraces_MixContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { td := constructTraces() - processor, err := NewProcessor(tt.contextStatments, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor(tt.contextStatments, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(t, err) _, err = processor.ProcessTraces(context.Background(), td) @@ -539,6 +539,37 @@ func Test_ProcessTraces_MixContext(t *testing.T) { } } +func Test_ProcessTraces_Error(t *testing.T) { + tests := []struct { + statement string + context common.ContextID + }{ + { + context: "resource", + }, + { + context: "scope", + }, + { + context: "span", + }, + { + context: "spanevent", + }, + } + + for _, tt := range tests { + t.Run(string(tt.context), func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor([]common.ContextStatements{{Context: tt.context, Statements: []string{`set(attributes["test"], ParseJSON(1))`}}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + + _, err = processor.ProcessTraces(context.Background(), td) + assert.Error(t, err) + }) + } +} + func BenchmarkTwoSpans(b *testing.B) { tests := []struct { name string @@ -575,7 +606,7 @@ func BenchmarkTwoSpans(b *testing.B) { for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ { @@ -617,7 +648,7 @@ func BenchmarkHundredSpans(b *testing.B) { } for _, tt := range tests { b.Run(tt.name, func(b *testing.B) { - processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + processor, err := NewProcessor([]common.ContextStatements{{Context: "span", Statements: tt.statements}}, ottl.IgnoreError, componenttest.NewNopTelemetrySettings()) assert.NoError(b, err) b.ResetTimer() for n := 0; n < b.N; n++ {