Skip to content

Commit

Permalink
bring back processor.Builder and deprecate it
Browse files Browse the repository at this point in the history
  • Loading branch information
dmathieu committed Aug 2, 2024
1 parent 273bc32 commit 284fab3
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 7 deletions.
4 changes: 2 additions & 2 deletions .chloggen/private-processor-builder.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move processor.Builder into internal within the service module
note: Deprecate processor.Builder, and move it into an internal package of the service module

# One or more tracking issues or pull requests related to the change
issues: [10782]
Expand Down
100 changes: 100 additions & 0 deletions processor/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package processor // import "go.opentelemetry.io/collector/processor"

import (
"context"
"errors"
"fmt"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
)

var errNilNextConsumer = errors.New("nil next Consumer")

// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors.
type Builder struct {
cfgs map[component.ID]component.Config
factories map[component.Type]Factory
}

// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories.
func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder {
return &Builder{cfgs: cfgs, factories: factories}
}

// CreateTraces creates a Traces processor based on the settings and config.
func (b *Builder) CreateTraces(ctx context.Context, set Settings, next consumer.Traces) (Traces, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.TracesProcessorStability())
return f.CreateTracesProcessor(ctx, set, cfg, next)
}

// CreateMetrics creates a Metrics processor based on the settings and config.
func (b *Builder) CreateMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Metrics, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.MetricsProcessorStability())
return f.CreateMetricsProcessor(ctx, set, cfg, next)
}

// CreateLogs creates a Logs processor based on the settings and config.
func (b *Builder) CreateLogs(ctx context.Context, set Settings, next consumer.Logs) (Logs, error) {
if next == nil {
return nil, errNilNextConsumer
}
cfg, existsCfg := b.cfgs[set.ID]
if !existsCfg {
return nil, fmt.Errorf("processor %q is not configured", set.ID)
}

f, existsFactory := b.factories[set.ID.Type()]
if !existsFactory {
return nil, fmt.Errorf("processor factory not available for: %q", set.ID)
}

logStabilityLevel(set.Logger, f.LogsProcessorStability())
return f.CreateLogsProcessor(ctx, set, cfg, next)
}

func (b *Builder) Factory(componentType component.Type) component.Factory {
return b.factories[componentType]
}

// logStabilityLevel logs the stability level of a component. The log level is set to info for
// undefined, unmaintained, deprecated and development. The log level is set to debug
// for alpha, beta and stable.
func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
if sl >= component.StabilityLevelAlpha {
logger.Debug(sl.LogMessage())
} else {
logger.Info(sl.LogMessage())
}
}
2 changes: 1 addition & 1 deletion processor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)

require (
Expand All @@ -40,7 +41,6 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
148 changes: 148 additions & 0 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
)
Expand Down Expand Up @@ -93,6 +95,144 @@ func TestMakeFactoryMap(t *testing.T) {
}
}

func TestBuilder(t *testing.T) {
defaultCfg := struct{}{}
factories, err := MakeFactoryMap([]Factory{
NewFactory(component.MustNewType("err"), nil),
NewFactory(
component.MustNewType("all"),
func() component.Config { return &defaultCfg },
WithTraces(createTraces, component.StabilityLevelDevelopment),
WithMetrics(createMetrics, component.StabilityLevelAlpha),
WithLogs(createLogs, component.StabilityLevelDeprecated),
),
}...)
require.NoError(t, err)

testCases := []struct {
name string
id component.ID
err string
nextTraces consumer.Traces
nextLogs consumer.Logs
nextMetrics consumer.Metrics
}{
{
name: "unknown",
id: component.MustNewID("unknown"),
err: "processor factory not available for: \"unknown\"",
nextTraces: consumertest.NewNop(),
nextLogs: consumertest.NewNop(),
nextMetrics: consumertest.NewNop(),
},
{
name: "err",
id: component.MustNewID("err"),
err: "telemetry type is not supported",
nextTraces: consumertest.NewNop(),
nextLogs: consumertest.NewNop(),
nextMetrics: consumertest.NewNop(),
},
{
name: "all",
id: component.MustNewID("all"),
nextTraces: consumertest.NewNop(),
nextLogs: consumertest.NewNop(),
nextMetrics: consumertest.NewNop(),
},
{
name: "all/named",
id: component.MustNewIDWithName("all", "named"),
nextTraces: consumertest.NewNop(),
nextLogs: consumertest.NewNop(),
nextMetrics: consumertest.NewNop(),
},
{
name: "no next consumer",
id: component.MustNewID("unknown"),
err: "nil next Consumer",
nextTraces: nil,
nextLogs: nil,
nextMetrics: nil,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
cfgs := map[component.ID]component.Config{tt.id: defaultCfg}
b := NewBuilder(cfgs, factories)

te, err := b.CreateTraces(context.Background(), createSettings(tt.id), tt.nextTraces)
if tt.err != "" {
assert.EqualError(t, err, tt.err)
assert.Nil(t, te)
} else {
assert.NoError(t, err)
assert.Equal(t, nopInstance, te)
}

me, err := b.CreateMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics)
if tt.err != "" {
assert.EqualError(t, err, tt.err)
assert.Nil(t, me)
} else {
assert.NoError(t, err)
assert.Equal(t, nopInstance, me)
}

le, err := b.CreateLogs(context.Background(), createSettings(tt.id), tt.nextLogs)
if tt.err != "" {
assert.EqualError(t, err, tt.err)
assert.Nil(t, le)
} else {
assert.NoError(t, err)
assert.Equal(t, nopInstance, le)
}
})
}
}

func TestBuilderMissingConfig(t *testing.T) {
defaultCfg := struct{}{}
factories, err := MakeFactoryMap([]Factory{
NewFactory(
component.MustNewType("all"),
func() component.Config { return &defaultCfg },
WithTraces(createTraces, component.StabilityLevelDevelopment),
WithMetrics(createMetrics, component.StabilityLevelAlpha),
WithLogs(createLogs, component.StabilityLevelDeprecated),
),
}...)

require.NoError(t, err)

bErr := NewBuilder(map[component.ID]component.Config{}, factories)
missingID := component.MustNewIDWithName("all", "missing")

te, err := bErr.CreateTraces(context.Background(), createSettings(missingID), consumertest.NewNop())
assert.EqualError(t, err, "processor \"all/missing\" is not configured")
assert.Nil(t, te)

me, err := bErr.CreateMetrics(context.Background(), createSettings(missingID), consumertest.NewNop())
assert.EqualError(t, err, "processor \"all/missing\" is not configured")
assert.Nil(t, me)

le, err := bErr.CreateLogs(context.Background(), createSettings(missingID), consumertest.NewNop())
assert.EqualError(t, err, "processor \"all/missing\" is not configured")
assert.Nil(t, le)
}

func TestBuilderFactory(t *testing.T) {
factories, err := MakeFactoryMap([]Factory{NewFactory(component.MustNewType("foo"), nil)}...)
require.NoError(t, err)

cfgs := map[component.ID]component.Config{component.MustNewID("foo"): struct{}{}}
b := NewBuilder(cfgs, factories)

assert.NotNil(t, b.Factory(component.MustNewID("foo").Type()))
assert.Nil(t, b.Factory(component.MustNewID("bar").Type()))
}

var nopInstance = &nopProcessor{
Consumer: consumertest.NewNop(),
}
Expand All @@ -115,3 +255,11 @@ func createMetrics(context.Context, Settings, component.Config, consumer.Metrics
func createLogs(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) {
return nopInstance, nil
}

func createSettings(id component.ID) Settings {
return Settings{
ID: id,
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}
}
22 changes: 22 additions & 0 deletions processor/processorprofiles/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ component.Host = (*serviceHost)(nil)
type serviceHost struct {
asyncErrorChannel chan error
receivers *receiver.Builder
processors *builders.ProcessorBuilder
processors builders.Processor
exporters *exporter.Builder
connectors *connector.Builder
extensions *extension.Builder
Expand Down
7 changes: 7 additions & 0 deletions service/internal/builders/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ var (
nopType = component.MustNewType("nop")
)

type Processor interface {
CreateTraces(context.Context, processor.Settings, consumer.Traces) (processor.Traces, error)
CreateMetrics(context.Context, processor.Settings, consumer.Metrics) (processor.Metrics, error)
CreateLogs(context.Context, processor.Settings, consumer.Logs) (processor.Logs, error)
Factory(component.Type) component.Factory
}

// ProcessorBuilder processor is a helper struct that given a set of Configs
// and Factories helps with creating processors.
type ProcessorBuilder struct {
Expand Down
2 changes: 1 addition & 1 deletion service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Settings struct {
BuildInfo component.BuildInfo

ReceiverBuilder *receiver.Builder
ProcessorBuilder *builders.ProcessorBuilder
ProcessorBuilder builders.Processor
ExporterBuilder *exporter.Builder
ConnectorBuilder *connector.Builder

Expand Down
2 changes: 1 addition & 1 deletion service/internal/graph/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (n *processorNode) getConsumer() baseConsumer {
func (n *processorNode) buildComponent(ctx context.Context,
tel component.TelemetrySettings,
info component.BuildInfo,
builder *builders.ProcessorBuilder,
builder builders.Processor,
next baseConsumer,
) error {
tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID)
Expand Down
Loading

0 comments on commit 284fab3

Please sign in to comment.