diff --git a/README.md b/README.md
index 010cf3f..ab1cbb2 100644
--- a/README.md
+++ b/README.md
@@ -61,8 +61,15 @@ The sampling middleware can be used standalone or with `slog-multi` helper.
-3 strategies are available:
+4 strategies are available:
 - [Uniform sampling](#uniform-sampling): drop % of logs
 - [Threshold sampling](#threshold-sampling): drop % of logs after a threshold
+- [Absolute sampling](#absolute-sampling): dynamic sampling above a maximum accepted log throughput
 - [Custom sampler](#custom-sampler)
 
+Similar log records can be deduplicated and rate-limited using the `Matcher` API.
+
+Multiple sampling strategies can be chained. E.g.:
+- drop when a single log message is produced more than 100 times per second
+- drop above 1000 log records per second (globally)
+
 ### Uniform sampling
 
 ```go
@@ -89,13 +96,6 @@ import (
 option := slogsampling.UniformSamplingOption{
     // The sample rate for sampling traces in the range [0.0, 1.0].
     Rate: 0.33,
-
-    OnAccepted: func(context.Context, slog.Record) {
-        // ...
-    },
-    OnDropped: func(context.Context, slog.Record) {
-        // ...
-    },
 }
 
 logger := slog.New(
@@ -109,7 +109,7 @@
 
 ```go
 type ThresholdSamplingOption struct {
-    // This will log the first `Threshold` log entries with the same hash.
+    // This will log the first `Threshold` log entries with the same hash,
     // in a `Tick` interval as-is. Following that, it will allow `Rate` in the range [0.0, 1.0].
     Tick      time.Duration
     Threshold uint64
@@ -140,13 +140,56 @@ option := slogsampling.ThresholdSamplingOption{
     Tick: 5 * time.Second,
     Threshold: 10,
     Rate: 10,
+}
 
-    OnAccepted: func(context.Context, slog.Record) {
-        // ...
-    },
-    OnDropped: func(context.Context, slog.Record) {
-        // ...
-    },
+logger := slog.New(
+    slogmulti.
+        Pipe(option.NewMiddleware()).
+        Handler(slog.NewJSONHandler(os.Stdout, nil)),
+)
+```
+
+Available `Matcher`:
+- `slogsampling.MatchByLevelAndMessage` (default)
+- `slogsampling.MatchAll`
+- `slogsampling.MatchByLevel`
+- `slogsampling.MatchByMessage`
+- `slogsampling.MatchBySource`
+- `slogsampling.MatchByAttribute`
+- `slogsampling.MatchByContextValue`
+
+### Absolute sampling
+
+```go
+type AbsoluteSamplingOption struct {
+    // This will log all entries with the same hash, up to `Max`, within a
+    // `Tick` interval as-is. Following that, it will reduce the log throughput
+    // depending on the previous interval.
+    Tick time.Duration
+    Max  uint64
+
+    // Group similar logs (default: by level and message)
+    Matcher Matcher
+
+    // Optional hooks
+    OnAccepted func(context.Context, slog.Record)
+    OnDropped  func(context.Context, slog.Record)
+}
+```
+
+Using `slog-multi`:
+
+```go
+import (
+    slogmulti "github.com/samber/slog-multi"
+    slogsampling "github.com/samber/slog-sampling"
+    "log/slog"
+)
+
+// Will print the first 10 entries having the same level+message during the first 5s, then a fraction of messages during the following intervals.
+option := slogsampling.AbsoluteSamplingOption{
+    Tick: 5 * time.Second,
+    Max:  10,
 }
 
 logger := slog.New(
@@ -203,13 +246,6 @@ option := slogsampling.CustomSamplingOption{
             return 0.01
         }
     },
-
-    OnAccepted: func(context.Context, slog.Record) {
-        // ...
-    },
-    OnDropped: func(context.Context, slog.Record) {
-        // ...
-    },
 }
 
 logger := slog.New(
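The chaining claim added to the README above (drop a single message beyond 100/s, then drop everything beyond 1000 records/s globally) is easier to picture with a concrete pipeline. The sketch below is not part of the patch: the `perMessage` and `global` names, the budgets, and the inline constant-key `Matcher` are illustrative assumptions built on the options this PR documents.

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	slogmulti "github.com/samber/slog-multi"
	slogsampling "github.com/samber/slog-sampling"
)

func main() {
	// Per-message budget: keep at most 100 records per second for each
	// level+message pair (default Matcher), drop the rest (Rate: 0).
	perMessage := slogsampling.ThresholdSamplingOption{
		Tick:      time.Second,
		Threshold: 100,
		Rate:      0,
	}

	// Global budget: group every record under a single key so the
	// 1000 records/second cap applies to the whole process.
	global := slogsampling.AbsoluteSamplingOption{
		Tick: time.Second,
		Max:  1000,
		Matcher: func(ctx context.Context, record *slog.Record) string {
			return "global" // hypothetical constant grouping key
		},
	}

	logger := slog.New(
		slogmulti.
			Pipe(perMessage.NewMiddleware()).
			Pipe(global.NewMiddleware()).
			Handler(slog.NewJSONHandler(os.Stdout, nil)),
	)

	logger.Info("hello world")
}
```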
diff --git a/counters.go b/counters.go
index 1e1a004..90dfb91 100644
--- a/counters.go
+++ b/counters.go
@@ -3,6 +3,8 @@ package slogsampling
 import (
     "sync/atomic"
     "time"
+
+    "github.com/samber/lo"
 )
 
 func newCounter() *counter {
@@ -17,8 +19,9 @@ type counter struct {
     counter atomic.Uint64
 }
 
-func (c *counter) Inc(t time.Time, tick time.Duration) uint64 {
-    tn := t.UnixNano()
+func (c *counter) Inc(tick time.Duration) uint64 {
+    // Prefer time.Now() over record.Time: only the sampling middleware's own clock is relevant here.
+    tn := time.Now().UnixNano()
     resetAfter := c.resetAt.Load()
     if resetAfter > tn {
         return c.counter.Add(1)
@@ -35,3 +38,37 @@ func (c *counter) Inc(t time.Time, tick time.Duration) uint64 {
 
     return 1
 }
+
+func newCounterWithMemory() *counterWithMemory {
+    c := &counterWithMemory{
+        resetAtAndPreviousCounter: atomic.Pointer[lo.Tuple2[int64, uint64]]{},
+        counter:                   atomic.Uint64{},
+    }
+    c.resetAtAndPreviousCounter.Store(lo.ToPtr(lo.T2(int64(0), uint64(0))))
+    return c
+}
+
+type counterWithMemory struct {
+    resetAtAndPreviousCounter atomic.Pointer[lo.Tuple2[int64, uint64]] // a dedicated struct would be more memory-efficient, but lo.Tuple2 keeps this simple
+    counter                   atomic.Uint64
+}
+
+func (c *counterWithMemory) Inc(tick time.Duration) (n uint64, previousCycle uint64) {
+    // Prefer time.Now() over record.Time: only the sampling middleware's own clock is relevant here.
+    tn := time.Now().UnixNano()
+    resetAtAndPreviousCounter := c.resetAtAndPreviousCounter.Load()
+    if resetAtAndPreviousCounter.A > tn {
+        return c.counter.Add(1), resetAtAndPreviousCounter.B
+    }
+
+    old := c.counter.Swap(1)
+
+    newResetAfter := lo.T2(tn+tick.Nanoseconds(), old)
+    if !c.resetAtAndPreviousCounter.CompareAndSwap(resetAtAndPreviousCounter, lo.ToPtr(newResetAfter)) {
+        // We raced with another goroutine trying to reset, and it also reset
+        // the counter to 1, so we need to reincrement the counter.
+        return c.counter.Add(1), resetAtAndPreviousCounter.B // reloading would give a fresher value, but this slightly stale one is acceptable
+    }
+
+    return 1, resetAtAndPreviousCounter.B
+}
diff --git a/example/example.go b/example/example.go
index e9ce349..0163ea5 100644
--- a/example/example.go
+++ b/example/example.go
@@ -63,6 +63,22 @@ func main() {
     //     },
     // }
 
+    // option := slogsampling.AbsoluteSamplingOption{
+    //     Tick: 5 * time.Second,
+    //     Max:  10,
+
+    //     Matcher: func(ctx context.Context, record *slog.Record) string {
+    //         return record.Level.String()
+    //     },
+
+    //     OnAccepted: func(context.Context, slog.Record) {
+    //         accepted.Add(1)
+    //     },
+    //     OnDropped: func(context.Context, slog.Record) {
+    //         dropped.Add(1)
+    //     },
+    // }
+
     logger := slog.New(
         slogmulti.
             Pipe(option.NewMiddleware()).
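To make the new `counterWithMemory` contract concrete, here is a rough test-style sketch, written in-package against the unexported API added above. It is not part of the patch; the tick length and expectations are my reading of the code: each call reports the count inside the current tick, and once a tick expires, the finished tick's total becomes available as the second return value.

```go
package slogsampling

import (
	"testing"
	"time"
)

// Sketch only: exercises counterWithMemory.Inc across two ticks.
func TestCounterWithMemorySketch(t *testing.T) {
	const tick = 100 * time.Millisecond
	c := newCounterWithMemory()

	// First tick: three increments, no previous interval recorded yet.
	for i := 1; i <= 3; i++ {
		n, prev := c.Inc(tick)
		if n != uint64(i) || prev != 0 {
			t.Fatalf("tick 1: got n=%d prev=%d", n, prev)
		}
	}

	// Let the tick expire: the next increment opens a new interval.
	time.Sleep(tick + 50*time.Millisecond)
	if n, _ := c.Inc(tick); n != 1 {
		t.Fatalf("tick 2: counter should restart at 1, got %d", n)
	}

	// Subsequent calls in the new tick see the previous tick's total (3).
	if n, prev := c.Inc(tick); n != 2 || prev != 3 {
		t.Fatalf("tick 2: got n=%d prev=%d, want n=2 prev=3", n, prev)
	}
}
```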
diff --git a/middleware_absolute.go b/middleware_absolute.go
new file mode 100644
index 0000000..b63755b
--- /dev/null
+++ b/middleware_absolute.go
@@ -0,0 +1,72 @@
+package slogsampling
+
+import (
+    "context"
+    "log/slog"
+    "math/rand"
+    "time"
+
+    "github.com/cornelk/hashmap"
+    slogmulti "github.com/samber/slog-multi"
+)
+
+type AbsoluteSamplingOption struct {
+    // This will log all entries with the same hash, up to `Max`, within a
+    // `Tick` interval as-is. Following that, it will reduce the log throughput
+    // depending on the previous interval.
+    Tick time.Duration
+    Max  uint64
+
+    // Group similar logs (default: by level and message)
+    Matcher Matcher
+
+    // Optional hooks
+    OnAccepted func(context.Context, slog.Record)
+    OnDropped  func(context.Context, slog.Record)
+}
+
+// NewMiddleware returns a slog-multi middleware.
+func (o AbsoluteSamplingOption) NewMiddleware() slogmulti.Middleware {
+    if o.Max == 0 {
+        panic("unexpected Max: must be greater than 0")
+    }
+
+    if o.Matcher == nil {
+        o.Matcher = DefaultMatcher
+    }
+
+    rand := rand.New(rand.NewSource(time.Now().UnixNano()))
+    counters := hashmap.New[string, *counterWithMemory]() // @TODO: implement LRU or LFU draining
+
+    return slogmulti.NewInlineMiddleware(
+        func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool {
+            return next(ctx, level)
+        },
+        func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
+            key := o.Matcher(ctx, &record)
+
+            c, _ := counters.GetOrInsert(key, newCounterWithMemory())
+
+            n, p := c.Inc(o.Tick)
+
+            // 3 cases:
+            // - the current interval is over the threshold, but not the previous one -> drop
+            // - the previous interval is over the threshold -> apply the rate limit
+            // - neither the current nor the previous interval is over the threshold -> accept
+
+            if (n > o.Max && p <= o.Max) || (p > o.Max && rand.Float64() >= float64(o.Max)/float64(p)) {
+                hook(o.OnDropped, ctx, record)
+                return nil
+            }
+
+            hook(o.OnAccepted, ctx, record)
+            return next(ctx, record)
+        },
+        func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler {
+            return next(attrs)
+        },
+        func(name string, next func(string) slog.Handler) slog.Handler {
+            return next(name)
+        },
+    )
+}
diff --git a/middleware_threshold.go b/middleware_threshold.go
index 9266b6a..5c3401a 100644
--- a/middleware_threshold.go
+++ b/middleware_threshold.go
@@ -12,7 +12,7 @@ import (
 )
 
 type ThresholdSamplingOption struct {
-    // This will log the first `Threshold` log entries with the same hash.
+    // This will log the first `Threshold` log entries with the same hash,
     // in a `Tick` interval as-is. Following that, it will allow `Rate` in the range [0.0, 1.0].
     Tick      time.Duration
     Threshold uint64
@@ -48,7 +48,7 @@ func (o ThresholdSamplingOption) NewMiddleware() slogmulti.Middleware {
 
     c, _ := counters.GetOrInsert(key, newCounter())
 
-    n := c.Inc(record.Time, o.Tick)
+    n := c.Inc(o.Tick)
     if n > o.Threshold && rand.Float64() >= o.Rate {
         hook(o.OnDropped, ctx, record)
         return nil
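For readers skimming the new middleware, the accept/drop branch in `middleware_absolute.go` boils down to a little arithmetic. The standalone sketch below mirrors that branch; the `decide` helper and the sample numbers are mine, not part of the library. With `Max` = 10 and a previous interval of 50 records, roughly 10/50 = 20% of records are kept.

```go
package main

import (
	"fmt"
	"math/rand"
)

// decide mirrors the branch in middleware_absolute.go: n is the number of
// records seen for this key in the current tick, p the total from the
// previous tick, max the configured Max.
func decide(n, p, max uint64) bool {
	// Case 1: the current tick just went over Max but the previous one was
	// fine -> drop, and let the next tick apply the rate limit.
	if n > max && p <= max {
		return false
	}
	// Case 2: the previous tick was over Max -> keep roughly max/p of the
	// records, e.g. Max=10 and p=50 keeps ~20%.
	if p > max {
		return rand.Float64() < float64(max)/float64(p)
	}
	// Case 3: neither tick is over Max -> keep everything.
	return true
}

func main() {
	kept := 0
	for i := 0; i < 10000; i++ {
		if decide(42, 50, 10) {
			kept++
		}
	}
	fmt.Printf("kept %d of 10000 records (~20%% expected)\n", kept)
}
```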