From 52183e7c395b3f6930f297c796d1080e7d242392 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 30 Sep 2024 12:58:50 -0700 Subject: [PATCH] Deprecate funcs that repeate processor in name Signed-off-by: Bogdan Drutu --- .chloggen/deprecate-processor-name.yaml | 27 +++++ .../internal/templates/component_test.go.tmpl | 6 +- otelcol/command_components.go | 6 +- processor/batchprocessor/batch_processor.go | 8 +- .../batchprocessor/batch_processor_test.go | 8 +- processor/batchprocessor/factory_test.go | 6 +- .../generated_component_test.go | 6 +- processor/memorylimiterprocessor/factory.go | 18 +-- .../memorylimiterprocessor/factory_test.go | 6 +- .../generated_component_test.go | 6 +- .../memorylimiter_test.go | 8 +- processor/processor.go | 108 ++++++++++++------ processor/processor_test.go | 18 +-- processor/processorhelper/logs.go | 12 +- processor/processorhelper/logs_test.go | 24 ++-- processor/processorhelper/metrics.go | 12 +- processor/processorhelper/metrics_test.go | 24 ++-- processor/processorhelper/traces.go | 12 +- processor/processorhelper/traces_test.go | 24 ++-- processor/processorprofiles/processor.go | 6 +- processor/processorprofiles/processor_test.go | 2 +- processor/processortest/nop_processor.go | 24 ++-- processor/processortest/nop_processor_test.go | 6 +- processor/processortest/shutdown_verifier.go | 6 +- .../processortest/shutdown_verifier_test.go | 38 +++--- .../processortest/unhealthy_processor.go | 26 ++--- .../processortest/unhealthy_processor_test.go | 6 +- service/internal/builders/processor.go | 14 +-- service/internal/builders/processor_test.go | 6 +- .../testcomponents/example_processor.go | 12 +- 30 files changed, 276 insertions(+), 209 deletions(-) create mode 100644 .chloggen/deprecate-processor-name.yaml diff --git a/.chloggen/deprecate-processor-name.yaml b/.chloggen/deprecate-processor-name.yaml new file mode 100644 index 00000000000..f404361049c --- /dev/null +++ b/.chloggen/deprecate-processor-name.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate funcs that repeat "processor" in name + +# One or more tracking issues or pull requests related to the change +issues: [11310] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Factory.Create[Traces|Metrics|Logs|Profiles]Processor -> Factory.Create[Traces|Metrics|Logs|Profiles] + Factory.[Traces|Metrics|Logs|Profiles]ProcessorStability -> Factory.[Traces|Metrics|Logs|Profiles]Stability + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/mdatagen/internal/templates/component_test.go.tmpl b/cmd/mdatagen/internal/templates/component_test.go.tmpl index 27bdf0b4a0b..3ba603dbc00 100644 --- a/cmd/mdatagen/internal/templates/component_test.go.tmpl +++ b/cmd/mdatagen/internal/templates/component_test.go.tmpl @@ -173,7 +173,7 @@ func TestComponentLifecycle(t *testing.T) { { name: "logs", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) }, }, {{ end }} @@ -181,7 +181,7 @@ func TestComponentLifecycle(t *testing.T) { { name: "metrics", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateMetrics(ctx, set, cfg, consumertest.NewNop()) }, }, {{ end }} @@ -189,7 +189,7 @@ func TestComponentLifecycle(t *testing.T) { { name: "traces", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateTraces(ctx, set, cfg, consumertest.NewNop()) }, }, {{ end }} diff --git a/otelcol/command_components.go b/otelcol/command_components.go index becd8ff9ae7..90b40c9b5dc 100644 --- a/otelcol/command_components.go +++ b/otelcol/command_components.go @@ -81,9 +81,9 @@ func newComponentsCommand(set CollectorSettings) *cobra.Command { Name: prs.Type(), Module: factories.ProcessorModules[prs.Type()], Stability: map[string]string{ - "logs": prs.LogsProcessorStability().String(), - "metrics": prs.MetricsProcessorStability().String(), - "traces": prs.TracesProcessorStability().String(), + "logs": prs.LogsStability().String(), + "metrics": prs.MetricsStability().String(), + "traces": prs.TracesStability().String(), }, }) } diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 4af8eaab42f..3a6188b5604 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -341,22 +341,22 @@ func (mb *multiShardBatcher) currentMetadataCardinality() int { return mb.size } -// ConsumeTraces implements TracesProcessor +// ConsumeTraces implements processor.Traces func (bp *batchProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { return bp.batcher.consume(ctx, td) } -// ConsumeMetrics implements MetricsProcessor +// ConsumeMetrics implements processor.Metrics func (bp *batchProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { return bp.batcher.consume(ctx, md) } -// ConsumeLogs implements LogsProcessor +// ConsumeLogs implements processor.Logs func (bp *batchProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return bp.batcher.consume(ctx, ld) } -// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout +// newBatchTraces creates a new batch processor that batches traces by size or with timeout func newBatchTracesProcessor(set processor.Settings, next consumer.Traces, cfg *Config) (*batchProcessor, error) { return newBatchProcessor(set, cfg, func() batch { return newBatchTraces(next) }) } diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index eeaf7d3f965..bd14106a138 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -37,13 +37,13 @@ func TestProcessorShutdown(t *testing.T) { for i := 0; i < 5; i++ { require.NotPanics(t, func() { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTraces(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = tProc.Shutdown(ctx) }) require.NotPanics(t, func() { - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetrics(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = mProc.Shutdown(ctx) }) @@ -63,12 +63,12 @@ func TestProcessorLifecycle(t *testing.T) { processorCreationSet := processortest.NewNopSettings() for i := 0; i < 5; i++ { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTraces(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, tProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, tProc.Shutdown(ctx)) - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetrics(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, mProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, mProc.Shutdown(ctx)) diff --git a/processor/batchprocessor/factory_test.go b/processor/batchprocessor/factory_test.go index 6dbc3af13da..881a6cd9c3a 100644 --- a/processor/batchprocessor/factory_test.go +++ b/processor/batchprocessor/factory_test.go @@ -26,17 +26,17 @@ func TestCreateProcessor(t *testing.T) { cfg := factory.CreateDefaultConfig() creationSet := processortest.NewNopSettings() - tp, err := factory.CreateTracesProcessor(context.Background(), creationSet, cfg, nil) + tp, err := factory.CreateTraces(context.Background(), creationSet, cfg, nil) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") assert.NoError(t, tp.Shutdown(context.Background())) - mp, err := factory.CreateMetricsProcessor(context.Background(), creationSet, cfg, nil) + mp, err := factory.CreateMetrics(context.Background(), creationSet, cfg, nil) assert.NotNil(t, mp) assert.NoError(t, err, "cannot create metric processor") assert.NoError(t, mp.Shutdown(context.Background())) - lp, err := factory.CreateLogsProcessor(context.Background(), creationSet, cfg, nil) + lp, err := factory.CreateLogs(context.Background(), creationSet, cfg, nil) assert.NotNil(t, lp) assert.NoError(t, err, "cannot create logs processor") assert.NoError(t, lp.Shutdown(context.Background())) diff --git a/processor/batchprocessor/generated_component_test.go b/processor/batchprocessor/generated_component_test.go index 0de5ebd38a2..1f360ad47b7 100644 --- a/processor/batchprocessor/generated_component_test.go +++ b/processor/batchprocessor/generated_component_test.go @@ -40,21 +40,21 @@ func TestComponentLifecycle(t *testing.T) { { name: "logs", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) }, }, { name: "metrics", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateMetrics(ctx, set, cfg, consumertest.NewNop()) }, }, { name: "traces", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateTraces(ctx, set, cfg, consumertest.NewNop()) }, }, } diff --git a/processor/memorylimiterprocessor/factory.go b/processor/memorylimiterprocessor/factory.go index c64f401624a..03e18aeedc6 100644 --- a/processor/memorylimiterprocessor/factory.go +++ b/processor/memorylimiterprocessor/factory.go @@ -33,9 +33,9 @@ func NewFactory() processor.Factory { return processor.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(f.createTracesProcessor, metadata.TracesStability), - processor.WithMetrics(f.createMetricsProcessor, metadata.MetricsStability), - processor.WithLogs(f.createLogsProcessor, metadata.LogsStability)) + processor.WithTraces(f.createTraces, metadata.TracesStability), + processor.WithMetrics(f.createMetrics, metadata.MetricsStability), + processor.WithLogs(f.createLogs, metadata.LogsStability)) } // CreateDefaultConfig creates the default configuration for processor. Notice @@ -44,7 +44,7 @@ func createDefaultConfig() component.Config { return &Config{} } -func (f *factory) createTracesProcessor( +func (f *factory) createTraces( ctx context.Context, set processor.Settings, cfg component.Config, @@ -54,14 +54,14 @@ func (f *factory) createTracesProcessor( if err != nil { return nil, err } - return processorhelper.NewTracesProcessor(ctx, set, cfg, nextConsumer, + return processorhelper.NewTraces(ctx, set, cfg, nextConsumer, memLimiter.processTraces, processorhelper.WithCapabilities(processorCapabilities), processorhelper.WithStart(memLimiter.start), processorhelper.WithShutdown(memLimiter.shutdown)) } -func (f *factory) createMetricsProcessor( +func (f *factory) createMetrics( ctx context.Context, set processor.Settings, cfg component.Config, @@ -71,14 +71,14 @@ func (f *factory) createMetricsProcessor( if err != nil { return nil, err } - return processorhelper.NewMetricsProcessor(ctx, set, cfg, nextConsumer, + return processorhelper.NewMetrics(ctx, set, cfg, nextConsumer, memLimiter.processMetrics, processorhelper.WithCapabilities(processorCapabilities), processorhelper.WithStart(memLimiter.start), processorhelper.WithShutdown(memLimiter.shutdown)) } -func (f *factory) createLogsProcessor( +func (f *factory) createLogs( ctx context.Context, set processor.Settings, cfg component.Config, @@ -88,7 +88,7 @@ func (f *factory) createLogsProcessor( if err != nil { return nil, err } - return processorhelper.NewLogsProcessor(ctx, set, cfg, nextConsumer, + return processorhelper.NewLogs(ctx, set, cfg, nextConsumer, memLimiter.processLogs, processorhelper.WithCapabilities(processorCapabilities), processorhelper.WithStart(memLimiter.start), diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index 2502fa5b1ef..31a2fabab5a 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -38,19 +38,19 @@ func TestCreateProcessor(t *testing.T) { pCfg.MemorySpikeLimitMiB = 1907 pCfg.CheckInterval = 100 * time.Millisecond - tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + tp, err := factory.CreateTraces(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, tp) // test if we can shutdown a monitoring routine that has not started require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) require.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) - mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + mp, err := factory.CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, mp) require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) - lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + lp, err := factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/memorylimiterprocessor/generated_component_test.go b/processor/memorylimiterprocessor/generated_component_test.go index 598a82be167..60bcc845654 100644 --- a/processor/memorylimiterprocessor/generated_component_test.go +++ b/processor/memorylimiterprocessor/generated_component_test.go @@ -40,21 +40,21 @@ func TestComponentLifecycle(t *testing.T) { { name: "logs", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) }, }, { name: "metrics", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateMetricsProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateMetrics(ctx, set, cfg, consumertest.NewNop()) }, }, { name: "traces", createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateTracesProcessor(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateTraces(ctx, set, cfg, consumertest.NewNop()) }, }, } diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 9794b7dcd4b..b01703b12e5 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -56,7 +56,7 @@ func TestNoDataLoss(t *testing.T) { limiter, err := newMemoryLimiterProcessor(set, cfg) require.NoError(t, err) - processor, err := processorhelper.NewLogsProcessor(context.Background(), processor.Settings{ + processor, err := processorhelper.NewLogs(context.Background(), processor.Settings{ ID: component.MustNewID("nop"), TelemetrySettings: componenttest.NewNopTelemetrySettings(), }, cfg, exporter, @@ -176,7 +176,7 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) require.NoError(t, err) - mp, err := processorhelper.NewMetricsProcessor( + mp, err := processorhelper.NewMetrics( context.Background(), processortest.NewNopSettings(), tt.mlCfg, @@ -266,7 +266,7 @@ func TestTraceMemoryPressureResponse(t *testing.T) { ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) require.NoError(t, err) - tp, err := processorhelper.NewTracesProcessor( + tp, err := processorhelper.NewTraces( context.Background(), processortest.NewNopSettings(), tt.mlCfg, @@ -356,7 +356,7 @@ func TestLogMemoryPressureResponse(t *testing.T) { ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) require.NoError(t, err) - tp, err := processorhelper.NewLogsProcessor( + tp, err := processorhelper.NewLogs( context.Background(), processortest.NewNopSettings(), tt.mlCfg, diff --git a/processor/processor.go b/processor/processor.go index 6ded6505e2c..ea12c98f0b4 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -48,31 +48,49 @@ type Settings struct { type Factory interface { component.Factory - // CreateTracesProcessor creates a TracesProcessor based on this config. + // CreateTraces creates a Traces processor based on this config. // If the processor type does not support traces, // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) + // Implementers can assume `next` is never nil. + CreateTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Traces, error) - // TracesProcessorStability gets the stability level of the TracesProcessor. + // Deprecated: [v0.111.0] use CreateTraces. + CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Traces, error) + + // TracesStability gets the stability level of the Traces processor. + TracesStability() component.StabilityLevel + + // Deprecated: [v0.111.0] use TracesStability. TracesProcessorStability() component.StabilityLevel - // CreateMetricsProcessor creates a MetricsProcessor based on this config. + // CreateMetrics creates a Metrics processor based on this config. // If the processor type does not support metrics, // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) + // Implementers can assume `next` is never nil. + CreateMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Metrics, error) + + // Deprecated: [v0.111.0] use CreateMetrics. + CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Metrics, error) - // MetricsProcessorStability gets the stability level of the MetricsProcessor. + // MetricsStability gets the stability level of the Metrics processor. + MetricsStability() component.StabilityLevel + + // Deprecated: [v0.111.0] use MetricsStability. MetricsProcessorStability() component.StabilityLevel - // CreateLogsProcessor creates a LogsProcessor based on the config. + // CreateLogs creates a Logs processor based on the config. // If the processor type does not support logs, // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) + // Implementers can assume `next` is never nil. + CreateLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Logs, error) + + // Deprecated: [v0.111.0] use CreateLogs. + CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Logs, error) + + // LogsStability gets the stability level of the Logs processor. + LogsStability() component.StabilityLevel - // LogsProcessorStability gets the stability level of the LogsProcessor. + // Deprecated: [v0.111.0] use LogsStability. LogsProcessorStability() component.StabilityLevel unexportedFactoryFunc() @@ -110,14 +128,29 @@ func (f *factory) Type() component.Type { func (f *factory) unexportedFactoryFunc() {} +func (f factory) TracesStability() component.StabilityLevel { + return f.tracesStabilityLevel +} + +// Deprecated: [v0.111.0] use TracesStability. func (f factory) TracesProcessorStability() component.StabilityLevel { return f.tracesStabilityLevel } -func (f factory) MetricsProcessorStability() component.StabilityLevel { +func (f factory) MetricsStability() component.StabilityLevel { return f.metricsStabilityLevel } +// Deprecated: [v0.111.0] use MetricsStability. +func (f factory) MetricsProcessorStability() component.StabilityLevel { + return f.tracesStabilityLevel +} + +func (f factory) LogsStability() component.StabilityLevel { + return f.logsStabilityLevel +} + +// Deprecated: [v0.111.0] use LogsStability. func (f factory) LogsProcessorStability() component.StabilityLevel { return f.logsStabilityLevel } @@ -125,48 +158,49 @@ func (f factory) LogsProcessorStability() component.StabilityLevel { // CreateTracesFunc is the equivalent of Factory.CreateTraces(). type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) -// CreateTracesProcessor implements Factory.CreateTracesProcessor(). -func (f CreateTracesFunc) CreateTracesProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces) (Traces, error) { +// CreateTraces implements Factory.CreateTraces. +func (f CreateTracesFunc) CreateTraces(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Traces, error) { if f == nil { return nil, pipeline.ErrSignalNotSupported } - return f(ctx, set, cfg, nextConsumer) + return f(ctx, set, cfg, next) +} + +// Deprecated: [v0.111.0] use CreateTraces. +func (f CreateTracesFunc) CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Traces) (Traces, error) { + return f.CreateTraces(ctx, set, cfg, next) } // CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) -// CreateMetricsProcessor implements Factory.CreateMetricsProcessor(). -func (f CreateMetricsFunc) CreateMetricsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Metrics, error) { +// CreateMetrics implements Factory.CreateMetrics. +func (f CreateMetricsFunc) CreateMetrics(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Metrics, error) { if f == nil { return nil, pipeline.ErrSignalNotSupported } - return f(ctx, set, cfg, nextConsumer) + return f(ctx, set, cfg, next) +} + +// Deprecated: [v0.111.0] use CreateMetrics. +func (f CreateMetricsFunc) CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Metrics) (Metrics, error) { + return f.CreateMetrics(ctx, set, cfg, next) } -// CreateLogsFunc is the equivalent of Factory.CreateLogs(). +// CreateLogsFunc is the equivalent of Factory.CreateLogs. type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) -// CreateLogsProcessor implements Factory.CreateLogsProcessor(). -func (f CreateLogsFunc) CreateLogsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Logs, error) { +// CreateLogs implements Factory.CreateLogs(). +func (f CreateLogsFunc) CreateLogs(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Logs, error) { if f == nil { return nil, pipeline.ErrSignalNotSupported } - return f(ctx, set, cfg, nextConsumer) + return f(ctx, set, cfg, next) +} + +// Deprecated: [v0.111.0] use CreateLogs. +func (f CreateLogsFunc) CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, next consumer.Logs) (Logs, error) { + return f.CreateLogs(ctx, set, cfg, next) } // WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. diff --git a/processor/processor_test.go b/processor/processor_test.go index 1d2c17c17e5..79fd2cb6aa6 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -23,11 +23,11 @@ func TestNewFactory(t *testing.T) { func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - _, err := factory.CreateTracesProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err := factory.CreateTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) require.Error(t, err) - _, err = factory.CreateMetricsProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) require.Error(t, err) - _, err = factory.CreateLogsProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + _, err = factory.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) } @@ -43,16 +43,16 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - assert.Equal(t, component.StabilityLevelAlpha, factory.TracesProcessorStability()) - _, err := factory.CreateTracesProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, component.StabilityLevelAlpha, factory.TracesStability()) + _, err := factory.CreateTraces(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) - assert.Equal(t, component.StabilityLevelBeta, factory.MetricsProcessorStability()) - _, err = factory.CreateMetricsProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, component.StabilityLevelBeta, factory.MetricsStability()) + _, err = factory.CreateMetrics(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) require.NoError(t, err) - assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsProcessorStability()) - _, err = factory.CreateLogsProcessor(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) + assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsStability()) + _, err = factory.CreateLogs(context.Background(), Settings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index 35415d258fd..9ea1e8d40a2 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -20,14 +20,14 @@ import ( // If error is returned then returned data are ignored. It MUST not call the next component. type ProcessLogsFunc func(context.Context, plog.Logs) (plog.Logs, error) -type logProcessor struct { +type logs struct { component.StartFunc component.ShutdownFunc consumer.Logs } -// NewLogsProcessor creates a processor.Logs that ensure context propagation and the right tags are set. -func NewLogsProcessor( +// NewLogs creates a processor.Logs that ensure context propagation and the right tags are set. +func NewLogs( _ context.Context, set processor.Settings, _ component.Config, @@ -35,7 +35,6 @@ func NewLogsProcessor( logsFunc ProcessLogsFunc, options ...Option, ) (processor.Logs, error) { - // TODO: Add observability metrics support if logsFunc == nil { return nil, errors.New("nil logsFunc") } @@ -68,9 +67,12 @@ func NewLogsProcessor( return nil, err } - return &logProcessor{ + return &logs{ StartFunc: bs.StartFunc, ShutdownFunc: bs.ShutdownFunc, Logs: logsConsumer, }, nil } + +// Deprecated: [v0.111.0] use NewTraces. +var NewLogsProcessor = NewLogs diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 1fe5bb7098d..e39d5cac2b9 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -23,8 +23,8 @@ import ( var testLogsCfg = struct{}{} -func TestNewLogsProcessor(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) +func TestNewLogs(t *testing.T) { + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) require.NoError(t, err) assert.True(t, lp.Capabilities().MutatesData) @@ -33,9 +33,9 @@ func TestNewLogsProcessor(t *testing.T) { assert.NoError(t, lp.Shutdown(context.Background())) } -func TestNewLogsProcessor_WithOptions(t *testing.T) { +func TestNewLogs_WithOptions(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -46,20 +46,20 @@ func TestNewLogsProcessor_WithOptions(t *testing.T) { assert.False(t, lp.Capabilities().MutatesData) } -func TestNewLogsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), nil) +func TestNewLogs_NilRequiredFields(t *testing.T) { + _, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } -func TestNewLogsProcessor_ProcessLogError(t *testing.T) { +func TestNewLogs_ProcessLogError(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) require.NoError(t, err) assert.Equal(t, want, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } -func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) +func TestNewLogs_ProcessLogsErrSkipProcessingData(t *testing.T) { + lp, err := NewLogs(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } @@ -70,7 +70,7 @@ func newTestLProcessor(retError error) ProcessLogsFunc { } } -func TestLogsProcessor_RecordInOut(t *testing.T) { +func TestLogs_RecordInOut(t *testing.T) { // Regardless of how many logs are ingested, emit just one mockAggregate := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { ld := plog.NewLogs() @@ -87,7 +87,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { incomingLogRecords.AppendEmpty() testTelemetry := setupTestTelemetry() - lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + lp, err := NewLogs(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index ead30766b9f..af8a24dd193 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -20,14 +20,14 @@ import ( // If error is returned then returned data are ignored. It MUST not call the next component. type ProcessMetricsFunc func(context.Context, pmetric.Metrics) (pmetric.Metrics, error) -type metricsProcessor struct { +type metrics struct { component.StartFunc component.ShutdownFunc consumer.Metrics } -// NewMetricsProcessor creates a processor.Metrics that ensure context propagation and the right tags are set. -func NewMetricsProcessor( +// NewMetrics creates a processor.Metrics that ensure context propagation and the right tags are set. +func NewMetrics( _ context.Context, set processor.Settings, _ component.Config, @@ -35,7 +35,6 @@ func NewMetricsProcessor( metricsFunc ProcessMetricsFunc, options ...Option, ) (processor.Metrics, error) { - // TODO: Add observability metrics support if metricsFunc == nil { return nil, errors.New("nil metricsFunc") } @@ -68,9 +67,12 @@ func NewMetricsProcessor( return nil, err } - return &metricsProcessor{ + return &metrics{ StartFunc: bs.StartFunc, ShutdownFunc: bs.ShutdownFunc, Metrics: metricsConsumer, }, nil } + +// Deprecated: [v0.111.0] use NewMetrics. +var NewMetricsProcessor = NewMetrics diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 90293dd0354..960e0c74f07 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -23,8 +23,8 @@ import ( var testMetricsCfg = struct{}{} -func TestNewMetricsProcessor(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) +func TestNewMetrics(t *testing.T) { + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) require.NoError(t, err) assert.True(t, mp.Capabilities().MutatesData) @@ -33,9 +33,9 @@ func TestNewMetricsProcessor(t *testing.T) { assert.NoError(t, mp.Shutdown(context.Background())) } -func TestNewMetricsProcessor_WithOptions(t *testing.T) { +func TestNewMetrics_WithOptions(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -46,20 +46,20 @@ func TestNewMetricsProcessor_WithOptions(t *testing.T) { assert.False(t, mp.Capabilities().MutatesData) } -func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), nil) +func TestNewMetrics_NilRequiredFields(t *testing.T) { + _, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } -func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) { +func TestNewMetrics_ProcessMetricsError(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) require.NoError(t, err) assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } -func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) +func TestNewMetrics_ProcessMetricsErrSkipProcessingData(t *testing.T) { + mp, err := NewMetrics(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } @@ -70,7 +70,7 @@ func newTestMProcessor(retError error) ProcessMetricsFunc { } } -func TestMetricsProcessor_RecordInOut(t *testing.T) { +func TestMetrics_RecordInOut(t *testing.T) { // Regardless of how many data points are ingested, emit 3 mockAggregate := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { md := pmetric.NewMetrics() @@ -88,7 +88,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { dps.AppendEmpty() testTelemetry := setupTestTelemetry() - mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) + mp, err := NewMetrics(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index a796406be15..0891751c7a4 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -20,14 +20,14 @@ import ( // If error is returned then returned data are ignored. It MUST not call the next component. type ProcessTracesFunc func(context.Context, ptrace.Traces) (ptrace.Traces, error) -type tracesProcessor struct { +type traces struct { component.StartFunc component.ShutdownFunc consumer.Traces } -// NewTracesProcessor creates a processor.Traces that ensure context propagation and the right tags are set. -func NewTracesProcessor( +// NewTraces creates a processor.Traces that ensure context propagation and the right tags are set. +func NewTraces( _ context.Context, set processor.Settings, _ component.Config, @@ -35,7 +35,6 @@ func NewTracesProcessor( tracesFunc ProcessTracesFunc, options ...Option, ) (processor.Traces, error) { - // TODO: Add observability Traces support if tracesFunc == nil { return nil, errors.New("nil tracesFunc") } @@ -69,9 +68,12 @@ func NewTracesProcessor( return nil, err } - return &tracesProcessor{ + return &traces{ StartFunc: bs.StartFunc, ShutdownFunc: bs.ShutdownFunc, Traces: traceConsumer, }, nil } + +// Deprecated: [v0.111.0] use NewTraces. +var NewTracesProcessor = NewTraces diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 74fbea2ed88..db2df7045e6 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -23,8 +23,8 @@ import ( var testTracesCfg = struct{}{} -func TestNewTracesProcessor(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) +func TestNewTraces(t *testing.T) { + tp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) require.NoError(t, err) assert.True(t, tp.Capabilities().MutatesData) @@ -33,9 +33,9 @@ func TestNewTracesProcessor(t *testing.T) { assert.NoError(t, tp.Shutdown(context.Background())) } -func TestNewTracesProcessor_WithOptions(t *testing.T) { +func TestNewTraces_WithOptions(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), + tp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -46,20 +46,20 @@ func TestNewTracesProcessor_WithOptions(t *testing.T) { assert.False(t, tp.Capabilities().MutatesData) } -func TestNewTracesProcessor_NilRequiredFields(t *testing.T) { - _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), nil) +func TestNewTraces_NilRequiredFields(t *testing.T) { + _, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), nil) assert.Error(t, err) } -func TestNewTracesProcessor_ProcessTraceError(t *testing.T) { +func TestNewTraces_ProcessTraceError(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) + tp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) require.NoError(t, err) assert.Equal(t, want, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } -func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) +func TestNewTraces_ProcessTracesErrSkipProcessingData(t *testing.T) { + tp, err := NewTraces(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } @@ -70,7 +70,7 @@ func newTestTProcessor(retError error) ProcessTracesFunc { } } -func TestTracesProcessor_RecordInOut(t *testing.T) { +func TestTraces_RecordInOut(t *testing.T) { // Regardless of how many spans are ingested, emit just one mockAggregate := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) { td := ptrace.NewTraces() @@ -88,7 +88,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { incomingSpans.AppendEmpty() testTelemetry := setupTestTelemetry() - tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + tp, err := NewTraces(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processorprofiles/processor.go b/processor/processorprofiles/processor.go index 85c61f941b1..78c80709c10 100644 --- a/processor/processorprofiles/processor.go +++ b/processor/processorprofiles/processor.go @@ -24,8 +24,8 @@ type Factory interface { // an error will be returned instead. CreateProfilesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumerprofiles.Profiles) (Profiles, error) - // ProfilesProcessorStability gets the stability level of the ProfilesProcessor. - ProfilesProcessorStability() component.StabilityLevel + // ProfilesStability gets the stability level of the ProfilesProcessor. + ProfilesStability() component.StabilityLevel } // Profiles is a processor that can consume profiles. @@ -69,7 +69,7 @@ type factory struct { profilesStabilityLevel component.StabilityLevel } -func (f factory) ProfilesProcessorStability() component.StabilityLevel { +func (f factory) ProfilesStability() component.StabilityLevel { return f.profilesStabilityLevel } diff --git a/processor/processorprofiles/processor_test.go b/processor/processorprofiles/processor_test.go index 672e5f2c9fc..eca45f08bbb 100644 --- a/processor/processorprofiles/processor_test.go +++ b/processor/processorprofiles/processor_test.go @@ -26,7 +26,7 @@ func TestNewFactoryWithProfiles(t *testing.T) { assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesProcessorStability()) + assert.Equal(t, component.StabilityLevelAlpha, factory.ProfilesStability()) _, err := factory.CreateProfilesProcessor(context.Background(), processor.Settings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index 4d2e1fd4c50..9a5937e60e5 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -19,7 +19,7 @@ import ( var nopType = component.MustNewType("nop") -// NewNopSettings returns a new nop settings for Create*Processor functions. +// NewNopSettings returns a new nop settings for Create* functions. func NewNopSettings() processor.Settings { return processor.Settings{ ID: component.NewIDWithName(nopType, uuid.NewString()), @@ -33,37 +33,37 @@ func NewNopFactory() processor.Factory { return processorprofiles.NewFactory( nopType, func() component.Config { return &nopConfig{} }, - processorprofiles.WithTraces(createTracesProcessor, component.StabilityLevelStable), - processorprofiles.WithMetrics(createMetricsProcessor, component.StabilityLevelStable), - processorprofiles.WithLogs(createLogsProcessor, component.StabilityLevelStable), - processorprofiles.WithProfiles(createProfilesProcessor, component.StabilityLevelAlpha), + processorprofiles.WithTraces(createTraces, component.StabilityLevelStable), + processorprofiles.WithMetrics(createMetrics, component.StabilityLevelStable), + processorprofiles.WithLogs(createLogs, component.StabilityLevelStable), + processorprofiles.WithProfiles(createProfiles, component.StabilityLevelAlpha), ) } -func createTracesProcessor(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { +func createTraces(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { return nopInstance, nil } -func createMetricsProcessor(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { +func createMetrics(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { return nopInstance, nil } -func createLogsProcessor(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { +func createLogs(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { return nopInstance, nil } -func createProfilesProcessor(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (processorprofiles.Profiles, error) { +func createProfiles(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (processorprofiles.Profiles, error) { return nopInstance, nil } type nopConfig struct{} -var nopInstance = &nopProcessor{ +var nopInstance = &nop{ Consumer: consumertest.NewNop(), } -// nopProcessor acts as a processor for testing purposes. -type nopProcessor struct { +// nop acts as a processor for testing purposes. +type nop struct { component.StartFunc component.ShutdownFunc consumertest.Consumer diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index 35b17c94d14..848b7408aa0 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -28,21 +28,21 @@ func TestNewNopFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &nopConfig{}, cfg) - traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + traces, err := factory.CreateTraces(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetrics(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogs(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processortest/shutdown_verifier.go b/processor/processortest/shutdown_verifier.go index 5561991f576..9fe4b9a048c 100644 --- a/processor/processortest/shutdown_verifier.go +++ b/processor/processortest/shutdown_verifier.go @@ -22,7 +22,7 @@ import ( func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.TracesSink) - proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateTraces(context.Background(), NewNopSettings(), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -46,7 +46,7 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.LogsSink) - proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateLogs(context.Background(), NewNopSettings(), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -70,7 +70,7 @@ func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Facto func verifyMetricsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.MetricsSink) - proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateMetrics(context.Background(), NewNopSettings(), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } diff --git a/processor/processortest/shutdown_verifier_test.go b/processor/processortest/shutdown_verifier_test.go index c65e251c231..f6d4c7376b1 100644 --- a/processor/processortest/shutdown_verifier_test.go +++ b/processor/processortest/shutdown_verifier_test.go @@ -19,9 +19,9 @@ func TestShutdownVerifier(t *testing.T) { f := processor.NewFactory( component.MustNewType("passthrough"), func() component.Config { return struct{}{} }, - processor.WithTraces(createPassthroughTracesProcessor, component.StabilityLevelStable), - processor.WithMetrics(createPassthroughMetricsProcessor, component.StabilityLevelStable), - processor.WithLogs(createPassthroughLogsProcessor, component.StabilityLevelStable), + processor.WithTraces(createPassthroughTraces, component.StabilityLevelStable), + processor.WithMetrics(createPassthroughMetrics, component.StabilityLevelStable), + processor.WithLogs(createPassthroughLogs, component.StabilityLevelStable), ) VerifyShutdown(t, f, &struct{}{}) } @@ -30,7 +30,7 @@ func TestShutdownVerifierLogsOnly(t *testing.T) { f := processor.NewFactory( component.MustNewType("passthrough"), func() component.Config { return struct{}{} }, - processor.WithLogs(createPassthroughLogsProcessor, component.StabilityLevelStable), + processor.WithLogs(createPassthroughLogs, component.StabilityLevelStable), ) VerifyShutdown(t, f, &struct{}{}) } @@ -39,7 +39,7 @@ func TestShutdownVerifierMetricsOnly(t *testing.T) { f := processor.NewFactory( component.MustNewType("passthrough"), func() component.Config { return struct{}{} }, - processor.WithMetrics(createPassthroughMetricsProcessor, component.StabilityLevelStable), + processor.WithMetrics(createPassthroughMetrics, component.StabilityLevelStable), ) VerifyShutdown(t, f, &struct{}{}) } @@ -48,56 +48,56 @@ func TestShutdownVerifierTracesOnly(t *testing.T) { f := processor.NewFactory( component.MustNewType("passthrough"), func() component.Config { return struct{}{} }, - processor.WithTraces(createPassthroughTracesProcessor, component.StabilityLevelStable), + processor.WithTraces(createPassthroughTraces, component.StabilityLevelStable), ) VerifyShutdown(t, f, &struct{}{}) } -type passthroughProcessor struct { +type passthrough struct { processor.Traces nextLogs consumer.Logs nextMetrics consumer.Metrics nextTraces consumer.Traces } -func (passthroughProcessor) Start(context.Context, component.Host) error { +func (passthrough) Start(context.Context, component.Host) error { return nil } -func (passthroughProcessor) Shutdown(context.Context) error { +func (passthrough) Shutdown(context.Context) error { return nil } -func (passthroughProcessor) Capabilities() consumer.Capabilities { +func (passthrough) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } -func createPassthroughLogsProcessor(_ context.Context, _ processor.Settings, _ component.Config, logs consumer.Logs) (processor.Logs, error) { - return passthroughProcessor{ +func createPassthroughLogs(_ context.Context, _ processor.Settings, _ component.Config, logs consumer.Logs) (processor.Logs, error) { + return passthrough{ nextLogs: logs, }, nil } -func createPassthroughMetricsProcessor(_ context.Context, _ processor.Settings, _ component.Config, metrics consumer.Metrics) (processor.Metrics, error) { - return passthroughProcessor{ +func createPassthroughMetrics(_ context.Context, _ processor.Settings, _ component.Config, metrics consumer.Metrics) (processor.Metrics, error) { + return passthrough{ nextMetrics: metrics, }, nil } -func createPassthroughTracesProcessor(_ context.Context, _ processor.Settings, _ component.Config, traces consumer.Traces) (processor.Traces, error) { - return passthroughProcessor{ +func createPassthroughTraces(_ context.Context, _ processor.Settings, _ component.Config, traces consumer.Traces) (processor.Traces, error) { + return passthrough{ nextTraces: traces, }, nil } -func (p passthroughProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (p passthrough) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { return p.nextTraces.ConsumeTraces(ctx, td) } -func (p passthroughProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (p passthrough) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { return p.nextMetrics.ConsumeMetrics(ctx, md) } -func (p passthroughProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (p passthrough) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return p.nextLogs.ConsumeLogs(ctx, ld) } diff --git a/processor/processortest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go index 3db2a10b667..950792d26c7 100644 --- a/processor/processortest/unhealthy_processor.go +++ b/processor/processortest/unhealthy_processor.go @@ -14,7 +14,7 @@ import ( "go.opentelemetry.io/collector/processor" ) -// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. +// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create* functions. func NewUnhealthyProcessorCreateSettings() processor.Settings { return processor.Settings{ TelemetrySettings: componenttest.NewNopTelemetrySettings(), @@ -22,48 +22,48 @@ func NewUnhealthyProcessorCreateSettings() processor.Settings { } } -// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. +// NewUnhealthyProcessorFactory returns a processor.Factory that constructs nop processors. func NewUnhealthyProcessorFactory() processor.Factory { return processor.NewFactory( component.MustNewType("unhealthy"), func() component.Config { return &struct{}{} }, - processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable), - processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable), - processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable), + processor.WithTraces(createUnhealthyTraces, component.StabilityLevelStable), + processor.WithMetrics(createUnhealthyMetrics, component.StabilityLevelStable), + processor.WithLogs(createUnhealthyLogs, component.StabilityLevelStable), ) } -func createUnhealthyTracesProcessor(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Traces) (processor.Traces, error) { - return &unhealthyProcessor{ +func createUnhealthyTraces(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Traces) (processor.Traces, error) { + return &unhealthy{ Consumer: consumertest.NewNop(), telemetry: set.TelemetrySettings, }, nil } -func createUnhealthyMetricsProcessor(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Metrics) (processor.Metrics, error) { - return &unhealthyProcessor{ +func createUnhealthyMetrics(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Metrics) (processor.Metrics, error) { + return &unhealthy{ Consumer: consumertest.NewNop(), telemetry: set.TelemetrySettings, }, nil } -func createUnhealthyLogsProcessor(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Logs) (processor.Logs, error) { - return &unhealthyProcessor{ +func createUnhealthyLogs(_ context.Context, set processor.Settings, _ component.Config, _ consumer.Logs) (processor.Logs, error) { + return &unhealthy{ Consumer: consumertest.NewNop(), telemetry: set.TelemetrySettings, }, nil } -type unhealthyProcessor struct { +type unhealthy struct { component.StartFunc component.ShutdownFunc consumertest.Consumer telemetry component.TelemetrySettings } -func (p unhealthyProcessor) Start(_ context.Context, host component.Host) error { +func (p unhealthy) Start(_ context.Context, host component.Host) error { go func() { componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusRecoverableError)) }() diff --git a/processor/processortest/unhealthy_processor_test.go b/processor/processortest/unhealthy_processor_test.go index f00c4236a19..cd4859aa11d 100644 --- a/processor/processortest/unhealthy_processor_test.go +++ b/processor/processortest/unhealthy_processor_test.go @@ -26,21 +26,21 @@ func TestNewUnhealthyProcessorFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &struct{}{}, cfg) - traces, err := factory.CreateTracesProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + traces, err := factory.CreateTraces(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetrics(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogs(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) diff --git a/service/internal/builders/processor.go b/service/internal/builders/processor.go index 052d95063dc..8353767e7be 100644 --- a/service/internal/builders/processor.go +++ b/service/internal/builders/processor.go @@ -43,8 +43,8 @@ func (b *ProcessorBuilder) CreateTraces(ctx context.Context, set processor.Setti return nil, fmt.Errorf("processor factory not available for: %q", set.ID) } - logStabilityLevel(set.Logger, f.TracesProcessorStability()) - return f.CreateTracesProcessor(ctx, set, cfg, next) + logStabilityLevel(set.Logger, f.TracesStability()) + return f.CreateTraces(ctx, set, cfg, next) } // CreateMetrics creates a Metrics processor based on the settings and config. @@ -62,8 +62,8 @@ func (b *ProcessorBuilder) CreateMetrics(ctx context.Context, set processor.Sett return nil, fmt.Errorf("processor factory not available for: %q", set.ID) } - logStabilityLevel(set.Logger, f.MetricsProcessorStability()) - return f.CreateMetricsProcessor(ctx, set, cfg, next) + logStabilityLevel(set.Logger, f.MetricsStability()) + return f.CreateMetrics(ctx, set, cfg, next) } // CreateLogs creates a Logs processor based on the settings and config. @@ -81,8 +81,8 @@ func (b *ProcessorBuilder) CreateLogs(ctx context.Context, set processor.Setting return nil, fmt.Errorf("processor factory not available for: %q", set.ID) } - logStabilityLevel(set.Logger, f.LogsProcessorStability()) - return f.CreateLogsProcessor(ctx, set, cfg, next) + logStabilityLevel(set.Logger, f.LogsStability()) + return f.CreateLogs(ctx, set, cfg, next) } // CreateProfiles creates a Profiles processor based on the settings and config. @@ -104,7 +104,7 @@ func (b *ProcessorBuilder) CreateProfiles(ctx context.Context, set processor.Set if !ok { return nil, pipeline.ErrSignalNotSupported } - logStabilityLevel(set.Logger, f.ProfilesProcessorStability()) + logStabilityLevel(set.Logger, f.ProfilesStability()) return f.CreateProfilesProcessor(ctx, set, cfg, next) } diff --git a/service/internal/builders/processor_test.go b/service/internal/builders/processor_test.go index 425f3c89152..88032ef7d19 100644 --- a/service/internal/builders/processor_test.go +++ b/service/internal/builders/processor_test.go @@ -189,19 +189,19 @@ func TestNewNopProcessorBuilder(t *testing.T) { set := processortest.NewNopSettings() set.ID = component.NewID(nopType) - traces, err := factory.CreateTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + traces, err := factory.CreateTraces(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, traces, bTraces) - metrics, err := factory.CreateMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetrics(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, metrics, bMetrics) - logs, err := factory.CreateLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + logs, err := factory.CreateLogs(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) diff --git a/service/internal/testcomponents/example_processor.go b/service/internal/testcomponents/example_processor.go index 950c9ed8078..18716d3e812 100644 --- a/service/internal/testcomponents/example_processor.go +++ b/service/internal/testcomponents/example_processor.go @@ -19,9 +19,9 @@ var procType = component.MustNewType("exampleprocessor") var ExampleProcessorFactory = processorprofiles.NewFactory( procType, createDefaultConfig, - processorprofiles.WithTraces(createTracesProcessor, component.StabilityLevelDevelopment), - processorprofiles.WithMetrics(createMetricsProcessor, component.StabilityLevelDevelopment), - processorprofiles.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment), + processorprofiles.WithTraces(createTraces, component.StabilityLevelDevelopment), + processorprofiles.WithMetrics(createMetrics, component.StabilityLevelDevelopment), + processorprofiles.WithLogs(createLogs, component.StabilityLevelDevelopment), processorprofiles.WithProfiles(createProfilesProcessor, component.StabilityLevelDevelopment), ) @@ -30,21 +30,21 @@ func createDefaultConfig() component.Config { return &struct{}{} } -func createTracesProcessor(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { +func createTraces(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Traces) (processor.Traces, error) { return &ExampleProcessor{ ConsumeTracesFunc: nextConsumer.ConsumeTraces, mutatesData: set.ID.Name() == "mutate", }, nil } -func createMetricsProcessor(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { +func createMetrics(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { return &ExampleProcessor{ ConsumeMetricsFunc: nextConsumer.ConsumeMetrics, mutatesData: set.ID.Name() == "mutate", }, nil } -func createLogsProcessor(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Logs) (processor.Logs, error) { +func createLogs(_ context.Context, set processor.Settings, _ component.Config, nextConsumer consumer.Logs) (processor.Logs, error) { return &ExampleProcessor{ ConsumeLogsFunc: nextConsumer.ConsumeLogs, mutatesData: set.ID.Name() == "mutate",