From 632461fb918917e950303d5c4e041dd50a88f7a6 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 27 Sep 2024 11:07:54 -0700 Subject: [PATCH] [chore] Move back processor definitions, make profile embed processor (#11286) Same as https://github.com/open-telemetry/opentelemetry-collector/pull/11254 but for processor Signed-off-by: Bogdan Drutu --- processor/internal/factory.go | 214 ------------------ processor/internal/logs.go | 15 -- processor/internal/metrics.go | 15 -- processor/internal/processor.go | 17 -- processor/internal/profiles.go | 15 -- processor/internal/traces.go | 15 -- processor/processor.go | 175 ++++++++++++-- processor/processorprofiles/go.mod | 2 +- processor/processorprofiles/processor.go | 116 +++++++++- processor/processorprofiles/processor_test.go | 2 +- processor/processortest/nop_processor.go | 8 +- processor/processortest/nop_processor_test.go | 3 +- service/internal/builders/processor.go | 7 +- service/internal/builders/processor_test.go | 18 +- service/internal/graph/graph_test.go | 8 +- .../testcomponents/example_processor.go | 8 +- 16 files changed, 302 insertions(+), 336 deletions(-) delete mode 100644 processor/internal/factory.go delete mode 100644 processor/internal/logs.go delete mode 100644 processor/internal/metrics.go delete mode 100644 processor/internal/processor.go delete mode 100644 processor/internal/profiles.go delete mode 100644 processor/internal/traces.go diff --git a/processor/internal/factory.go b/processor/internal/factory.go deleted file mode 100644 index 2f6729ca2fe..00000000000 --- a/processor/internal/factory.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerprofiles" - "go.opentelemetry.io/collector/pipeline" -) - -// Factory is a Factory interface for processors. -// -// This interface cannot be directly implemented. Implementations must -// use the NewProcessorFactory to implement it. -type Factory interface { - component.Factory - - // CreateTracesProcessor creates a TracesProcessor based on this config. - // If the processor type does not support traces, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) - - // TracesProcessorStability gets the stability level of the TracesProcessor. - TracesProcessorStability() component.StabilityLevel - - // CreateMetricsProcessor creates a MetricsProcessor based on this config. - // If the processor type does not support metrics, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) - - // MetricsProcessorStability gets the stability level of the MetricsProcessor. - MetricsProcessorStability() component.StabilityLevel - - // CreateLogsProcessor creates a LogsProcessor based on the config. - // If the processor type does not support logs, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) - - // LogsProcessorStability gets the stability level of the LogsProcessor. - LogsProcessorStability() component.StabilityLevel - - // CreateProfilesProcessor creates a ProfilesProcessor based on this config. - // If the processor type does not support tracing or if the config is not valid, - // an error will be returned instead. - CreateProfilesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumerprofiles.Profiles) (Profiles, error) - - // ProfilesProcessorStability gets the stability level of the ProfilesProcessor. - ProfilesProcessorStability() component.StabilityLevel - - unexportedFactoryFunc() -} - -// FactoryOption apply changes to Options. -type FactoryOption interface { - // applyProcessorFactoryOption applies the option. - applyProcessorFactoryOption(o *factory) -} - -var _ FactoryOption = (*factoryOptionFunc)(nil) - -// factoryOptionFunc is a FactoryOption created through a function. -type factoryOptionFunc func(*factory) - -func (f factoryOptionFunc) applyProcessorFactoryOption(o *factory) { - f(o) -} - -// CreateTracesFunc is the equivalent of Factory.CreateTraces(). -type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) - -// CreateTracesProcessor implements Factory.CreateTracesProcessor(). -func (f CreateTracesFunc) CreateTracesProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces) (Traces, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). -type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) - -// CreateMetricsProcessor implements Factory.CreateMetricsProcessor(). -func (f CreateMetricsFunc) CreateMetricsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Metrics, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateLogsFunc is the equivalent of Factory.CreateLogs(). -type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) - -// CreateLogsProcessor implements Factory.CreateLogsProcessor(). -func (f CreateLogsFunc) CreateLogsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Logs, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateProfilesFunc is the equivalent of Factory.CreateProfiles(). -type CreateProfilesFunc func(context.Context, Settings, component.Config, consumerprofiles.Profiles) (Profiles, error) - -// CreateProfilesProcessor implements Factory.CreateProfilesProcessor(). -func (f CreateProfilesFunc) CreateProfilesProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumerprofiles.Profiles) (Profiles, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -type factory struct { - cfgType component.Type - component.CreateDefaultConfigFunc - CreateTracesFunc - tracesStabilityLevel component.StabilityLevel - CreateMetricsFunc - metricsStabilityLevel component.StabilityLevel - CreateLogsFunc - logsStabilityLevel component.StabilityLevel - CreateProfilesFunc - profilesStabilityLevel component.StabilityLevel -} - -func (f *factory) Type() component.Type { - return f.cfgType -} - -func (f *factory) unexportedFactoryFunc() {} - -func (f factory) TracesProcessorStability() component.StabilityLevel { - return f.tracesStabilityLevel -} - -func (f factory) MetricsProcessorStability() component.StabilityLevel { - return f.metricsStabilityLevel -} - -func (f factory) LogsProcessorStability() component.StabilityLevel { - return f.logsStabilityLevel -} - -func (f factory) ProfilesProcessorStability() component.StabilityLevel { - return f.profilesStabilityLevel -} - -// WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. -func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesStabilityLevel = sl - o.CreateTracesFunc = createTraces - }) -} - -// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. -func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsStabilityLevel = sl - o.CreateMetricsFunc = createMetrics - }) -} - -// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. -func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsStabilityLevel = sl - o.CreateLogsFunc = createLogs - }) -} - -// WithProfiles overrides the default "error not supported" implementation for CreateProfiles and the default "undefined" stability level. -func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.profilesStabilityLevel = sl - o.CreateProfilesFunc = createProfiles - }) -} - -// NewFactory returns a Factory. -func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - f := &factory{ - cfgType: cfgType, - CreateDefaultConfigFunc: createDefaultConfig, - } - for _, opt := range options { - opt.applyProcessorFactoryOption(f) - } - return f -} diff --git a/processor/internal/logs.go b/processor/internal/logs.go deleted file mode 100644 index defced909ad..00000000000 --- a/processor/internal/logs.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" -) - -// Logs is a processor that can consume logs. -type Logs interface { - component.Component - consumer.Logs -} diff --git a/processor/internal/metrics.go b/processor/internal/metrics.go deleted file mode 100644 index b1265e43b1a..00000000000 --- a/processor/internal/metrics.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" -) - -// Metrics is a processor that can consume metrics. -type Metrics interface { - component.Component - consumer.Metrics -} diff --git a/processor/internal/processor.go b/processor/internal/processor.go deleted file mode 100644 index 4e7a07a18b8..00000000000 --- a/processor/internal/processor.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import "go.opentelemetry.io/collector/component" - -// Settings is passed to Create* functions in Factory. -type Settings struct { - // ID returns the ID of the component that will be created. - ID component.ID - - component.TelemetrySettings - - // BuildInfo can be used by components for informational purposes - BuildInfo component.BuildInfo -} diff --git a/processor/internal/profiles.go b/processor/internal/profiles.go deleted file mode 100644 index e5e955e06e6..00000000000 --- a/processor/internal/profiles.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer/consumerprofiles" -) - -// Profiles is a processor that can consume profiles. -type Profiles interface { - component.Component - consumerprofiles.Profiles -} diff --git a/processor/internal/traces.go b/processor/internal/traces.go deleted file mode 100644 index a1fe0ced3cb..00000000000 --- a/processor/internal/traces.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/processor/internal" - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" -) - -// Traces is a processor that can consume traces. -type Traces interface { - component.Component - consumer.Traces -} diff --git a/processor/processor.go b/processor/processor.go index 72233b425b3..6ded6505e2c 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -4,60 +4,205 @@ package processor // import "go.opentelemetry.io/collector/processor" import ( + "context" "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/processor/internal" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pipeline" ) // Traces is a processor that can consume traces. -type Traces = internal.Traces +type Traces interface { + component.Component + consumer.Traces +} // Metrics is a processor that can consume metrics. -type Metrics = internal.Metrics +type Metrics interface { + component.Component + consumer.Metrics +} // Logs is a processor that can consume logs. -type Logs = internal.Logs +type Logs interface { + component.Component + consumer.Logs +} // Settings is passed to Create* functions in Factory. -type Settings = internal.Settings +type Settings struct { + // ID returns the ID of the component that will be created. + ID component.ID + + component.TelemetrySettings + + // BuildInfo can be used by components for informational purposes + BuildInfo component.BuildInfo +} // Factory is Factory interface for processors. // // This interface cannot be directly implemented. Implementations must -// use the NewProcessorFactory to implement it. -type Factory = internal.Factory +// use the NewFactory to implement it. +type Factory interface { + component.Factory + + // CreateTracesProcessor creates a TracesProcessor based on this config. + // If the processor type does not support traces, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) + + // TracesProcessorStability gets the stability level of the TracesProcessor. + TracesProcessorStability() component.StabilityLevel + + // CreateMetricsProcessor creates a MetricsProcessor based on this config. + // If the processor type does not support metrics, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) + + // MetricsProcessorStability gets the stability level of the MetricsProcessor. + MetricsProcessorStability() component.StabilityLevel + + // CreateLogsProcessor creates a LogsProcessor based on the config. + // If the processor type does not support logs, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) + + // LogsProcessorStability gets the stability level of the LogsProcessor. + LogsProcessorStability() component.StabilityLevel + + unexportedFactoryFunc() +} // FactoryOption apply changes to Options. -type FactoryOption = internal.FactoryOption +type FactoryOption interface { + // applyProcessorFactoryOption applies the option. + applyProcessorFactoryOption(o *factory) +} + +var _ FactoryOption = (*factoryOptionFunc)(nil) + +// factoryOptionFunc is a FactoryOption created through a function. +type factoryOptionFunc func(*factory) + +func (f factoryOptionFunc) applyProcessorFactoryOption(o *factory) { + f(o) +} + +type factory struct { + cfgType component.Type + component.CreateDefaultConfigFunc + CreateTracesFunc + tracesStabilityLevel component.StabilityLevel + CreateMetricsFunc + metricsStabilityLevel component.StabilityLevel + CreateLogsFunc + logsStabilityLevel component.StabilityLevel +} + +func (f *factory) Type() component.Type { + return f.cfgType +} + +func (f *factory) unexportedFactoryFunc() {} + +func (f factory) TracesProcessorStability() component.StabilityLevel { + return f.tracesStabilityLevel +} + +func (f factory) MetricsProcessorStability() component.StabilityLevel { + return f.metricsStabilityLevel +} + +func (f factory) LogsProcessorStability() component.StabilityLevel { + return f.logsStabilityLevel +} // CreateTracesFunc is the equivalent of Factory.CreateTraces(). -type CreateTracesFunc = internal.CreateTracesFunc +type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) + +// CreateTracesProcessor implements Factory.CreateTracesProcessor(). +func (f CreateTracesFunc) CreateTracesProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces) (Traces, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} // CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). -type CreateMetricsFunc = internal.CreateMetricsFunc +type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) + +// CreateMetricsProcessor implements Factory.CreateMetricsProcessor(). +func (f CreateMetricsFunc) CreateMetricsProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Metrics, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} // CreateLogsFunc is the equivalent of Factory.CreateLogs(). -type CreateLogsFunc = internal.CreateLogsFunc +type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) + +// CreateLogsProcessor implements Factory.CreateLogsProcessor(). +func (f CreateLogsFunc) CreateLogsProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Logs, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} // WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithTraces(createTraces, sl) + return factoryOptionFunc(func(o *factory) { + o.tracesStabilityLevel = sl + o.CreateTracesFunc = createTraces + }) } // WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithMetrics(createMetrics, sl) + return factoryOptionFunc(func(o *factory) { + o.metricsStabilityLevel = sl + o.CreateMetricsFunc = createMetrics + }) } // WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithLogs(createLogs, sl) + return factoryOptionFunc(func(o *factory) { + o.logsStabilityLevel = sl + o.CreateLogsFunc = createLogs + }) } // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - return internal.NewFactory(cfgType, createDefaultConfig, options...) + f := &factory{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.applyProcessorFactoryOption(f) + } + return f } // MakeFactoryMap takes a list of factories and returns a map with Factory type as keys. diff --git a/processor/processorprofiles/go.mod b/processor/processorprofiles/go.mod index 072e9361145..1db82b94993 100644 --- a/processor/processorprofiles/go.mod +++ b/processor/processorprofiles/go.mod @@ -7,6 +7,7 @@ require ( go.opentelemetry.io/collector/component v0.110.0 go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 go.opentelemetry.io/collector/consumer/consumertest v0.110.0 + go.opentelemetry.io/collector/pipeline v0.110.0 go.opentelemetry.io/collector/processor v0.110.0 ) @@ -22,7 +23,6 @@ require ( go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect go.opentelemetry.io/collector/pdata v1.16.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect - go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/otel v1.30.0 // indirect go.opentelemetry.io/otel/metric v1.30.0 // indirect go.opentelemetry.io/otel/trace v1.30.0 // indirect diff --git a/processor/processorprofiles/processor.go b/processor/processorprofiles/processor.go index f9ec83e00a4..85c61f941b1 100644 --- a/processor/processorprofiles/processor.go +++ b/processor/processorprofiles/processor.go @@ -4,18 +4,124 @@ package processorprofiles // import "go.opentelemetry.io/collector/processor/processorprofiles" import ( + "context" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/internal" ) +// Factory is a component.Factory interface for processors. +// +// This interface cannot be directly implemented. Implementations must +// use the NewFactory to implement it. +type Factory interface { + processor.Factory + + // CreateProfilesProcessor creates a ProfilesProcessor based on this config. + // If the processor type does not support tracing or if the config is not valid, + // an error will be returned instead. + CreateProfilesProcessor(ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumerprofiles.Profiles) (Profiles, error) + + // ProfilesProcessorStability gets the stability level of the ProfilesProcessor. + ProfilesProcessorStability() component.StabilityLevel +} + // Profiles is a processor that can consume profiles. -type Profiles = internal.Profiles +type Profiles interface { + component.Component + consumerprofiles.Profiles +} // CreateProfilesFunc is the equivalent of Factory.CreateProfiles(). -type CreateProfilesFunc = internal.CreateProfilesFunc +// CreateProfilesFunc is the equivalent of Factory.CreateProfiles(). +type CreateProfilesFunc func(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (Profiles, error) + +// CreateProfilesProcessor implements Factory.CreateProfilesProcessor(). +func (f CreateProfilesFunc) CreateProfilesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumerprofiles.Profiles) (Profiles, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +// FactoryOption apply changes to ReceiverOptions. +type FactoryOption interface { + // applyOption applies the option. + applyOption(o *factoryOpts) +} + +// factoryOptionFunc is an ReceiverFactoryOption created through a function. +type factoryOptionFunc func(*factoryOpts) + +func (f factoryOptionFunc) applyOption(o *factoryOpts) { + f(o) +} + +type factory struct { + processor.Factory + CreateProfilesFunc + profilesStabilityLevel component.StabilityLevel +} + +func (f factory) ProfilesProcessorStability() component.StabilityLevel { + return f.profilesStabilityLevel +} + +type factoryOpts struct { + cfgType component.Type + component.CreateDefaultConfigFunc + opts []processor.FactoryOption + CreateProfilesFunc + profilesStabilityLevel component.StabilityLevel +} + +// WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. +func WithTraces(createTraces processor.CreateTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, processor.WithTraces(createTraces, sl)) + }) +} + +// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. +func WithMetrics(createMetrics processor.CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, processor.WithMetrics(createMetrics, sl)) + }) +} + +// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. +func WithLogs(createLogs processor.CreateLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, processor.WithLogs(createLogs, sl)) + }) +} // WithProfiles overrides the default "error not supported" implementation for CreateProfiles and the default "undefined" stability level. -func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel) processor.FactoryOption { - return internal.WithProfiles(createProfiles, sl) +func WithProfiles(createProfiles CreateProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesStabilityLevel = sl + o.CreateProfilesFunc = createProfiles + }) +} + +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + opts := factoryOpts{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.applyOption(&opts) + } + return &factory{ + Factory: processor.NewFactory(opts.cfgType, opts.CreateDefaultConfig, opts.opts...), + CreateProfilesFunc: opts.CreateProfilesFunc, + profilesStabilityLevel: opts.profilesStabilityLevel, + } } diff --git a/processor/processorprofiles/processor_test.go b/processor/processorprofiles/processor_test.go index 05b1c592ce9..672e5f2c9fc 100644 --- a/processor/processorprofiles/processor_test.go +++ b/processor/processorprofiles/processor_test.go @@ -18,7 +18,7 @@ import ( func TestNewFactoryWithProfiles(t *testing.T) { var testType = component.MustNewType("test") defaultCfg := struct{}{} - factory := processor.NewFactory( + factory := NewFactory( testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelAlpha), diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index f1dd64bfd52..4d2e1fd4c50 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -30,12 +30,12 @@ func NewNopSettings() processor.Settings { // NewNopFactory returns a component.ProcessorFactory that constructs nop processors. func NewNopFactory() processor.Factory { - return processor.NewFactory( + return processorprofiles.NewFactory( nopType, func() component.Config { return &nopConfig{} }, - processor.WithTraces(createTracesProcessor, component.StabilityLevelStable), - processor.WithMetrics(createMetricsProcessor, component.StabilityLevelStable), - processor.WithLogs(createLogsProcessor, component.StabilityLevelStable), + processorprofiles.WithTraces(createTracesProcessor, component.StabilityLevelStable), + processorprofiles.WithMetrics(createMetricsProcessor, component.StabilityLevelStable), + processorprofiles.WithLogs(createLogsProcessor, component.StabilityLevelStable), processorprofiles.WithProfiles(createProfilesProcessor, component.StabilityLevelAlpha), ) } diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index fcf902e552d..35b17c94d14 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/processor/processorprofiles" ) func TestNewNopFactory(t *testing.T) { @@ -48,7 +49,7 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) - profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profiles, err := factory.(processorprofiles.Factory).CreateProfilesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, profiles.Capabilities()) assert.NoError(t, profiles.Start(context.Background(), componenttest.NewNopHost())) diff --git a/service/internal/builders/processor.go b/service/internal/builders/processor.go index 57dbff9e5ba..052d95063dc 100644 --- a/service/internal/builders/processor.go +++ b/service/internal/builders/processor.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" @@ -94,11 +95,15 @@ func (b *ProcessorBuilder) CreateProfiles(ctx context.Context, set processor.Set return nil, fmt.Errorf("processor %q is not configured", set.ID) } - f, existsFactory := b.factories[set.ID.Type()] + procFact, existsFactory := b.factories[set.ID.Type()] if !existsFactory { return nil, fmt.Errorf("processor factory not available for: %q", set.ID) } + f, ok := procFact.(processorprofiles.Factory) + if !ok { + return nil, pipeline.ErrSignalNotSupported + } logStabilityLevel(set.Logger, f.ProfilesProcessorStability()) return f.CreateProfilesProcessor(ctx, set, cfg, next) } diff --git a/service/internal/builders/processor_test.go b/service/internal/builders/processor_test.go index 09443387817..425f3c89152 100644 --- a/service/internal/builders/processor_test.go +++ b/service/internal/builders/processor_test.go @@ -24,12 +24,12 @@ func TestProcessorBuilder(t *testing.T) { defaultCfg := struct{}{} factories, err := processor.MakeFactoryMap([]processor.Factory{ processor.NewFactory(component.MustNewType("err"), nil), - processor.NewFactory( + processorprofiles.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - processor.WithTraces(createProcessorTraces, component.StabilityLevelDevelopment), - processor.WithMetrics(createProcessorMetrics, component.StabilityLevelAlpha), - processor.WithLogs(createProcessorLogs, component.StabilityLevelDeprecated), + processorprofiles.WithTraces(createProcessorTraces, component.StabilityLevelDevelopment), + processorprofiles.WithMetrics(createProcessorMetrics, component.StabilityLevelAlpha), + processorprofiles.WithLogs(createProcessorLogs, component.StabilityLevelDeprecated), processorprofiles.WithProfiles(createProcessorProfiles, component.StabilityLevelDevelopment), ), }...) @@ -136,12 +136,12 @@ func TestProcessorBuilder(t *testing.T) { func TestProcessorBuilderMissingConfig(t *testing.T) { defaultCfg := struct{}{} factories, err := processor.MakeFactoryMap([]processor.Factory{ - processor.NewFactory( + processorprofiles.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - processor.WithTraces(createProcessorTraces, component.StabilityLevelDevelopment), - processor.WithMetrics(createProcessorMetrics, component.StabilityLevelAlpha), - processor.WithLogs(createProcessorLogs, component.StabilityLevelDeprecated), + processorprofiles.WithTraces(createProcessorTraces, component.StabilityLevelDevelopment), + processorprofiles.WithMetrics(createProcessorMetrics, component.StabilityLevelAlpha), + processorprofiles.WithLogs(createProcessorLogs, component.StabilityLevelDeprecated), processorprofiles.WithProfiles(createProcessorProfiles, component.StabilityLevelDevelopment), ), }...) @@ -207,7 +207,7 @@ func TestNewNopProcessorBuilder(t *testing.T) { require.NoError(t, err) assert.IsType(t, logs, bLogs) - profiles, err := factory.CreateProfilesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + profiles, err := factory.(processorprofiles.Factory).CreateProfilesProcessor(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfiles, err := builder.CreateProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 03849252ffb..a430cb7882e 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -3056,15 +3056,15 @@ func newErrReceiverFactory() receiver.Factory { } func newErrProcessorFactory() processor.Factory { - return processor.NewFactory(component.MustNewType("err"), + return processorprofiles.NewFactory(component.MustNewType("err"), func() component.Config { return &struct{}{} }, - processor.WithTraces(func(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { + processorprofiles.WithTraces(func(context.Context, processor.Settings, component.Config, consumer.Traces) (processor.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), - processor.WithLogs(func(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { + processorprofiles.WithLogs(func(context.Context, processor.Settings, component.Config, consumer.Logs) (processor.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), - processor.WithMetrics(func(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { + processorprofiles.WithMetrics(func(context.Context, processor.Settings, component.Config, consumer.Metrics) (processor.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), processorprofiles.WithProfiles(func(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (processorprofiles.Profiles, error) { diff --git a/service/internal/testcomponents/example_processor.go b/service/internal/testcomponents/example_processor.go index 2634d6474b6..950c9ed8078 100644 --- a/service/internal/testcomponents/example_processor.go +++ b/service/internal/testcomponents/example_processor.go @@ -16,12 +16,12 @@ import ( var procType = component.MustNewType("exampleprocessor") // ExampleProcessorFactory is factory for ExampleProcessor. -var ExampleProcessorFactory = processor.NewFactory( +var ExampleProcessorFactory = processorprofiles.NewFactory( procType, createDefaultConfig, - processor.WithTraces(createTracesProcessor, component.StabilityLevelDevelopment), - processor.WithMetrics(createMetricsProcessor, component.StabilityLevelDevelopment), - processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment), + processorprofiles.WithTraces(createTracesProcessor, component.StabilityLevelDevelopment), + processorprofiles.WithMetrics(createMetricsProcessor, component.StabilityLevelDevelopment), + processorprofiles.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment), processorprofiles.WithProfiles(createProfilesProcessor, component.StabilityLevelDevelopment), )