Skip to content

Commit

Permalink
core/sampler: Support decision hook (#813)
Browse files Browse the repository at this point in the history
This adds support for monitoring sampling decisions made by the Sampler
core with a `func(Entry, SamplingDecision)` where `SamplingDecision` is
a bit field.

To allow plumbing the hook to the sampler, this additionally deprecates
the `NewSampler` constructor in favor of `NewSamplerWithOptions`.

    type SamplerOption

    func SamplerHook(func(Entry, SamplingDecision))

    func NewSamplerWithOptions(/* ... */, opts ...SamplerOption)

This functionality is usable from the `zap` package via the new `Hook`
field of `zap.SamplingConfig`.

Refs T5056227
  • Loading branch information
shirchen authored Apr 21, 2020
1 parent b2382d7 commit 46939fc
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 18 deletions.
2 changes: 1 addition & 1 deletion benchmarks/zap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newZapLogger(lvl zapcore.Level) *zap.Logger {
}

func newSampledLogger(lvl zapcore.Level) *zap.Logger {
return zap.New(zapcore.NewSampler(
return zap.New(zapcore.NewSamplerWithOptions(
newZapLogger(zap.DebugLevel).Core(),
100*time.Millisecond,
10, // first
Expand Down
24 changes: 19 additions & 5 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ import (
// global CPU and I/O load that logging puts on your process while attempting
// to preserve a representative subset of your logs.
//
// Values configured here are per-second. See zapcore.NewSampler for details.
// If specified, the Sampler will invoke the Hook after each decision.
//
// Values configured here are per-second. See zapcore.NewSamplerWithOptions for
// details.
type SamplingConfig struct {
Initial int `json:"initial" yaml:"initial"`
Thereafter int `json:"thereafter" yaml:"thereafter"`
Initial int `json:"initial" yaml:"initial"`
Thereafter int `json:"thereafter" yaml:"thereafter"`
Hook func(zapcore.Entry, zapcore.SamplingDecision) `json:"-" yaml:"-"`
}

// Config offers a declarative way to construct a logger. It doesn't do
Expand Down Expand Up @@ -208,9 +212,19 @@ func (cfg Config) buildOptions(errSink zapcore.WriteSyncer) []Option {
opts = append(opts, AddStacktrace(stackLevel))
}

if cfg.Sampling != nil {
if scfg := cfg.Sampling; scfg != nil {
opts = append(opts, WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSampler(core, time.Second, int(cfg.Sampling.Initial), int(cfg.Sampling.Thereafter))
var samplerOpts []zapcore.SamplerOption
if scfg.Hook != nil {
samplerOpts = append(samplerOpts, zapcore.SamplerHook(scfg.Hook))
}
return zapcore.NewSamplerWithOptions(
core,
time.Second,
cfg.Sampling.Initial,
cfg.Sampling.Thereafter,
samplerOpts...,
)
}))
}

Expand Down
67 changes: 67 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap/zapcore"
)

Expand Down Expand Up @@ -144,3 +145,69 @@ func TestConfigWithMissingAttributes(t *testing.T) {
})
}
}

func makeSamplerCountingHook() (h func(zapcore.Entry, zapcore.SamplingDecision),
dropped, sampled *atomic.Int64) {
dropped = new(atomic.Int64)
sampled = new(atomic.Int64)
h = func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
if dec&zapcore.LogDropped > 0 {
dropped.Inc()
} else if dec&zapcore.LogSampled > 0 {
sampled.Inc()
}
}
return h, dropped, sampled
}

func TestConfigWithSamplingHook(t *testing.T) {
shook, dcount, scount := makeSamplerCountingHook()
cfg := Config{
Level: NewAtomicLevelAt(InfoLevel),
Development: false,
Sampling: &SamplingConfig{
Initial: 100,
Thereafter: 100,
Hook: shook,
},
Encoding: "json",
EncoderConfig: NewProductionEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}
expectRe := `{"level":"info","caller":"zap/config_test.go:\d+","msg":"info","k":"v","z":"zz"}` + "\n" +
`{"level":"warn","caller":"zap/config_test.go:\d+","msg":"warn","k":"v","z":"zz"}` + "\n"
expectDropped := 99 // 200 - 100 initial - 1 thereafter
expectSampled := 103 // 2 from initial + 100 + 1 thereafter

temp, err := ioutil.TempFile("", "zap-prod-config-test")
require.NoError(t, err, "Failed to create temp file.")
defer func() {
err := os.Remove(temp.Name())
if err != nil {
return
}
}()

cfg.OutputPaths = []string{temp.Name()}
cfg.EncoderConfig.TimeKey = "" // no timestamps in tests
cfg.InitialFields = map[string]interface{}{"z": "zz", "k": "v"}

logger, err := cfg.Build()
require.NoError(t, err, "Unexpected error constructing logger.")

logger.Debug("debug")
logger.Info("info")
logger.Warn("warn")

byteContents, err := ioutil.ReadAll(temp)
require.NoError(t, err, "Couldn't read log contents from temp file.")
logs := string(byteContents)
assert.Regexp(t, expectRe, logs, "Unexpected log output.")

for i := 0; i < 200; i++ {
logger.Info("sampling")
}
assert.Equal(t, int64(expectDropped), dcount.Load())
assert.Equal(t, int64(expectSampled), scount.Load())
}
94 changes: 84 additions & 10 deletions zapcore/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,33 +81,104 @@ func (c *counter) IncCheckReset(t time.Time, tick time.Duration) uint64 {
return 1
}

type sampler struct {
Core
// SamplingDecision is a decision represented as a bit field made by sampler.
// More decisions may be added in the future.
type SamplingDecision uint32

counts *counters
tick time.Duration
first, thereafter uint64
const (
// LogDropped indicates that the Sampler dropped a log entry.
LogDropped SamplingDecision = 1 << iota
// LogSampled indicates that the Sampler sampled a log entry.
LogSampled
)

// optionFunc wraps a func so it satisfies the SamplerOption interface.
type optionFunc func(*sampler)

func (f optionFunc) apply(s *sampler) {
f(s)
}

// SamplerOption configures a Sampler.
type SamplerOption interface {
apply(*sampler)
}

// NewSampler creates a Core that samples incoming entries, which caps the CPU
// and I/O load of logging while attempting to preserve a representative subset
// of your logs.
// nopSamplingHook is the default hook used by sampler.
func nopSamplingHook(Entry, SamplingDecision) {}

// SamplerHook registers a function which will be called when Sampler makes a
// decision.
//
// This hook may be used to get visibility into the performance of the sampler.
// For example, use it to track metrics of dropped versus sampled logs.
//
// var dropped atomic.Int64
// zapcore.SamplerHook(func(ent zapcore.Entry, dec zapcore.SamplingDecision) {
// if dec&zapcore.LogDropped > 0 {
// dropped.Inc()
// }
// })
func SamplerHook(hook func(entry Entry, dec SamplingDecision)) SamplerOption {
return optionFunc(func(s *sampler) {
s.hook = hook
})
}

// NewSamplerWithOptions creates a Core that samples incoming entries, which
// caps the CPU and I/O load of logging while attempting to preserve a
// representative subset of your logs.
//
// Zap samples by logging the first N entries with a given level and message
// each tick. If more Entries with the same level and message are seen during
// the same interval, every Mth message is logged and the rest are dropped.
//
// Sampler can be configured to report sampling decisions with the SamplerHook
// option.
//
// Keep in mind that zap's sampling implementation is optimized for speed over
// absolute precision; under load, each tick may be slightly over- or
// under-sampled.
func NewSampler(core Core, tick time.Duration, first, thereafter int) Core {
return &sampler{
func NewSamplerWithOptions(core Core, tick time.Duration, first, thereafter int, opts ...SamplerOption) Core {
s := &sampler{
Core: core,
tick: tick,
counts: newCounters(),
first: uint64(first),
thereafter: uint64(thereafter),
hook: nopSamplingHook,
}
for _, opt := range opts {
opt.apply(s)
}

return s
}

type sampler struct {
Core

counts *counters
tick time.Duration
first, thereafter uint64
hook func(Entry, SamplingDecision)
}

// NewSampler creates a Core that samples incoming entries, which
// caps the CPU and I/O load of logging while attempting to preserve a
// representative subset of your logs.
//
// Zap samples by logging the first N entries with a given level and message
// each tick. If more Entries with the same level and message are seen during
// the same interval, every Mth message is logged and the rest are dropped.
//
// Keep in mind that zap's sampling implementation is optimized for speed over
// absolute precision; under load, each tick may be slightly over- or
// under-sampled.
//
// Deprecated: use NewSamplerWithOptions.
func NewSampler(core Core, tick time.Duration, first, thereafter int) Core {
return NewSamplerWithOptions(core, tick, first, thereafter)
}

func (s *sampler) With(fields []Field) Core {
Expand All @@ -117,6 +188,7 @@ func (s *sampler) With(fields []Field) Core {
counts: s.counts,
first: s.first,
thereafter: s.thereafter,
hook: s.hook,
}
}

Expand All @@ -128,7 +200,9 @@ func (s *sampler) Check(ent Entry, ce *CheckedEntry) *CheckedEntry {
counter := s.counts.get(ent.Level, ent.Message)
n := counter.IncCheckReset(ent.Time, s.tick)
if n > s.first && (n-s.first)%s.thereafter != 0 {
s.hook(ent, LogDropped)
return ce
}
s.hook(ent, LogSampled)
return s.Core.Check(ent, ce)
}
55 changes: 54 additions & 1 deletion zapcore/sampler_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"go.uber.org/zap/internal/ztest"
. "go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -203,7 +205,7 @@ var counterTestCases = [][]string{
func BenchmarkSampler_Check(b *testing.B) {
for _, keys := range counterTestCases {
b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) {
fac := NewSampler(
fac := NewSamplerWithOptions(
NewCore(
NewJSONEncoder(testEncoderConfig()),
&ztest.Discarder{},
Expand All @@ -228,3 +230,54 @@ func BenchmarkSampler_Check(b *testing.B) {
})
}
}

func makeSamplerCountingHook() (func(_ Entry, dec SamplingDecision), *atomic.Int64, *atomic.Int64) {
droppedCount := new(atomic.Int64)
sampledCount := new(atomic.Int64)
h := func(_ Entry, dec SamplingDecision) {
if dec&LogDropped > 0 {
droppedCount.Inc()
} else if dec&LogSampled > 0 {
sampledCount.Inc()
}
}
return h, droppedCount, sampledCount
}

func BenchmarkSampler_CheckWithHook(b *testing.B) {
hook, dropped, sampled := makeSamplerCountingHook()
for _, keys := range counterTestCases {
b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) {
fac := NewSamplerWithOptions(
NewCore(
NewJSONEncoder(testEncoderConfig()),
&ztest.Discarder{},
DebugLevel,
),
time.Millisecond,
1,
1000,
SamplerHook(hook),
)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
ent := Entry{
Level: DebugLevel + Level(i%4),
Message: keys[i],
}
_ = fac.Check(ent, nil)
i++
if n := len(keys); i >= n {
i -= n
}
}
})
})
}
// We expect to see 1000 dropped messages for every sampled per settings,
// with a delta due to less 1000 messages getting dropped after initial one
// is sampled.
assert.Greater(b, dropped.Load()/1000, sampled.Load()-1000)
}
3 changes: 2 additions & 1 deletion zapcore/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

func fakeSampler(lvl LevelEnabler, tick time.Duration, first, thereafter int) (Core, *observer.ObservedLogs) {
core, logs := observer.New(lvl)
// Keep using deprecated constructor for cc.
core = NewSampler(core, tick, first, thereafter)
return core, logs
}
Expand Down Expand Up @@ -162,7 +163,7 @@ func TestSamplerConcurrent(t *testing.T) {

tick := ztest.Timeout(10 * time.Millisecond)
cc := &countingCore{}
sampler := NewSampler(cc, tick, logsPerTick, 100000)
sampler := NewSamplerWithOptions(cc, tick, logsPerTick, 100000)

var (
done atomic.Bool
Expand Down

0 comments on commit 46939fc

Please sign in to comment.