Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding absolute sampling middleware #6

Merged
merged 2 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,15 @@ The sampling middleware can be used standalone or with `slog-multi` helper.
3 strategies are available:
- [Uniform sampling](#uniform-sampling): drop % of logs
- [Threshold sampling](#threshold-sampling): drop % of logs after a threshold
- [Absolute sampling](#absolute-sampling): dynamic sampling beyond max accepted logs throughput
- [Custom sampler](#custom-sampler)

Similar log records can be deduplicated and rate limited using the `Matcher` API.

Multiple sampling strategies can be chained together. E.g.:
- drop when a single log message is produced more than 100 times per second
- drop above 1000 log records per second (globally)

### Uniform sampling

```go
Expand All @@ -89,13 +96,6 @@ import (
option := slogsampling.UniformSamplingOption{
// The sample rate for sampling traces in the range [0.0, 1.0].
Rate: 0.33,

OnAccepted: func(context.Context, slog.Record) {
// ...
},
OnDropped: func(context.Context, slog.Record) {
// ...
},
}

logger := slog.New(
Expand All @@ -109,7 +109,7 @@ logger := slog.New(

```go
type ThresholdSamplingOption struct {
// This will log the first `Threshold` log entries with the same hash.
// This will log the first `Threshold` log entries with the same hash,
// in a `Tick` interval as-is. Following that, it will allow `Rate` in the range [0.0, 1.0].
Tick time.Duration
Threshold uint64
Expand Down Expand Up @@ -140,13 +140,56 @@ option := slogsampling.ThresholdSamplingOption{
Tick: 5 * time.Second,
Threshold: 10,
Rate: 0.1,
}

OnAccepted: func(context.Context, slog.Record) {
// ...
},
OnDropped: func(context.Context, slog.Record) {
// ...
},
logger := slog.New(
slogmulti.
Pipe(option.NewMiddleware()).
Handler(slog.NewJSONHandler(os.Stdout, nil)),
)
```

Available `Matcher`:
- `slogsampling.MatchByLevelAndMessage` (default)
- `slogsampling.MatchAll`
- `slogsampling.MatchByLevel`
- `slogsampling.MatchByMessage`
- `slogsampling.MatchBySource`
- `slogsampling.MatchByAttribute`
- `slogsampling.MatchByContextValue`

### Absolute sampling

```go
type AbsoluteSamplingOption struct {
// This will log all entries with the same hash until max is reached,
// in a `Tick` interval as-is. Following that, it will reduce log throughput
// depending on previous interval.
Tick time.Duration
Max uint64

// Group similar logs (default: by level and message)
Matcher Matcher

// Optional hooks
OnAccepted func(context.Context, slog.Record)
OnDropped func(context.Context, slog.Record)
}
```

Using `slog-multi`:

```go
import (
slogmulti "github.com/samber/slog-multi"
slogsampling "github.com/samber/slog-sampling"
"log/slog"
)

// Will print the first 10 entries having the same level+message during the first 5s, then a fraction of messages during the following intervals.
option := slogsampling.AbsoluteSamplingOption{
Tick: 5 * time.Second,
Max: 10,
}

logger := slog.New(
Expand Down Expand Up @@ -203,13 +246,6 @@ option := slogsampling.CustomSamplingOption{
return 0.01
}
},

OnAccepted: func(context.Context, slog.Record) {
// ...
},
OnDropped: func(context.Context, slog.Record) {
// ...
},
}

logger := slog.New(
Expand Down
41 changes: 39 additions & 2 deletions counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package slogsampling
import (
"sync/atomic"
"time"

"github.com/samber/lo"
)

func newCounter() *counter {
Expand All @@ -17,8 +19,9 @@ type counter struct {
counter atomic.Uint64
}

func (c *counter) Inc(t time.Time, tick time.Duration) uint64 {
tn := t.UnixNano()
func (c *counter) Inc(tick time.Duration) uint64 {
// i prefer not using record.Time, because only the sampling middleware time is relevant
tn := time.Now().UnixNano()
resetAfter := c.resetAt.Load()
if resetAfter > tn {
return c.counter.Add(1)
Expand All @@ -35,3 +38,37 @@ func (c *counter) Inc(t time.Time, tick time.Duration) uint64 {

return 1
}

// newCounterWithMemory builds a counterWithMemory whose reset deadline and
// previous-interval total both start at zero, so the very first Inc call
// immediately begins a fresh interval.
func newCounterWithMemory() *counterWithMemory {
	var cwm counterWithMemory
	initial := lo.T2(int64(0), uint64(0))
	cwm.resetAtAndPreviousCounter.Store(lo.ToPtr(initial))
	return &cwm
}

// counterWithMemory counts events within the current tick interval while also
// remembering the total recorded during the previous interval, so callers can
// scale their sampling decision based on recent throughput.
type counterWithMemory struct {
	// Pair of (reset deadline in UnixNano, previous interval's total), swapped
	// atomically as a single unit. It would be more memory-efficient with a
	// dedicated struct, but the lo.Tuple2 keeps the code short.
	resetAtAndPreviousCounter atomic.Pointer[lo.Tuple2[int64, uint64]]
	// Number of events recorded so far in the current interval.
	counter atomic.Uint64
}

// Inc registers one event and returns the updated count for the current tick
// interval (n) together with the total recorded during the previous interval
// (previousCycle). It is safe for concurrent use.
func (c *counterWithMemory) Inc(tick time.Duration) (n uint64, previousCycle uint64) {
	// i prefer not using record.Time, because only the sampling middleware time is relevant
	tn := time.Now().UnixNano()
	state := c.resetAtAndPreviousCounter.Load()
	if state.A > tn {
		// Still inside the current interval: just bump the counter.
		return c.counter.Add(1), state.B
	}

	// The interval expired: restart the counter with this event as the first
	// one, and archive the finished interval's total (old) next to the new
	// reset deadline.
	old := c.counter.Swap(1)

	newResetAfter := lo.T2(tn+tick.Nanoseconds(), old)
	if !c.resetAtAndPreviousCounter.CompareAndSwap(state, lo.ToPtr(newResetAfter)) {
		// We raced with another goroutine trying to reset, and it also reset
		// the counter to 1, so we need to reincrement the counter. Reload the
		// state so previousCycle reflects what the winning goroutine installed
		// instead of our outdated snapshot.
		fresh := c.resetAtAndPreviousCounter.Load()
		return c.counter.Add(1), fresh.B
	}

	// NOTE(review): on a successful rollover this reports state.B — the count
	// from the interval *before* the one that just ended — not old. Confirm
	// this one-cycle lag is intended.
	return 1, state.B
}
16 changes: 16 additions & 0 deletions example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ func main() {
// },
// }

// option := slogsampling.AbsoluteSamplingOption{
// Tick: 5 * time.Second,
// Max: 10,

// Matcher: func(ctx context.Context, record *slog.Record) string {
// return record.Level.String()
// },

// OnAccepted: func(context.Context, slog.Record) {
// accepted.Add(1)
// },
// OnDropped: func(context.Context, slog.Record) {
// dropped.Add(1)
// },
// }

logger := slog.New(
slogmulti.
Pipe(option.NewMiddleware()).
Expand Down
72 changes: 72 additions & 0 deletions middleware_absolute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package slogsampling

import (
"context"
"log/slog"
"math/rand"
"time"

"github.com/cornelk/hashmap"
slogmulti "github.com/samber/slog-multi"
)

// AbsoluteSamplingOption configures the absolute sampling middleware: it
// bounds log throughput per group of similar records, passing entries through
// until `Max` is reached within a `Tick` interval and then reducing output
// based on the previous interval's volume.
type AbsoluteSamplingOption struct {
	// This will log all entries with the same hash until max is reached,
	// in a `Tick` interval as-is. Following that, it will reduce log throughput
	// depending on previous interval.
	Tick time.Duration
	Max  uint64

	// Group similar logs (default: by level and message)
	Matcher Matcher

	// Optional hooks, invoked for each record that is kept (OnAccepted) or
	// discarded (OnDropped).
	OnAccepted func(context.Context, slog.Record)
	OnDropped  func(context.Context, slog.Record)
}

// NewMiddleware returns a slog-multi middleware applying the absolute
// sampling strategy: every record in a group passes until `Max` is reached
// within a `Tick` interval; once the previous interval exceeded `Max`, records
// are accepted with probability Max/previous so throughput converges back to
// the budget.
//
// It panics when Max is 0, since a zero budget would drop every record.
func (o AbsoluteSamplingOption) NewMiddleware() slogmulti.Middleware {
	if o.Max == 0 {
		panic("unexpected Max: must be greater than 0")
	}

	if o.Matcher == nil {
		o.Matcher = DefaultMatcher
	}

	// One counter per record group. Sampling below uses the package-level
	// rand.Float64 (goroutine-safe) instead of a shared *rand.Rand, because
	// the handler may be invoked from multiple goroutines concurrently and
	// rand.Rand is not safe for concurrent use.
	counters := hashmap.New[string, *counterWithMemory]() // @TODO: implement LRU or LFU draining

	return slogmulti.NewInlineMiddleware(
		func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool {
			return next(ctx, level)
		},
		func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
			key := o.Matcher(ctx, &record)

			c, _ := counters.GetOrInsert(key, newCounterWithMemory())

			n, p := c.Inc(o.Tick)

			// 3 cases:
			// - current interval is over threshold but not previous -> drop
			// - previous interval is over threshold -> apply rate limit
			// - none of current and previous intervals are over threshold -> accept
			if (n > o.Max && p <= o.Max) || (p > o.Max && rand.Float64() >= float64(o.Max)/float64(p)) {
				hook(o.OnDropped, ctx, record)
				return nil
			}

			hook(o.OnAccepted, ctx, record)
			return next(ctx, record)
		},
		func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler {
			return next(attrs)
		},
		func(name string, next func(string) slog.Handler) slog.Handler {
			return next(name)
		},
	)
}
4 changes: 2 additions & 2 deletions middleware_threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

type ThresholdSamplingOption struct {
// This will log the first `Threshold` log entries with the same hash.
// This will log the first `Threshold` log entries with the same hash,
// in a `Tick` interval as-is. Following that, it will allow `Rate` in the range [0.0, 1.0].
Tick time.Duration
Threshold uint64
Expand Down Expand Up @@ -48,7 +48,7 @@ func (o ThresholdSamplingOption) NewMiddleware() slogmulti.Middleware {

c, _ := counters.GetOrInsert(key, newCounter())

n := c.Inc(record.Time, o.Tick)
n := c.Inc(o.Tick)
if n > o.Threshold && rand.Float64() >= o.Rate {
hook(o.OnDropped, ctx, record)
return nil
Expand Down
Loading