Skip to content

Commit

Permalink
[telemetry] Enable sampling logging by default and apply it to all co…
Browse files Browse the repository at this point in the history
…mponents (#8134)

The sampled logger configuration can be disabled easily by setting the `service::telemetry::logs::sampling::enabled` to `false`.
  • Loading branch information
antonjim-te authored Sep 29, 2023
1 parent 5894888 commit 7c5ecef
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 44 deletions.
26 changes: 26 additions & 0 deletions .chloggen/SampledLoggerTelemetry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/telemetry exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Enable sampling logging by default and apply it to all components."

# One or more tracking issues or pull requests related to the change
issues: [8134]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The sampled logger configuration can be disabled easily by setting the `service::telemetry::logs::sampling::enabled` to `false`.


# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
37 changes: 6 additions & 31 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporte

import (
"context"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -101,7 +97,7 @@ func WithTimeout(timeoutSettings TimeoutSettings) Option {
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure)
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.set.Logger, o.onTemporaryFailure)
}
}

Expand All @@ -121,7 +117,7 @@ func WithQueue(config QueueSettings) Option {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}
qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger)
qs := newQueueSender(o.set.ID, o.signal, queue, o.set.Logger)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
Expand All @@ -146,9 +142,8 @@ type baseExporter struct {
unmarshaler internal.RequestUnmarshaler
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
sampledLogger *zap.Logger
set exporter.CreateSettings
obsrep *obsExporter

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
Expand Down Expand Up @@ -184,9 +179,8 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

set: set,
obsrep: obsrep,
sampledLogger: createSampledLogger(set.Logger),
set: set,
obsrep: obsrep,
}

for _, op := range options {
Expand Down Expand Up @@ -236,22 +230,3 @@ func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandli
rs.onTemporaryFailure = onTemporaryFailure
}
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
if logger.Core().Enabled(zapcore.DebugLevel) {
// Debugging is enabled. Don't do any sampling.
return logger
}

// Create a logger that samples all messages to 1 per 10 seconds initially,
// and 1/100 of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
10*time.Second,
1,
100,
)
})
return logger.WithOptions(opts)
}
6 changes: 5 additions & 1 deletion otelcol/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package otelcol // import "go.opentelemetry.io/collector/otelcol"

import (
"time"

"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -45,7 +47,9 @@ func unmarshal(v *confmap.Conf, factories Factories) (*configSettings, error) {
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Initial: 100,
Enabled: true,
Tick: 10 * time.Second,
Initial: 10,
Thereafter: 100,
},
OutputPaths: []string{"stderr"},
Expand Down
5 changes: 4 additions & 1 deletion otelcol/unmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otelcol

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -45,7 +46,9 @@ func TestUnmarshalEmptyAllSections(t *testing.T) {
Development: zapProdCfg.Development,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Initial: 100,
Enabled: true,
Tick: 10 * time.Second,
Initial: 10,
Thereafter: 100,
},
DisableCaller: zapProdCfg.DisableCaller,
Expand Down
13 changes: 13 additions & 0 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,17 @@ func TestNilCollectorEffectiveConfig(t *testing.T) {
require.NoError(t, srv.Shutdown(context.Background()))
}

func TestServiceTelemetryLogger(t *testing.T) {
srv, err := New(context.Background(), newNopSettings(), newNopConfig())
require.NoError(t, err)

assert.NoError(t, srv.Start(context.Background()))
t.Cleanup(func() {
assert.NoError(t, srv.Shutdown(context.Background()))
})
assert.NotNil(t, srv.telemetrySettings.Logger)
}

func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) {
for key, labelValue := range expectedLabels {
lookupKey, ok := prometheusToOtelConv[key]
Expand Down Expand Up @@ -533,6 +544,8 @@ func newNopConfigPipelineConfigs(pipelineCfgs pipelines.Config) Config {
Development: false,
Encoding: "console",
Sampling: &telemetry.LogsSamplingConfig{
Enabled: true,
Tick: 10 * time.Second,
Initial: 100,
Thereafter: 100,
},
Expand Down
19 changes: 17 additions & 2 deletions service/telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"fmt"
"time"

"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -54,7 +55,14 @@ type LogsConfig struct {
// (default = false)
DisableStacktrace bool `mapstructure:"disable_stacktrace"`

// Sampling sets a sampling policy. A nil SamplingConfig disables sampling.
// Sampling sets a sampling policy.
// Default:
// sampling:
// enabled: true
// tick: 10s
// initial: 10
// thereafter: 100
// Sampling can be disabled by setting 'enabled' to false
Sampling *LogsSamplingConfig `mapstructure:"sampling"`

// OutputPaths is a list of URLs or file paths to write logging output to.
Expand Down Expand Up @@ -91,7 +99,14 @@ type LogsConfig struct {
// global CPU and I/O load that logging puts on your process while attempting
// to preserve a representative subset of your logs.
type LogsSamplingConfig struct {
Initial int `mapstructure:"initial"`
// Enabled enable sampling logging
Enabled bool `mapstructure:"enabled"`
// Tick represents the interval in seconds that the logger apply each sampling.
Tick time.Duration `mapstructure:"tick"`
// Initial represents the first M messages logged each Tick.
Initial int `mapstructure:"initial"`
// Thereafter represents the sampling rate, every Nth message will be sampled after Initial messages are logged during each Tick.
// If Thereafter is zero, the logger will drop all the messages after the Initial each Tick.
Thereafter int `mapstructure:"thereafter"`
}

Expand Down
24 changes: 15 additions & 9 deletions service/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: toSamplingConfig(cfg.Sampling),
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
Expand All @@ -78,16 +77,23 @@ func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
if err != nil {
return nil, err
}
if cfg.Sampling != nil && cfg.Sampling.Enabled {
logger = newSampledLogger(logger, cfg.Sampling)
}

return logger, nil
}

func toSamplingConfig(sc *LogsSamplingConfig) *zap.SamplingConfig {
if sc == nil {
return nil
}
return &zap.SamplingConfig{
Initial: sc.Initial,
Thereafter: sc.Thereafter,
}
func newSampledLogger(logger *zap.Logger, sc *LogsSamplingConfig) *zap.Logger {
// Create a logger that samples every Nth message after the first M messages every S seconds
// where N = sc.Thereafter, M = sc.Initial, S = sc.Tick.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
sc.Tick,
sc.Initial,
sc.Thereafter,
)
})
return logger.WithOptions(opts)
}
117 changes: 117 additions & 0 deletions service/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/config/configtelemetry"
)

func TestTelemetryConfiguration(t *testing.T) {
tests := []struct {
name string
cfg *Config
success bool
}{
{
name: "Valid config",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
},
Metrics: MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "127.0.0.1:3333",
},
},
success: true,
},
{
name: "Invalid config",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
},
Metrics: MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: "127.0.0.1:3333",
},
},
success: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg)
if tt.success {
assert.NoError(t, err)
assert.NotNil(t, telemetry)
} else {
assert.Error(t, err)
assert.Nil(t, telemetry)
}
})
}
}

func TestSampledLogger(t *testing.T) {
tests := []struct {
name string
cfg *Config
}{
{
name: "Default sampling",
cfg: &Config{
Logs: LogsConfig{
Encoding: "console",
},
},
},
{
name: "Custom sampling",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
Sampling: &LogsSamplingConfig{
Enabled: true,
Tick: 1 * time.Second,
Initial: 100,
Thereafter: 100,
},
},
},
},
{
name: "Disable sampling",
cfg: &Config{
Logs: LogsConfig{
Level: zapcore.DebugLevel,
Encoding: "console",
Sampling: &LogsSamplingConfig{
Enabled: false,
},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
telemetry, err := New(context.Background(), Settings{ZapOptions: []zap.Option{}}, *tt.cfg)
assert.NoError(t, err)
assert.NotNil(t, telemetry)
assert.NotNil(t, telemetry.Logger())
})
}
}

0 comments on commit 7c5ecef

Please sign in to comment.