diff --git a/x-pack/apm-server/sampling/groups.go b/x-pack/apm-server/sampling/groups.go
new file mode 100644
index 00000000000..1a825832890
--- /dev/null
+++ b/x-pack/apm-server/sampling/groups.go
@@ -0,0 +1,188 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sampling
+
+import (
+	"errors"
+	"math"
+	"math/rand"
+	"sync"
+	"time"
+
+	"github.com/elastic/apm-server/model"
+)
+
+// minReservoirSize is the capacity given to the reservoir of every newly
+// created trace group, and the lower bound applied whenever a group's
+// reservoir is resized at the end of a sampling interval.
+const minReservoirSize = 1000
+
+// errTooManyTraceGroups is returned by traceGroups.sampleTrace when a
+// transaction would require a new trace group, but the number of groups
+// being maintained already equals maxGroups.
+var errTooManyTraceGroups = errors.New("too many trace groups")
+
+// traceGroups maintains a collection of trace groups.
+type traceGroups struct {
+	// defaultSamplingFraction holds the default sampling fraction to
+	// set for new trace groups. See traceGroup.samplingFraction.
+	defaultSamplingFraction float64
+
+	// ingestRateDecayFactor is λ, the decay factor used for calculating the
+	// exponentially weighted moving average ingest rate for each trace group.
+	// The most recent interval's total is weighted by λ, and the previous
+	// average by 1-λ.
+	ingestRateDecayFactor float64
+
+	// maxGroups holds the maximum number of trace groups to maintain.
+	// Once this is reached, sampleTrace returns errTooManyTraceGroups for
+	// any transaction that would require a new group.
+	//
+	// NOTE(review): the caller is presumably expected to sample such trace
+	// events unconditionally; that is not visible in this file -- confirm.
+	maxGroups int
+
+	// mu guards groups. Holding the write lock also grants exclusive access
+	// to every traceGroup in the map, since sampleTrace only touches a
+	// group while holding at least the read lock.
+	mu sync.RWMutex
+
+	// TODO(axw) enable the user to define specific trace groups, along with
+	// a specific sampling rate. The "groups" field would then track all other
+	// trace groups, up to a configured maximum, using the default sampling rate.
+	groups map[traceGroupKey]*traceGroup
+}
+
+// newTraceGroups returns an empty traceGroups which maintains at most
+// maxGroups trace groups, assigns defaultSamplingFraction to each newly
+// created group, and uses ingestRateDecayFactor when updating each group's
+// moving average ingest rate.
+func newTraceGroups(
+	maxGroups int,
+	defaultSamplingFraction float64,
+	ingestRateDecayFactor float64,
+) *traceGroups {
+	return &traceGroups{
+		defaultSamplingFraction: defaultSamplingFraction,
+		ingestRateDecayFactor:   ingestRateDecayFactor,
+		maxGroups:               maxGroups,
+		groups:                  make(map[traceGroupKey]*traceGroup),
+	}
+}
+
+// traceGroup represents a single trace group, including a measurement of the
+// observed ingest rate and a weighted random sampling reservoir of trace IDs.
+type traceGroup struct {
+	// samplingFraction holds the configured fraction of traces in this
+	// trace group to sample, as a fraction in the range (0,1).
+	samplingFraction float64
+
+	// mu guards reservoir and total while the group is accessed under the
+	// read lock of traceGroups.mu.
+	mu sync.Mutex
+	// reservoir holds a random sample of root transactions observed
+	// for this trace group, weighted by duration.
+	reservoir *weightedRandomSample
+	// total holds the total number of root transactions observed for
+	// this trace group, including those that are not added to the
+	// reservoir. This is used to update ingestRate.
+	total int
+	// ingestRate holds the exponentially weighted moving average number
+	// of root transactions observed for this trace group per tail
+	// sampling interval. This is read and written only by the periodic
+	// finalizeSampledTraces calls.
+	ingestRate float64
+}
+
+// traceGroupKey identifies a trace group by attributes of the root
+// transaction: its service name, transaction name and outcome.
+type traceGroupKey struct {
+	// TODO(axw) review grouping attributes
+	serviceName        string
+	transactionName    string
+	transactionOutcome string
+}
+
+// sampleTrace will return true if the root transaction is admitted to
+// the in-memory sampling reservoir, and false otherwise.
+//
+// If the transaction is not admitted due to the transaction group limit
+// having been reached, sampleTrace will return errTooManyTraceGroups.
+func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
+	groupKey := traceGroupKey{
+		serviceName:        tx.Metadata.Service.Name,
+		transactionName:    tx.Name,
+		transactionOutcome: tx.Outcome,
+	}
+
+	// Fast path: the group usually exists already, so look it up under the
+	// shared lock and serialise only on the group's own mutex.
+	g.mu.RLock()
+	if existing, found := g.groups[groupKey]; found {
+		defer g.mu.RUnlock()
+		existing.mu.Lock()
+		defer existing.mu.Unlock()
+		existing.total++
+		admitted := existing.reservoir.Sample(tx.Duration, tx.TraceID)
+		return admitted, nil
+	}
+	g.mu.RUnlock()
+
+	// Slow path: take the exclusive lock and look again, as another
+	// goroutine may have created the group between the two locks.
+	g.mu.Lock()
+	defer g.mu.Unlock()
+	if existing, found := g.groups[groupKey]; found {
+		// The exclusive lock on g.mu excludes every fast-path user of
+		// the group, so the group's own mutex is not needed here.
+		existing.total++
+		admitted := existing.reservoir.Sample(tx.Duration, tx.TraceID)
+		return admitted, nil
+	}
+	if len(g.groups) == g.maxGroups {
+		return false, errTooManyTraceGroups
+	}
+
+	// First transaction of a new group: its reservoir starts out empty,
+	// with the minimum size, and is seeded with this transaction.
+	reservoir := newWeightedRandomSample(
+		rand.New(rand.NewSource(time.Now().UnixNano())),
+		minReservoirSize,
+	)
+	reservoir.Sample(tx.Duration, tx.TraceID)
+	g.groups[groupKey] = &traceGroup{
+		samplingFraction: g.defaultSamplingFraction,
+		total:            1,
+		reservoir:        reservoir,
+	}
+	return true, nil
+}
+
+// finalizeSampledTraces locks the groups, appends their current trace IDs to
+// traceIDs, and returns the extended slice. On return the groups' sampling
+// reservoirs will be reset.
+//
+// If the maximum number of groups has been reached, then any groups with the
+// minimum reservoir size (low ingest or sampling rate) may be removed. These
+// groups may also be removed if they have seen no activity in this interval.
+func (g *traceGroups) finalizeSampledTraces(traceIDs []string) []string {
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
+	// Capture this before the loop: deletions below must not change the
+	// decision for the groups visited later.
+	atCapacity := len(g.groups) == g.maxGroups
+	for key, group := range g.groups {
+		// Finalizing the group zeroes its total, so remember how many
+		// transactions it saw during this interval first.
+		observed := group.total
+		traceIDs = group.finalizeSampledTraces(traceIDs, g.ingestRateDecayFactor)
+		if group.reservoir.Size() != minReservoirSize {
+			continue
+		}
+		// Only groups at the minimum reservoir size are candidates for
+		// removal: either they were idle, or room is needed for new groups.
+		if observed == 0 || atCapacity {
+			delete(g.groups, key)
+		}
+	}
+	return traceIDs
+}
+
+// finalizeSampledTraces appends the trace IDs currently held by this group's
+// reservoir to traceIDs, and returns the extended slice. Before returning it
+// updates the group's moving average ingest rate, zeroes the per-interval
+// total, and resets and resizes the reservoir for the next interval.
+func (g *traceGroup) finalizeSampledTraces(traceIDs []string, ingestRateDecayFactor float64) []string {
+	observed := float64(g.total)
+	if g.ingestRate == 0 {
+		// No previous measurement: seed the average with this interval.
+		g.ingestRate = observed
+	} else {
+		g.ingestRate *= 1 - ingestRateDecayFactor
+		g.ingestRate += ingestRateDecayFactor * observed
+	}
+	desiredTotal := int(math.Round(g.samplingFraction * observed))
+	g.total = 0
+
+	// Drop the lowest weighted traces while the reservoir holds more than
+	// the desired fraction of the traces observed in this interval.
+	for g.reservoir.Len() > desiredTotal {
+		g.reservoir.Pop()
+	}
+	traceIDs = append(traceIDs, g.reservoir.Values()...)
+
+	// Size the reservoir for the next interval so that it can hold the
+	// desired fraction of the observed ingest rate, but never less than
+	// the minimum reservoir size.
+	newReservoirSize := minReservoirSize
+	if wanted := int(math.Round(g.samplingFraction * g.ingestRate)); wanted > minReservoirSize {
+		newReservoirSize = wanted
+	}
+	g.reservoir.Reset()
+	g.reservoir.Resize(newReservoirSize)
+	return traceIDs
+}
diff --git a/x-pack/apm-server/sampling/groups_test.go b/x-pack/apm-server/sampling/groups_test.go
new file mode 100644
index 00000000000..acd52bca6af
--- /dev/null
+++ b/x-pack/apm-server/sampling/groups_test.go
@@ -0,0 +1,165 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sampling
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/gofrs/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/apm-server/model"
+)
+
+// TestTraceGroups checks that maxGroups distinct trace groups can each admit
+// minReservoirSize transactions, and that a transaction which would need one
+// more group is rejected with errTooManyTraceGroups.
+func TestTraceGroups(t *testing.T) {
+	const (
+		maxGroups               = 100
+		defaultSamplingFraction = 1.0
+		ingestRateCoefficient   = 1.0
+	)
+
+	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
+	for i := 0; i < maxGroups; i++ {
+		transactionName := fmt.Sprintf("transaction_group_%d", i)
+		// Fill, but do not overflow, the group's initial reservoir, so
+		// that every transaction is expected to be admitted.
+		for i := 0; i < minReservoirSize; i++ {
+			admitted, err := groups.sampleTrace(&model.Transaction{
+				Name:    transactionName,
+				TraceID: uuid.Must(uuid.NewV4()).String(),
+				ID:      uuid.Must(uuid.NewV4()).String(),
+			})
+			require.NoError(t, err)
+			assert.True(t, admitted)
+		}
+	}
+
+	// All maxGroups groups exist now, so a new transaction name must be
+	// rejected.
+	admitted, err := groups.sampleTrace(&model.Transaction{
+		Name:    "overflow",
+		TraceID: uuid.Must(uuid.NewV4()).String(),
+		ID:      uuid.Must(uuid.NewV4()).String(),
+	})
+	assert.Equal(t, errTooManyTraceGroups, err)
+	assert.False(t, admitted)
+}
+
+// TestTraceGroupReservoirResize checks that the number of trace IDs returned
+// by finalizeSampledTraces follows the sampling fraction applied to the
+// exponentially weighted moving average ingest rate of a single group.
+func TestTraceGroupReservoirResize(t *testing.T) {
+	const (
+		maxGroups               = 1
+		defaultSamplingFraction = 0.2
+		ingestRateCoefficient   = 0.75
+	)
+	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
+
+	// sendTransactions feeds n identical transactions into the single
+	// trace group; only the count matters to this test.
+	sendTransactions := func(n int) {
+		for i := 0; i < n; i++ {
+			groups.sampleTrace(&model.Transaction{
+				TraceID: "0102030405060708090a0b0c0d0e0f10",
+				ID:      "0102030405060708",
+			})
+		}
+	}
+
+	// All groups start out with a reservoir size of 1000.
+	sendTransactions(10000)
+	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size
+
+	// We sent 10000 initially, and we send 20000 each subsequent iteration.
+	// The number of sampled trace IDs will converge on 4000 (0.2*20000).
+	for i, expected := range []int{
+		2000, // 0.2 * 10000 (initial ingest rate)
+		3500, // 0.2 * (0.25*10000 + 0.75*20000)
+		3875, // 0.2 * (0.25*17500 + 0.75*20000)
+		3969, // etc.
+		3992,
+		3998,
+		4000,
+		4000,
+	} {
+		sendTransactions(20000)
+		assert.Len(t, groups.finalizeSampledTraces(nil), expected, fmt.Sprintf("iteration %d", i))
+	}
+}
+
+// TestTraceGroupReservoirResizeMinimum checks that a group's reservoir is
+// never resized below minReservoirSize, even when the sampling fraction
+// applied to the ingest rate would call for a smaller one.
+func TestTraceGroupReservoirResizeMinimum(t *testing.T) {
+	const (
+		maxGroups               = 1
+		defaultSamplingFraction = 0.1
+		ingestRateCoefficient   = 1.0
+	)
+	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
+
+	// sendTransactions feeds n identical transactions into the single
+	// trace group; only the count matters to this test.
+	sendTransactions := func(n int) {
+		for i := 0; i < n; i++ {
+			groups.sampleTrace(&model.Transaction{
+				TraceID: "0102030405060708090a0b0c0d0e0f10",
+				ID:      "0102030405060708",
+			})
+		}
+	}
+
+	sendTransactions(10000)
+	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size
+
+	// The reservoir would normally be resized to fit the desired sampling
+	// rate, but will never be resized to less than the minimum (1000).
+	sendTransactions(1000)
+	assert.Len(t, groups.finalizeSampledTraces(nil), 100)
+
+	sendTransactions(10000)
+	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // min reservoir size
+}
+
+// TestTraceGroupsRemoval checks that finalizeSampledTraces removes a group
+// whose reservoir is at the minimum size once the group limit has been
+// reached, making room for a new group.
+func TestTraceGroupsRemoval(t *testing.T) {
+	const (
+		maxGroups               = 2
+		defaultSamplingFraction = 0.5
+		ingestRateCoefficient   = 1.0
+	)
+	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
+
+	for i := 0; i < 10000; i++ {
+		_, err := groups.sampleTrace(&model.Transaction{Name: "many"})
+		assert.NoError(t, err)
+	}
+	_, err := groups.sampleTrace(&model.Transaction{Name: "few"})
+	assert.NoError(t, err)
+
+	// Both group slots are taken, so a third transaction name is rejected.
+	_, err = groups.sampleTrace(&model.Transaction{Name: "another"})
+	assert.Equal(t, errTooManyTraceGroups, err)
+
+	// Finalizing should remove the "few" trace group, since its reservoir
+	// size is at the minimum, and the number of groups is at the maximum.
+	groups.finalizeSampledTraces(nil)
+
+	// We should now be able to add another trace group.
+	_, err = groups.sampleTrace(&model.Transaction{Name: "another"})
+	assert.NoError(t, err)
+}
+
+// BenchmarkTraceGroups measures concurrent calls to sampleTrace, with each
+// goroutine using its own transaction name and therefore its own trace group.
+func BenchmarkTraceGroups(b *testing.B) {
+	const (
+		maxGroups               = 1000
+		defaultSamplingFraction = 1.0
+		ingestRateCoefficient   = 1.0
+	)
+	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
+
+	b.RunParallel(func(pb *testing.PB) {
+		// Transaction identifiers are different for each goroutine, simulating
+		// multiple agents. This should demonstrate low contention.
+		//
+		// Duration is non-zero to ensure transactions have a non-zero chance of
+		// being sampled.
+		tx := model.Transaction{
+			Duration: 1000,
+			Name:     uuid.Must(uuid.NewV4()).String(),
+		}
+		for pb.Next() {
+			groups.sampleTrace(&tx)
+			tx.Duration += 1000
+		}
+	})
+}
diff --git a/x-pack/apm-server/sampling/reservoir.go b/x-pack/apm-server/sampling/reservoir.go
new file mode 100644
index 00000000000..535cb4d4808
--- /dev/null
+++ b/x-pack/apm-server/sampling/reservoir.go
@@ -0,0 +1,130 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sampling
+
+import (
+	"container/heap"
+	"math"
+	"math/rand"
+)
+
+// weightedRandomSample provides a weighted, random, reservoir sampling
+// implementing Algorithm A-Res (Algorithm A with Reservoir).
+//
+// See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
+type weightedRandomSample struct {
+	// rng is the source of the random numbers from which each sampled
+	// item's key is derived.
+	rng *rand.Rand
+	// itemheap stores the sampled trace IDs and their keys as a min-heap
+	// ordered by key. The capacity of its slices is the reservoir size.
+	itemheap
+}
+
+// newWeightedRandomSample constructs a new weighted random sampler,
+// with the given random number generator and reservoir size.
+func newWeightedRandomSample(rng *rand.Rand, reservoirSize int) *weightedRandomSample {
+	return &weightedRandomSample{
+		rng: rng,
+		itemheap: itemheap{
+			// The capacity of these slices is the reservoir size; they
+			// are never grown implicitly by append, only by Resize.
+			keys:   make([]float64, 0, reservoirSize),
+			values: make([]string, 0, reservoirSize),
+		},
+	}
+}
+
+// Sample records a trace ID with a random probability, proportional to
+// the given weight in the range [0, math.MaxFloat64]. It reports whether
+// the trace ID was recorded.
+//
+// While the reservoir has spare capacity every trace ID is recorded.
+// Once it is full, a trace ID is recorded only if its random key exceeds
+// the lowest key currently held, in which case it replaces that entry.
+// A reservoir with zero capacity records nothing.
+func (s *weightedRandomSample) Sample(weight float64, traceID string) bool {
+	// A-Res key: u^(1/weight) with u drawn uniformly from [0,1). A zero
+	// weight gives an exponent of +Inf and therefore a key of 0.
+	k := math.Pow(s.rng.Float64(), 1/weight)
+	if len(s.values) < cap(s.values) {
+		heap.Push(&s.itemheap, item{key: k, value: traceID})
+		return true
+	}
+	if len(s.keys) == 0 {
+		// Zero-capacity reservoir: there is no minimum entry to compare
+		// against or replace, so reject rather than index an empty slice.
+		return false
+	}
+	if k > s.keys[0] {
+		// Overwrite the root (lowest key) and restore the heap order.
+		s.keys[0] = k
+		s.values[0] = traceID
+		heap.Fix(&s.itemheap, 0)
+		return true
+	}
+	return false
+}
+
+// Reset clears the current values, retaining the underlying storage space.
+func (s *weightedRandomSample) Reset() {
+	s.keys = s.keys[:0]
+	s.values = s.values[:0]
+}
+
+// Size returns the reservoir capacity.
+func (s *weightedRandomSample) Size() int {
+	return cap(s.keys)
+}
+
+// Resize resizes the reservoir capacity to n. A negative n is treated as
+// zero.
+//
+// When shrinking, entries with the lowest keys are discarded until at most
+// n remain. Resize is not guaranteed to retain the items with the greatest
+// weight, due to randomisation.
+func (s *weightedRandomSample) Resize(n int) {
+	if n < 0 {
+		// A negative capacity is meaningless; without this clamp the
+		// shrink loop below would pop from an empty heap and panic.
+		n = 0
+	}
+	if n > cap(s.keys) {
+		// Increase capacity by copying into a new slice.
+		keys := make([]float64, len(s.keys), n)
+		values := make([]string, len(s.values), n)
+		copy(keys, s.keys)
+		copy(values, s.values)
+		s.keys = keys
+		s.values = values
+	} else if cap(s.keys) > n {
+		for len(s.keys) > n {
+			heap.Pop(&s.itemheap)
+		}
+		// Cap the slices at n, so that Size and Sample observe the
+		// reduced capacity while the same backing arrays are reused.
+		s.keys = s.keys[0:len(s.keys):n]
+		s.values = s.values[0:len(s.values):n]
+	}
+}
+
+// Pop removes the trace ID with the lowest weight.
+// Pop panics when called on an empty reservoir.
+func (s *weightedRandomSample) Pop() string {
+	popped := heap.Pop(&s.itemheap).(item)
+	return popped.value
+}
+
+// Values returns a copy of the currently sampled trace IDs.
+func (s *weightedRandomSample) Values() []string {
+	// Append onto a fresh, exactly sized slice, so the caller never shares
+	// storage with the reservoir.
+	out := make([]string, 0, len(s.values))
+	return append(out, s.values...)
+}
+
+// itemheap is the storage behind weightedRandomSample: two parallel slices
+// in which keys[i] is the key of values[i]. Together with the methods below
+// it satisfies heap.Interface as a min-heap ordered by key.
+type itemheap struct {
+	keys   []float64
+	values []string
+}
+
+// item pairs a value with its key; it is the element type exchanged with
+// the container/heap functions through Push and Pop.
+type item struct {
+	key   float64
+	value string
+}
+
+// Len returns the number of items currently held.
+func (h itemheap) Len() int {
+	return len(h.keys)
+}
+
+// Less reports whether the item at index i has a lower key than the item
+// at index j.
+func (h itemheap) Less(i, j int) bool {
+	return h.keys[i] < h.keys[j]
+}
+
+// Swap exchanges the items at indices i and j, keeping keys and values
+// paired.
+func (h itemheap) Swap(i, j int) {
+	keys, values := h.keys, h.values
+	keys[i], keys[j] = keys[j], keys[i]
+	values[i], values[j] = values[j], values[i]
+}
+
+// Push appends x, which must be an item, to the end of both slices.
+func (h *itemheap) Push(x interface{}) {
+	entry := x.(item)
+	h.keys = append(h.keys, entry.key)
+	h.values = append(h.values, entry.value)
+}
+
+// Pop removes the last item from both slices, and returns it.
+func (h *itemheap) Pop() interface{} {
+	last := len(h.keys) - 1
+	popped := item{
+		key:   h.keys[last],
+		value: h.values[last],
+	}
+	h.keys = h.keys[:last]
+	h.values = h.values[:last]
+	return popped
+}
diff --git a/x-pack/apm-server/sampling/reservoir_test.go b/x-pack/apm-server/sampling/reservoir_test.go
new file mode 100644
index 00000000000..26a0bf86311
--- /dev/null
+++ b/x-pack/apm-server/sampling/reservoir_test.go
@@ -0,0 +1,32 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sampling
+
+import (
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestResizeReservoir checks that shrinking a full reservoir discards the
+// surplus values.
+func TestResizeReservoir(t *testing.T) {
+	seed := time.Now().UnixNano()
+	reservoir := newWeightedRandomSample(rand.New(rand.NewSource(seed)), 2)
+	reservoir.Sample(1, "a")
+	reservoir.Sample(2, "b")
+	assert.Len(t, reservoir.Values(), 2)
+
+	reservoir.Resize(1)
+	assert.Len(t, reservoir.Values(), 1)
+}
+
+// TestResetReservoir checks that Reset leaves the reservoir without values.
+func TestResetReservoir(t *testing.T) {
+	seed := time.Now().UnixNano()
+	reservoir := newWeightedRandomSample(rand.New(rand.NewSource(seed)), 2)
+	reservoir.Sample(1, "a")
+	reservoir.Sample(2, "b")
+
+	reservoir.Reset()
+	assert.Len(t, reservoir.Values(), 0)
+}