From 708d424b6207e25199f783278dbd7f44cfdb673d Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 21 May 2024 17:36:18 +0200 Subject: [PATCH] [service/telemetry] Switch to a factory pattern (#10001) #### Description Switches `service/telemetry` to a factory pattern. To avoid adding a lot of public API in one go: 1. the actual factory builder is in an internal package 2. I have not added the `CreateMeterProvider` method yet There are two goals with this: one is to make progress on #4970, the other is to allow initializing telemetry sooner: #### Link to tracking issue Updates #4970. #### Testing Updates existing tests to use `NewFactory` --- .chloggen/mx-psi_tel-factory.yaml | 25 +++++ otelcol/unmarshaler.go | 32 +------ service/service.go | 23 +++-- service/telemetry/factory.go | 59 ++++++++++++ service/telemetry/internal/factory.go | 116 ++++++++++++++++++++++ service/telemetry/logger.go | 53 +++++++++++ service/telemetry/telemetry.go | 132 +++----------------------- service/telemetry/telemetry_test.go | 16 ++-- service/telemetry/tracer.go | 85 +++++++++++++++++ 9 files changed, 382 insertions(+), 159 deletions(-) create mode 100644 .chloggen/mx-psi_tel-factory.yaml create mode 100644 service/telemetry/factory.go create mode 100644 service/telemetry/internal/factory.go create mode 100644 service/telemetry/logger.go create mode 100644 service/telemetry/tracer.go diff --git a/.chloggen/mx-psi_tel-factory.yaml b/.chloggen/mx-psi_tel-factory.yaml new file mode 100644 index 00000000000..2a07ec8b46c --- /dev/null +++ b/.chloggen/mx-psi_tel-factory.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service/telemetry + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate telemetry.New in favor of telemetry.NewFactory + +# One or more tracking issues or pull requests related to the change +issues: [4970] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/otelcol/unmarshaler.go b/otelcol/unmarshaler.go index a522d9fa60d..4967b0a731d 100644 --- a/otelcol/unmarshaler.go +++ b/otelcol/unmarshaler.go @@ -4,11 +4,6 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( - "time" - - "go.uber.org/zap/zapcore" - - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/exporter" @@ -32,6 +27,10 @@ type configSettings struct { // unmarshal the configSettings from a confmap.Conf. // After the config is unmarshalled, `Validate()` must be called to validate. func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) { + + telFactory := telemetry.NewFactory() + defaultTelConfig := *telFactory.CreateDefaultConfig().(*telemetry.Config) + // Unmarshal top level sections and validate. cfg := &configSettings{ Receivers: configunmarshaler.NewConfigs(factories.Receivers), @@ -41,28 +40,7 @@ func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) { Extensions: configunmarshaler.NewConfigs(factories.Extensions), // TODO: Add a component.ServiceFactory to allow this to be defined by the Service. Service: service.Config{ - Telemetry: telemetry.Config{ - Logs: telemetry.LogsConfig{ - Level: zapcore.InfoLevel, - Development: false, - Encoding: "console", - Sampling: &telemetry.LogsSamplingConfig{ - Enabled: true, - Tick: 10 * time.Second, - Initial: 10, - Thereafter: 100, - }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, - DisableCaller: false, - DisableStacktrace: false, - InitialFields: map[string]any(nil), - }, - Metrics: telemetry.MetricsConfig{ - Level: configtelemetry.LevelNormal, - Address: ":8888", - }, - }, + Telemetry: defaultTelConfig, }, } diff --git a/service/service.go b/service/service.go index 902454c78f1..53ea3eebe65 100644 --- a/service/service.go +++ b/service/service.go @@ -89,16 +89,27 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { }, collectorConf: set.CollectorConf, } - tel, err := telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry) - if err != nil { - return nil, fmt.Errorf("failed to get logger: %w", err) - } // Fetch data for internal telemetry like instance id and sdk version to provide for internal telemetry. res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) - logger := tel.Logger() + telFactory := telemetry.NewFactory() + telset := telemetry.Settings{ + BuildInfo: set.BuildInfo, + ZapOptions: set.LoggingOptions, + } + + logger, err := telFactory.CreateLogger(ctx, telset, &cfg.Telemetry) + if err != nil { + return nil, fmt.Errorf("failed to create logger: %w", err) + } + + tracerProvider, err := telFactory.CreateTracerProvider(ctx, telset, &cfg.Telemetry) + if err != nil { + return nil, fmt.Errorf("failed to create tracer provider: %w", err) + } + logger.Info("Setting up own telemetry...") mp, err := newMeterProvider( meterProviderSettings{ @@ -116,7 +127,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { srv.telemetrySettings = servicetelemetry.TelemetrySettings{ Logger: logger, MeterProvider: mp, - TracerProvider: tel.TracerProvider(), + TracerProvider: tracerProvider, MetricsLevel: cfg.Telemetry.Metrics.Level, // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, diff --git a/service/telemetry/factory.go b/service/telemetry/factory.go new file mode 100644 index 00000000000..c236d3c5733 --- /dev/null +++ b/service/telemetry/factory.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/service/telemetry/internal" +) + +func createDefaultConfig() component.Config { + return &Config{ + Logs: LogsConfig{ + Level: zapcore.InfoLevel, + Development: false, + Encoding: "console", + Sampling: &LogsSamplingConfig{ + Enabled: true, + Tick: 10 * time.Second, + Initial: 10, + Thereafter: 100, + }, + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + DisableCaller: false, + DisableStacktrace: false, + InitialFields: map[string]any(nil), + }, + Metrics: MetricsConfig{ + Level: configtelemetry.LevelNormal, + Address: ":8888", + }, + } +} + +// Factory is a telemetry factory. +type Factory = internal.Factory + +// NewFactory creates a new Factory. +func NewFactory() Factory { + return internal.NewFactory(createDefaultConfig, + internal.WithLogger(func(_ context.Context, set Settings, cfg component.Config) (*zap.Logger, error) { + c := *cfg.(*Config) + return newLogger(c.Logs, set.ZapOptions) + }), + internal.WithTracerProvider(func(ctx context.Context, _ Settings, cfg component.Config) (trace.TracerProvider, error) { + c := *cfg.(*Config) + return newTracerProvider(ctx, c) + }), + ) +} diff --git a/service/telemetry/internal/factory.go b/service/telemetry/internal/factory.go new file mode 100644 index 00000000000..f1368c9704d --- /dev/null +++ b/service/telemetry/internal/factory.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/service/telemetry/internal" + +import ( + "context" + + "go.opentelemetry.io/otel/trace" + tracenoop "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" +) + +// CreateSettings holds configuration for building Telemetry. +type CreateSettings struct { + BuildInfo component.BuildInfo + AsyncErrorChannel chan error + ZapOptions []zap.Option +} + +// Factory is factory interface for telemetry. +// This interface cannot be directly implemented. Implementations must +// use the NewFactory to implement it. +type Factory interface { + // CreateDefaultConfig creates the default configuration for the telemetry. + // TODO: Should we just inherit from component.Factory? + CreateDefaultConfig() component.Config + + // CreateLogger creates a logger. + CreateLogger(ctx context.Context, set CreateSettings, cfg component.Config) (*zap.Logger, error) + + // CreateTracerProvider creates a TracerProvider. + CreateTracerProvider(ctx context.Context, set CreateSettings, cfg component.Config) (trace.TracerProvider, error) + + // TODO: Add CreateMeterProvider. + + // unexportedFactoryFunc is used to prevent external implementations of Factory. + unexportedFactoryFunc() +} + +// FactoryOption apply changes to Factory. +type FactoryOption interface { + // applyTelemetryFactoryOption applies the option. + applyTelemetryFactoryOption(o *factory) +} + +var _ FactoryOption = (*factoryOptionFunc)(nil) + +// factoryOptionFunc is an FactoryOption created through a function. +type factoryOptionFunc func(*factory) + +func (f factoryOptionFunc) applyTelemetryFactoryOption(o *factory) { + f(o) +} + +var _ Factory = (*factory)(nil) + +// factory is the implementation of Factory. +type factory struct { + createDefaultConfig component.CreateDefaultConfigFunc + CreateLoggerFunc + CreateTracerProviderFunc +} + +func (f *factory) CreateDefaultConfig() component.Config { + return f.createDefaultConfig() +} + +// CreateLoggerFunc is the equivalent of Factory.CreateLogger. +type CreateLoggerFunc func(context.Context, CreateSettings, component.Config) (*zap.Logger, error) + +// WithLogger overrides the default no-op logger. +func WithLogger(createLogger CreateLoggerFunc) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.CreateLoggerFunc = createLogger + }) +} + +func (f *factory) CreateLogger(ctx context.Context, set CreateSettings, cfg component.Config) (*zap.Logger, error) { + if f.CreateLoggerFunc == nil { + return zap.NewNop(), nil + } + return f.CreateLoggerFunc(ctx, set, cfg) +} + +// CreateTracerProviderFunc is the equivalent of Factory.CreateTracerProvider. +type CreateTracerProviderFunc func(context.Context, CreateSettings, component.Config) (trace.TracerProvider, error) + +// WithTracerProvider overrides the default no-op tracer provider. +func WithTracerProvider(createTracerProvider CreateTracerProviderFunc) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.CreateTracerProviderFunc = createTracerProvider + }) +} + +func (f *factory) CreateTracerProvider(ctx context.Context, set CreateSettings, cfg component.Config) (trace.TracerProvider, error) { + if f.CreateTracerProviderFunc == nil { + return tracenoop.NewTracerProvider(), nil + } + return f.CreateTracerProviderFunc(ctx, set, cfg) +} + +func (f *factory) unexportedFactoryFunc() {} + +// NewFactory returns a new Factory. +func NewFactory(createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + f := &factory{ + createDefaultConfig: createDefaultConfig, + } + for _, op := range options { + op.applyTelemetryFactoryOption(f) + } + return f +} diff --git a/service/telemetry/logger.go b/service/telemetry/logger.go new file mode 100644 index 00000000000..eb675bc459f --- /dev/null +++ b/service/telemetry/logger.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { + // Copied from NewProductionConfig. + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(cfg.Level), + Development: cfg.Development, + Encoding: cfg.Encoding, + EncoderConfig: zap.NewProductionEncoderConfig(), + OutputPaths: cfg.OutputPaths, + ErrorOutputPaths: cfg.ErrorOutputPaths, + DisableCaller: cfg.DisableCaller, + DisableStacktrace: cfg.DisableStacktrace, + InitialFields: cfg.InitialFields, + } + + if zapCfg.Encoding == "console" { + // Human-readable timestamps for console format of logs. + zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + } + + logger, err := zapCfg.Build(options...) + if err != nil { + return nil, err + } + if cfg.Sampling != nil && cfg.Sampling.Enabled { + logger = newSampledLogger(logger, cfg.Sampling) + } + + return logger, nil +} + +func newSampledLogger(logger *zap.Logger, sc *LogsSamplingConfig) *zap.Logger { + // Create a logger that samples every Nth message after the first M messages every S seconds + // where N = sc.Thereafter, M = sc.Initial, S = sc.Tick. + opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions( + core, + sc.Tick, + sc.Initial, + sc.Thereafter, + ) + }) + return logger.WithOptions(opts) +} diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go index 008a50c07ea..53e36f3c76c 100644 --- a/service/telemetry/telemetry.go +++ b/service/telemetry/telemetry.go @@ -5,31 +5,18 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "context" - "errors" + "fmt" - "go.opentelemetry.io/contrib/config" - "go.opentelemetry.io/contrib/propagators/b3" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.opentelemetry.io/collector/component" -) - -const ( - // supported trace propagators - traceContextPropagator = "tracecontext" - b3Propagator = "b3" -) - -var ( - errUnsupportedPropagator = errors.New("unsupported trace propagator") + "go.opentelemetry.io/collector/service/telemetry/internal" ) +// Telemetry is the service telemetry. +// Deprecated: [v0.99.0] Use Factory. type Telemetry struct { logger *zap.Logger tracerProvider trace.TracerProvider @@ -55,119 +42,24 @@ func (t *Telemetry) Shutdown(ctx context.Context) error { } // Settings holds configuration for building Telemetry. -type Settings struct { - BuildInfo component.BuildInfo - ZapOptions []zap.Option -} +type Settings = internal.CreateSettings // New creates a new Telemetry from Config. +// Deprecated: [v0.99.0] Use NewFactory. func New(ctx context.Context, set Settings, cfg Config) (*Telemetry, error) { - logger, err := newLogger(cfg.Logs, set.ZapOptions) + f := NewFactory() + logger, err := f.CreateLogger(ctx, set, &cfg) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to created logger: %w", err) } - sdk, err := config.NewSDK( - config.WithContext(ctx), - config.WithOpenTelemetryConfiguration( - config.OpenTelemetryConfiguration{ - TracerProvider: &config.TracerProvider{ - Processors: cfg.Traces.Processors, - // TODO: once https://github.com/open-telemetry/opentelemetry-configuration/issues/83 is resolved, - // configuration for sampler should be done here via something like the following: - // - // Sampler: &config.Sampler{ - // ParentBased: &config.SamplerParentBased{ - // LocalParentSampled: &config.Sampler{ - // AlwaysOn: config.SamplerAlwaysOn{}, - // }, - // LocalParentNotSampled: &config.Sampler{ - // RecordOnly: config.SamplerRecordOnly{}, - // }, - // RemoteParentSampled: &config.Sampler{ - // AlwaysOn: config.SamplerAlwaysOn{}, - // }, - // RemoteParentNotSampled: &config.Sampler{ - // RecordOnly: config.SamplerRecordOnly{}, - // }, - // }, - // }, - }, - }, - ), - ) - + tracerProvider, err := f.CreateTracerProvider(ctx, set, &cfg) if err != nil { - return nil, err - } - - if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { - otel.SetTextMapPropagator(tp) - } else { - return nil, err + return nil, fmt.Errorf("failed to create tracer provider: %w", err) } return &Telemetry{ logger: logger, - tracerProvider: sdk.TracerProvider(), + tracerProvider: tracerProvider, }, nil } - -func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { - var textMapPropagators []propagation.TextMapPropagator - for _, prop := range props { - switch prop { - case traceContextPropagator: - textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) - case b3Propagator: - textMapPropagators = append(textMapPropagators, b3.New()) - default: - return nil, errUnsupportedPropagator - } - } - return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil -} - -func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { - // Copied from NewProductionConfig. - zapCfg := &zap.Config{ - Level: zap.NewAtomicLevelAt(cfg.Level), - Development: cfg.Development, - Encoding: cfg.Encoding, - EncoderConfig: zap.NewProductionEncoderConfig(), - OutputPaths: cfg.OutputPaths, - ErrorOutputPaths: cfg.ErrorOutputPaths, - DisableCaller: cfg.DisableCaller, - DisableStacktrace: cfg.DisableStacktrace, - InitialFields: cfg.InitialFields, - } - - if zapCfg.Encoding == "console" { - // Human-readable timestamps for console format of logs. - zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - } - - logger, err := zapCfg.Build(options...) - if err != nil { - return nil, err - } - if cfg.Sampling != nil && cfg.Sampling.Enabled { - logger = newSampledLogger(logger, cfg.Sampling) - } - - return logger, nil -} - -func newSampledLogger(logger *zap.Logger, sc *LogsSamplingConfig) *zap.Logger { - // Create a logger that samples every Nth message after the first M messages every S seconds - // where N = sc.Thereafter, M = sc.Initial, S = sc.Tick. - opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions( - core, - sc.Tick, - sc.Initial, - sc.Thereafter, - ) - }) - return logger.WithOptions(opts) -} diff --git a/service/telemetry/telemetry_test.go b/service/telemetry/telemetry_test.go index c72d8c75e55..3a46c3cd8bc 100644 --- a/service/telemetry/telemetry_test.go +++ b/service/telemetry/telemetry_test.go @@ -55,13 +55,15 @@ func TestTelemetryConfiguration(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg) + f := NewFactory() + set := Settings{ZapOptions: []zap.Option{}} + logger, err := f.CreateLogger(context.Background(), set, tt.cfg) if tt.success { assert.NoError(t, err) - assert.NotNil(t, telemetry) + assert.NotNil(t, logger) } else { assert.Error(t, err) - assert.Nil(t, telemetry) + assert.Nil(t, logger) } }) } @@ -111,10 +113,12 @@ func TestSampledLogger(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg) + f := NewFactory() + ctx := context.Background() + set := Settings{ZapOptions: []zap.Option{}} + logger, err := f.CreateLogger(ctx, set, tt.cfg) assert.NoError(t, err) - assert.NotNil(t, telemetry) - assert.NotNil(t, telemetry.Logger()) + assert.NotNil(t, logger) }) } } diff --git a/service/telemetry/tracer.go b/service/telemetry/tracer.go new file mode 100644 index 00000000000..d235ca3aaf2 --- /dev/null +++ b/service/telemetry/tracer.go @@ -0,0 +1,85 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "context" + "errors" + + "go.opentelemetry.io/contrib/config" + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +const ( + // supported trace propagators + traceContextPropagator = "tracecontext" + b3Propagator = "b3" +) + +var ( + errUnsupportedPropagator = errors.New("unsupported trace propagator") +) + +// New creates a new Telemetry from Config. +func newTracerProvider(ctx context.Context, cfg Config) (trace.TracerProvider, error) { + sdk, err := config.NewSDK( + config.WithContext(ctx), + config.WithOpenTelemetryConfiguration( + config.OpenTelemetryConfiguration{ + TracerProvider: &config.TracerProvider{ + Processors: cfg.Traces.Processors, + // TODO: once https://github.com/open-telemetry/opentelemetry-configuration/issues/83 is resolved, + // configuration for sampler should be done here via something like the following: + // + // Sampler: &config.Sampler{ + // ParentBased: &config.SamplerParentBased{ + // LocalParentSampled: &config.Sampler{ + // AlwaysOn: config.SamplerAlwaysOn{}, + // }, + // LocalParentNotSampled: &config.Sampler{ + // RecordOnly: config.SamplerRecordOnly{}, + // }, + // RemoteParentSampled: &config.Sampler{ + // AlwaysOn: config.SamplerAlwaysOn{}, + // }, + // RemoteParentNotSampled: &config.Sampler{ + // RecordOnly: config.SamplerRecordOnly{}, + // }, + // }, + // }, + }, + }, + ), + ) + + if err != nil { + return nil, err + } + + if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { + otel.SetTextMapPropagator(tp) + } else { + return nil, err + } + + return sdk.TracerProvider(), nil +} + +func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { + var textMapPropagators []propagation.TextMapPropagator + for _, prop := range props { + switch prop { + case traceContextPropagator: + textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) + case b3Propagator: + textMapPropagators = append(textMapPropagators, b3.New()) + default: + return nil, errUnsupportedPropagator + } + } + return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil +}