From 7c5ecef11dff4ce5501c9683b277a25a61ea0f1a Mon Sep 17 00:00:00 2001 From: Antonio Jimenez <123171955+antonjim-te@users.noreply.github.com> Date: Fri, 29 Sep 2023 18:36:09 +0200 Subject: [PATCH] [telemetry] Enable sampling logging by default and apply it to all components (#8134) The sampled logger configuration can be disabled easily by setting the `service::telemetry::logs::sampling::enabled` to `false`. --- .chloggen/SampledLoggerTelemetry.yaml | 26 ++++++ exporter/exporterhelper/common.go | 37 ++------ otelcol/unmarshaler.go | 6 +- otelcol/unmarshaler_test.go | 5 +- service/service_test.go | 13 +++ service/telemetry/config.go | 19 ++++- service/telemetry/telemetry.go | 24 ++++-- service/telemetry/telemetry_test.go | 117 ++++++++++++++++++++++++++ 8 files changed, 203 insertions(+), 44 deletions(-) create mode 100644 .chloggen/SampledLoggerTelemetry.yaml create mode 100644 service/telemetry/telemetry_test.go diff --git a/.chloggen/SampledLoggerTelemetry.yaml b/.chloggen/SampledLoggerTelemetry.yaml new file mode 100644 index 00000000000..68678e936d1 --- /dev/null +++ b/.chloggen/SampledLoggerTelemetry.yaml @@ -0,0 +1,26 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service/telemetry exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Enable sampling logging by default and apply it to all components." + +# One or more tracking issues or pull requests related to the change +issues: [8134] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The sampled logger configuration can be disabled easily by setting the `service::telemetry::logs::sampling::enabled` to `false`. + + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 127bd66ec55..8a68efcc617 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -5,10 +5,6 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte import ( "context" - "time" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -101,7 +97,7 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option { // The default RetrySettings is to disable retries. func WithRetry(retrySettings RetrySettings) Option { return func(o *baseExporter) { - o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure) + o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure) } } @@ -121,7 +117,7 @@ func WithQueue(config QueueSettings) Option { queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) } } - qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger) o.queueSender = qs o.setOnTemporaryFailure(qs.onTemporaryFailure) } @@ -146,9 +142,8 @@ type baseExporter struct { unmarshaler internal.RequestUnmarshaler signal component.DataType - set exporter.CreateSettings - obsrep *obsExporter - sampledLogger *zap.Logger + set exporter.CreateSettings + obsrep *obsExporter // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. @@ -184,9 +179,8 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req retrySender: &baseRequestSender{}, timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, - set: set, - obsrep: obsrep, - sampledLogger: createSampledLogger(set.Logger), + set: set, + obsrep: obsrep, } for _, op := range options { @@ -236,22 +230,3 @@ func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandli rs.onTemporaryFailure = onTemporaryFailure } } - -func createSampledLogger(logger *zap.Logger) *zap.Logger { - if logger.Core().Enabled(zapcore.DebugLevel) { - // Debugging is enabled. Don't do any sampling. - return logger - } - - // Create a logger that samples all messages to 1 per 10 seconds initially, - // and 1/100 of messages after that. - opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions( - core, - 10*time.Second, - 1, - 100, - ) - }) - return logger.WithOptions(opts) -} diff --git a/otelcol/unmarshaler.go b/otelcol/unmarshaler.go index 88f25dfc93f..66b2a87a0c6 100644 --- a/otelcol/unmarshaler.go +++ b/otelcol/unmarshaler.go @@ -4,6 +4,8 @@ package otelcol // import "go.opentelemetry.io/collector/otelcol" import ( + "time" + "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/config/configtelemetry" @@ -45,7 +47,9 @@ func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) { Development: false, Encoding: "console", Sampling: &telemetry.LogsSamplingConfig{ - Initial: 100, + Enabled: true, + Tick: 10 * time.Second, + Initial: 10, Thereafter: 100, }, OutputPaths: []string{"stderr"}, diff --git a/otelcol/unmarshaler_test.go b/otelcol/unmarshaler_test.go index c4ef21906db..1e7881a8a0d 100644 --- a/otelcol/unmarshaler_test.go +++ b/otelcol/unmarshaler_test.go @@ -5,6 +5,7 @@ package otelcol import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -45,7 +46,9 @@ func TestUnmarshalEmptyAllSections(t *testing.T) { Development: zapProdCfg.Development, Encoding: "console", Sampling: &telemetry.LogsSamplingConfig{ - Initial: 100, + Enabled: true, + Tick: 10 * time.Second, + Initial: 10, Thereafter: 100, }, DisableCaller: zapProdCfg.DisableCaller, diff --git a/service/service_test.go b/service/service_test.go index 2a3ec346d65..16c5fd6f82f 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -403,6 +403,17 @@ func TestNilCollectorEffectiveConfig(t *testing.T) { require.NoError(t, srv.Shutdown(context.Background())) } +func TestServiceTelemetryLogger(t *testing.T) { + srv, err := New(context.Background(), newNopSettings(), newNopConfig()) + require.NoError(t, err) + + assert.NoError(t, srv.Start(context.Background())) + t.Cleanup(func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }) + assert.NotNil(t, srv.telemetrySettings.Logger) +} + func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) { for key, labelValue := range expectedLabels { lookupKey, ok := prometheusToOtelConv[key] @@ -533,6 +544,8 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config { Development: false, Encoding: "console", Sampling: &telemetry.LogsSamplingConfig{ + Enabled: true, + Tick: 10 * time.Second, Initial: 100, Thereafter: 100, }, diff --git a/service/telemetry/config.go b/service/telemetry/config.go index 514ae03278a..becc1b11e38 100644 --- a/service/telemetry/config.go +++ b/service/telemetry/config.go @@ -5,6 +5,7 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "fmt" + "time" "go.uber.org/zap/zapcore" @@ -54,7 +55,14 @@ type LogsConfig struct { // (default = false) DisableStacktrace bool `mapstructure:"disable_stacktrace"` - // Sampling sets a sampling policy. A nil SamplingConfig disables sampling. + // Sampling sets a sampling policy. + // Default: + // sampling: + // enabled: true + // tick: 10s + // initial: 10 + // thereafter: 100 + // Sampling can be disabled by setting 'enabled' to false Sampling *LogsSamplingConfig `mapstructure:"sampling"` // OutputPaths is a list of URLs or file paths to write logging output to. @@ -91,7 +99,14 @@ type LogsConfig struct { // global CPU and I/O load that logging puts on your process while attempting // to preserve a representative subset of your logs. type LogsSamplingConfig struct { - Initial int `mapstructure:"initial"` + // Enabled enable sampling logging + Enabled bool `mapstructure:"enabled"` + // Tick represents the interval in seconds that the logger apply each sampling. + Tick time.Duration `mapstructure:"tick"` + // Initial represents the first M messages logged each Tick. + Initial int `mapstructure:"initial"` + // Thereafter represents the sampling rate, every Nth message will be sampled after Initial messages are logged during each Tick. + // If Thereafter is zero, the logger will drop all the messages after the Initial each Tick. Thereafter int `mapstructure:"thereafter"` } diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go index 00f9db630b0..56dec2da0a6 100644 --- a/service/telemetry/telemetry.go +++ b/service/telemetry/telemetry.go @@ -59,7 +59,6 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { zapCfg := &zap.Config{ Level: zap.NewAtomicLevelAt(cfg.Level), Development: cfg.Development, - Sampling: toSamplingConfig(cfg.Sampling), Encoding: cfg.Encoding, EncoderConfig: zap.NewProductionEncoderConfig(), OutputPaths: cfg.OutputPaths, @@ -78,16 +77,23 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { if err != nil { return nil, err } + if cfg.Sampling != nil && cfg.Sampling.Enabled { + logger = newSampledLogger(logger, cfg.Sampling) + } return logger, nil } -func toSamplingConfig(sc *LogsSamplingConfig) *zap.SamplingConfig { - if sc == nil { - return nil - } - return &zap.SamplingConfig{ - Initial: sc.Initial, - Thereafter: sc.Thereafter, - } +func newSampledLogger(logger *zap.Logger, sc *LogsSamplingConfig) *zap.Logger { + // Create a logger that samples every Nth message after the first M messages every S seconds + // where N = sc.Thereafter, M = sc.Initial, S = sc.Tick. + opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions( + core, + sc.Tick, + sc.Initial, + sc.Thereafter, + ) + }) + return logger.WithOptions(opts) } diff --git a/service/telemetry/telemetry_test.go b/service/telemetry/telemetry_test.go new file mode 100644 index 00000000000..b01b3c7d8c8 --- /dev/null +++ b/service/telemetry/telemetry_test.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "go.opentelemetry.io/collector/config/configtelemetry" +) + +func TestTelemetryConfiguration(t *testing.T) { + tests := []struct { + name string + cfg *Config + success bool + }{ + { + name: "Valid config", + cfg: &Config{ + Logs: LogsConfig{ + Level: zapcore.DebugLevel, + Encoding: "console", + }, + Metrics: MetricsConfig{ + Level: configtelemetry.LevelBasic, + Address: "127.0.0.1:3333", + }, + }, + success: true, + }, + { + name: "Invalid config", + cfg: &Config{ + Logs: LogsConfig{ + Level: zapcore.DebugLevel, + }, + Metrics: MetricsConfig{ + Level: configtelemetry.LevelBasic, + Address: "127.0.0.1:3333", + }, + }, + success: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg) + if tt.success { + assert.NoError(t, err) + assert.NotNil(t, telemetry) + } else { + assert.Error(t, err) + assert.Nil(t, telemetry) + } + }) + } +} + +func TestSampledLogger(t *testing.T) { + tests := []struct { + name string + cfg *Config + }{ + { + name: "Default sampling", + cfg: &Config{ + Logs: LogsConfig{ + Encoding: "console", + }, + }, + }, + { + name: "Custom sampling", + cfg: &Config{ + Logs: LogsConfig{ + Level: zapcore.DebugLevel, + Encoding: "console", + Sampling: &LogsSamplingConfig{ + Enabled: true, + Tick: 1 * time.Second, + Initial: 100, + Thereafter: 100, + }, + }, + }, + }, + { + name: "Disable sampling", + cfg: &Config{ + Logs: LogsConfig{ + Level: zapcore.DebugLevel, + Encoding: "console", + Sampling: &LogsSamplingConfig{ + Enabled: false, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg) + assert.NoError(t, err) + assert.NotNil(t, telemetry) + assert.NotNil(t, telemetry.Logger()) + }) + } +}