Skip to content

Commit

Permalink
sampling: add trace group sampling reservoirs (#4182) (#4216)
Browse files Browse the repository at this point in the history
* sampling: add trace group sampling reservoirs

Introduce a type for maintaining a collection of trace groups.

For each trace group we measure the observed ingest rate for
the trace group (number of root traces per sampling interval),
and maintain a weighted random sampling reservoir for recording
trace IDs. Trace IDs are weighted by root transaction duration,
which means slower traces will have a greater chance of being
captured.

* Address self-review comments

- better name for decay factor
- fix doc comments to reflect reality
- fix criteria for removing trace groups

* Update x-pack/apm-server/sampling/groups.go

* Remove unnecessary variable assignment
  • Loading branch information
axw authored Sep 18, 2020
1 parent 171347b commit 4095bdd
Show file tree
Hide file tree
Showing 4 changed files with 515 additions and 0 deletions.
188 changes: 188 additions & 0 deletions x-pack/apm-server/sampling/groups.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
"errors"
"math"
"math/rand"
"sync"
"time"

"github.com/elastic/apm-server/model"
)

// minReservoirSize holds the minimum size of a trace group's weighted
// sampling reservoir. Reservoirs are created at this size, and are never
// resized below it (see traceGroup.finalizeSampledTraces).
const minReservoirSize = 1000

// errTooManyTraceGroups is returned by traceGroups.sampleTrace when a
// transaction belongs to a new group and the configured maximum number
// of trace groups has already been reached.
var errTooManyTraceGroups = errors.New("too many trace groups")

// traceGroups maintains a collection of trace groups.
type traceGroups struct {
	// defaultSamplingFraction holds the default sampling fraction to
	// set for new trace groups. See traceGroup.samplingFraction.
	defaultSamplingFraction float64

	// ingestRateDecayFactor is λ, the decay factor used for calculating the
	// exponentially weighted moving average ingest rate for each trace group.
	ingestRateDecayFactor float64

	// maxGroups holds the maximum number of trace groups to maintain.
	// Once this is reached, sampleTrace returns errTooManyTraceGroups
	// for transactions belonging to any previously unseen group.
	maxGroups int

	// mu guards the groups map. sampleTrace takes a read lock for the
	// common case where the group already exists, and a write lock only
	// when inserting a new group; finalizeSampledTraces takes a write lock.
	mu sync.RWMutex

	// TODO(axw) enable the user to define specific trace groups, along with
	// a specific sampling rate. The "groups" field would then track all other
	// trace groups, up to a configured maximum, using the default sampling rate.
	groups map[traceGroupKey]*traceGroup
}

// newTraceGroups returns a new traceGroups that tracks up to maxGroups
// trace groups. New groups are created with defaultSamplingFraction, and
// all groups share ingestRateDecayFactor for their ingest rate averaging.
func newTraceGroups(
	maxGroups int,
	defaultSamplingFraction float64,
	ingestRateDecayFactor float64,
) *traceGroups {
	tg := &traceGroups{
		maxGroups:               maxGroups,
		defaultSamplingFraction: defaultSamplingFraction,
		ingestRateDecayFactor:   ingestRateDecayFactor,
	}
	tg.groups = make(map[traceGroupKey]*traceGroup)
	return tg
}

// traceGroup represents a single trace group, including a measurement of the
// observed ingest rate, and a trace ID weighted random sampling reservoir.
type traceGroup struct {
	// samplingFraction holds the configured fraction of traces in this
	// trace group to sample, as a fraction in the range (0,1).
	samplingFraction float64

	// mu guards the reservoir and total fields below. Callers holding a
	// write lock on traceGroups.mu may access them without taking mu.
	mu sync.Mutex
	// reservoir holds a random sample of root transactions observed
	// for this trace group, weighted by duration.
	reservoir *weightedRandomSample
	// total holds the total number of root transactions observed for
	// this trace group, including those that are not added to the
	// reservoir. This is used to update ingestRate.
	total int
	// ingestRate holds the exponentially weighted moving average number
	// of root transactions observed for this trace group per tail
	// sampling interval. This is read and written only by the periodic
	// finalizeSampledTraces calls.
	ingestRate float64
}

// traceGroupKey identifies a trace group by attributes of its root
// transaction: service name, transaction name, and transaction outcome.
type traceGroupKey struct {
	// TODO(axw) review grouping attributes
	serviceName        string
	transactionName    string
	transactionOutcome string
}

// sampleTrace will return true if the root transaction is admitted to
// the in-memory sampling reservoir, and false otherwise.
//
// If the transaction is not admitted due to the transaction group limit
// having been reached, sampleTrace will return errTooManyTraceGroups.
func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
	key := traceGroupKey{
		serviceName:        tx.Metadata.Service.Name,
		transactionName:    tx.Name,
		transactionOutcome: tx.Outcome,
	}

	// First attempt to locate the group with a read lock, to avoid
	// contention in the common case that a group has already been
	// defined.
	g.mu.RLock()
	group, ok := g.groups[key]
	if ok {
		defer g.mu.RUnlock()
		group.mu.Lock()
		defer group.mu.Unlock()
		group.total++
		return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
	}
	g.mu.RUnlock()

	g.mu.Lock()
	defer g.mu.Unlock()
	// Re-check under the write lock: another goroutine may have created
	// the group between our RUnlock and Lock.
	group, ok = g.groups[key]
	if ok {
		// We've got a write lock on g.mu, no need to lock group too.
		group.total++
		return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
	} else if len(g.groups) >= g.maxGroups {
		// Use >= rather than == so the limit is still enforced even if
		// the map were ever to exceed maxGroups.
		return false, errTooManyTraceGroups
	}

	// New group: seed a per-group RNG and record this first transaction.
	group = &traceGroup{
		samplingFraction: g.defaultSamplingFraction,
		total:            1,
		reservoir: newWeightedRandomSample(
			rand.New(rand.NewSource(time.Now().UnixNano())),
			minReservoirSize,
		),
	}
	group.reservoir.Sample(tx.Duration, tx.TraceID)
	g.groups[key] = group
	return true, nil
}

// finalizeSampledTraces locks the groups, appends their current trace IDs to
// traceIDs, and returns the extended slice. On return the groups' sampling
// reservoirs will be reset.
//
// If the maximum number of groups has been reached, then any groups with the
// minimum reservoir size (low ingest or sampling rate) may be removed. These
// groups may also be removed if they have seen no activity in this interval.
func (g *traceGroups) finalizeSampledTraces(traceIDs []string) []string {
	g.mu.Lock()
	defer g.mu.Unlock()
	// Use >= rather than == so removal still kicks in if the map were
	// ever to exceed maxGroups.
	maxGroupsReached := len(g.groups) >= g.maxGroups
	for key, group := range g.groups {
		// Capture total before finalize resets it; a zero total means
		// the group saw no activity this interval.
		total := group.total
		traceIDs = group.finalizeSampledTraces(traceIDs, g.ingestRateDecayFactor)
		if group.reservoir.Size() == minReservoirSize && (total == 0 || maxGroupsReached) {
			delete(g.groups, key)
		}
	}
	return traceIDs
}

// finalizeSampledTraces appends the group's current trace IDs to traceIDs, and
// returns the extended slice. On return the group's sampling reservoir will be
// reset.
func (g *traceGroup) finalizeSampledTraces(traceIDs []string, ingestRateDecayFactor float64) []string {
	// Update the exponentially weighted moving average ingest rate.
	// The first interval (ingestRate == 0) seeds it with the raw total.
	if g.ingestRate == 0 {
		g.ingestRate = float64(g.total)
	} else {
		g.ingestRate *= 1 - ingestRateDecayFactor
		g.ingestRate += ingestRateDecayFactor * float64(g.total)
	}
	// desiredTotal is the number of trace IDs we want to report for this
	// interval: the sampling fraction of the observed total.
	desiredTotal := int(math.Round(g.samplingFraction * float64(g.total)))
	g.total = 0

	for n := g.reservoir.Len(); n > desiredTotal; n-- {
		// The reservoir is larger than the desired fraction of the
		// observed total number of traces in this interval. Pop the
		// lowest weighted traces to limit to the desired total.
		g.reservoir.Pop()
	}
	traceIDs = append(traceIDs, g.reservoir.Values()...)

	// Resize the reservoir, so that it can hold the desired fraction of
	// the observed ingest rate, but never below minReservoirSize.
	newReservoirSize := int(math.Round(g.samplingFraction * g.ingestRate))
	if newReservoirSize < minReservoirSize {
		newReservoirSize = minReservoirSize
	}
	g.reservoir.Reset()
	g.reservoir.Resize(newReservoirSize)
	return traceIDs
}
165 changes: 165 additions & 0 deletions x-pack/apm-server/sampling/groups_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
"fmt"
"testing"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/model"
)

// TestTraceGroups fills maxGroups distinct trace groups to the minimum
// reservoir size, then verifies that a transaction belonging to one more
// group is rejected with errTooManyTraceGroups.
func TestTraceGroups(t *testing.T) {
	const (
		maxGroups               = 100
		defaultSamplingFraction = 1.0
		ingestRateCoefficient   = 1.0
	)

	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
	for i := 0; i < maxGroups; i++ {
		transactionName := fmt.Sprintf("transaction_group_%d", i)
		// Use j for the inner loop to avoid shadowing the outer i.
		for j := 0; j < minReservoirSize; j++ {
			admitted, err := groups.sampleTrace(&model.Transaction{
				Name:    transactionName,
				TraceID: uuid.Must(uuid.NewV4()).String(),
				ID:      uuid.Must(uuid.NewV4()).String(),
			})
			require.NoError(t, err)
			assert.True(t, admitted)
		}
	}

	admitted, err := groups.sampleTrace(&model.Transaction{
		Name:    "overflow",
		TraceID: uuid.Must(uuid.NewV4()).String(),
		ID:      uuid.Must(uuid.NewV4()).String(),
	})
	assert.Equal(t, errTooManyTraceGroups, err)
	assert.False(t, admitted)
}

// TestTraceGroupReservoirResize verifies that a group's reservoir size
// converges, interval by interval, on samplingFraction times the
// exponentially weighted moving average ingest rate.
func TestTraceGroupReservoirResize(t *testing.T) {
	const (
		maxGroups               = 1
		defaultSamplingFraction = 0.2
		ingestRateCoefficient   = 0.75
	)
	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

	// sendTransactions samples n root transactions belonging to the same
	// (single) trace group. The sampleTrace error is ignored: only one
	// group is ever created, so the group limit cannot be exceeded.
	sendTransactions := func(n int) {
		for i := 0; i < n; i++ {
			groups.sampleTrace(&model.Transaction{
				TraceID: "0102030405060708090a0b0c0d0e0f10",
				ID:      "0102030405060708",
			})
		}
	}

	// All groups start out with a reservoir size of 1000.
	sendTransactions(10000)
	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size

	// We sent 10000 initially, and we send 20000 each subsequent iteration.
	// The number of sampled trace IDs will converge on 4000 (0.2*20000).
	for i, expected := range []int{
		2000, // 0.2 * 10000 (initial ingest rate)
		3500, // 0.2 * (0.25*10000 + 0.75*20000)
		3875, // 0.2 * (0.25*17500 + 0.75*20000)
		3969, // etc.
		3992,
		3998,
		4000,
		4000,
	} {
		sendTransactions(20000)
		assert.Len(t, groups.finalizeSampledTraces(nil), expected, fmt.Sprintf("iteration %d", i))
	}
}

// TestTraceGroupReservoirResizeMinimum verifies that a group's reservoir
// is never resized below minReservoirSize, even when the desired size
// (samplingFraction * ingest rate) is smaller.
func TestTraceGroupReservoirResizeMinimum(t *testing.T) {
	const (
		maxGroups               = 1
		defaultSamplingFraction = 0.1
		ingestRateCoefficient   = 1.0
	)
	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

	// sendTransactions samples n root transactions in the single trace
	// group; the error is ignored as the group limit cannot be exceeded.
	sendTransactions := func(n int) {
		for i := 0; i < n; i++ {
			groups.sampleTrace(&model.Transaction{
				TraceID: "0102030405060708090a0b0c0d0e0f10",
				ID:      "0102030405060708",
			})
		}
	}

	sendTransactions(10000)
	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size

	// The reservoir would normally be resized to fit the desired sampling
	// rate, but will never be resized to less than the minimum (1000).
	sendTransactions(1000)
	assert.Len(t, groups.finalizeSampledTraces(nil), 100)

	sendTransactions(10000)
	assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // min reservoir size
}

// TestTraceGroupsRemoval verifies that when the group limit is reached,
// finalizeSampledTraces evicts groups whose reservoirs are still at the
// minimum size, making room for new groups afterwards.
func TestTraceGroupsRemoval(t *testing.T) {
	const (
		maxGroups               = 2
		defaultSamplingFraction = 0.5
		ingestRateCoefficient   = 1.0
	)
	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

	// Create a high-volume group and a low-volume group, filling the
	// maximum number of groups.
	for i := 0; i < 10000; i++ {
		_, err := groups.sampleTrace(&model.Transaction{Name: "many"})
		assert.NoError(t, err)
	}
	_, err := groups.sampleTrace(&model.Transaction{Name: "few"})
	assert.NoError(t, err)

	_, err = groups.sampleTrace(&model.Transaction{Name: "another"})
	assert.Equal(t, errTooManyTraceGroups, err)

	// Finalizing should remove the "few" trace group, since its reservoir
	// size is at the minimum, and the number of groups is at the maximum.
	groups.finalizeSampledTraces(nil)

	// We should now be able to add another trace group.
	_, err = groups.sampleTrace(&model.Transaction{Name: "another"})
	assert.NoError(t, err)
}

// BenchmarkTraceGroups measures sampleTrace throughput with concurrent
// goroutines, each operating on its own trace group.
func BenchmarkTraceGroups(b *testing.B) {
	const (
		maxGroups               = 1000
		defaultSamplingFraction = 1.0
		ingestRateCoefficient   = 1.0
	)
	groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

	b.RunParallel(func(pb *testing.PB) {
		// Transaction identifiers are different for each goroutine, simulating
		// multiple agents. This should demonstrate low contention.
		//
		// Duration is non-zero to ensure transactions have a non-zero chance of
		// being sampled.
		tx := model.Transaction{
			Duration: 1000,
			Name:     uuid.Must(uuid.NewV4()).String(),
		}
		for pb.Next() {
			groups.sampleTrace(&tx)
			tx.Duration += 1000
		}
	})
}
Loading

0 comments on commit 4095bdd

Please sign in to comment.