sampling: add trace group sampling reservoirs #4182

Merged · 7 commits · Sep 18, 2020
188 changes: 188 additions & 0 deletions x-pack/apm-server/sampling/groups.go
@@ -0,0 +1,188 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
    "errors"
    "math"
    "math/rand"
    "sync"
    "time"

    "github.com/elastic/apm-server/model"
)

const minReservoirSize = 1000

var errTooManyTraceGroups = errors.New("too many trace groups")

// traceGroups maintains a collection of trace groups.
type traceGroups struct {
    // defaultSamplingFraction holds the default sampling fraction to
    // set for new trace groups. See traceGroup.samplingFraction.
    defaultSamplingFraction float64

    // ingestRateDecayFactor is λ, the decay factor used for calculating the
    // exponentially weighted moving average ingest rate for each trace group.
    ingestRateDecayFactor float64

    // maxGroups holds the maximum number of trace groups to maintain.
    // Once this is reached, all trace events will be sampled.
    maxGroups int

    mu sync.RWMutex

    // TODO(axw) enable the user to define specific trace groups, along with
    // a specific sampling rate. The "groups" field would then track all other
    // trace groups, up to a configured maximum, using the default sampling rate.
    groups map[traceGroupKey]*traceGroup
}

func newTraceGroups(
    maxGroups int,
    defaultSamplingFraction float64,
    ingestRateDecayFactor float64,
) *traceGroups {
    return &traceGroups{
        defaultSamplingFraction: defaultSamplingFraction,
        ingestRateDecayFactor:   ingestRateDecayFactor,
        maxGroups:               maxGroups,
        groups:                  make(map[traceGroupKey]*traceGroup),
    }
}

// traceGroup represents a single trace group, including a measurement of the
// observed ingest rate and a duration-weighted random sampling reservoir of
// trace IDs.
type traceGroup struct {
    // samplingFraction holds the configured fraction of traces in this
    // trace group to sample, as a fraction in the range (0,1).
    samplingFraction float64

    mu sync.Mutex
    // reservoir holds a random sample of root transactions observed
    // for this trace group, weighted by duration.
    reservoir *weightedRandomSample
    // total holds the total number of root transactions observed for
    // this trace group, including those that are not added to the
    // reservoir. This is used to update ingestRate.
    total int
    // ingestRate holds the exponentially weighted moving average number
    // of root transactions observed for this trace group per tail
    // sampling interval. This is read and written only by the periodic
    // finalizeSampledTraces calls.
    ingestRate float64
}

type traceGroupKey struct {
    // TODO(axw) review grouping attributes
    serviceName        string
    transactionName    string
    transactionOutcome string
}

// sampleTrace will return true if the root transaction is admitted to
// the in-memory sampling reservoir, and false otherwise.
//
// If the transaction is not admitted due to the trace group limit
// having been reached, sampleTrace will return errTooManyTraceGroups.
func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
    key := traceGroupKey{
        serviceName:        tx.Metadata.Service.Name,
        transactionName:    tx.Name,
        transactionOutcome: tx.Outcome,
    }

    // First attempt to locate the group with a read lock, to avoid
    // contention in the common case that a group has already been
    // defined.
    g.mu.RLock()
    group, ok := g.groups[key]
    if ok {
        defer g.mu.RUnlock()
        group.mu.Lock()
        defer group.mu.Unlock()
        group.total++
        return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
    }
    g.mu.RUnlock()

    g.mu.Lock()
    defer g.mu.Unlock()
    group, ok = g.groups[key]
    if ok {
        // We've got a write lock on g.mu, no need to lock group too.
        group.total++
        return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
    } else if len(g.groups) == g.maxGroups {
        return false, errTooManyTraceGroups
    }

    group = &traceGroup{
        samplingFraction: g.defaultSamplingFraction,
        total:            1,
        reservoir: newWeightedRandomSample(
            rand.New(rand.NewSource(time.Now().UnixNano())),
            minReservoirSize,
        ),
    }
    group.reservoir.Sample(tx.Duration, tx.TraceID)
    g.groups[key] = group
    return true, nil
}

// finalizeSampledTraces locks the groups, appends their current trace IDs to
// traceIDs, and returns the extended slice. On return the groups' sampling
// reservoirs will be reset.
//
// If the maximum number of groups has been reached, then any groups with the
// minimum reservoir size (low ingest or sampling rate) may be removed. These
// groups may also be removed if they have seen no activity in this interval.
func (g *traceGroups) finalizeSampledTraces(traceIDs []string) []string {
    g.mu.Lock()
    defer g.mu.Unlock()
    maxGroupsReached := len(g.groups) == g.maxGroups
    for key, group := range g.groups {
        total := group.total
        traceIDs = group.finalizeSampledTraces(traceIDs, g.ingestRateDecayFactor)
        if group.reservoir.Size() == minReservoirSize {
            if total == 0 || maxGroupsReached {
                delete(g.groups, key)
            }
        }
    }
    return traceIDs
}

// finalizeSampledTraces appends the group's current trace IDs to traceIDs, and
// returns the extended slice. On return the group's sampling reservoir will be
// reset.
func (g *traceGroup) finalizeSampledTraces(traceIDs []string, ingestRateDecayFactor float64) []string {
    if g.ingestRate == 0 {
        g.ingestRate = float64(g.total)
    } else {
        g.ingestRate *= 1 - ingestRateDecayFactor
        g.ingestRate += ingestRateDecayFactor * float64(g.total)
    }
    desiredTotal := int(math.Round(g.samplingFraction * float64(g.total)))
    g.total = 0

    for n := g.reservoir.Len(); n > desiredTotal; n-- {
        // The reservoir is larger than the desired fraction of the
        // observed total number of traces in this interval. Pop the
        // lowest weighted traces to limit to the desired total.
        g.reservoir.Pop()
    }
    traceIDs = append(traceIDs, g.reservoir.Values()...)

    // Resize the reservoir, so that it can hold the desired fraction of
    // the observed ingest rate.
    newReservoirSize := int(math.Round(g.samplingFraction * g.ingestRate))
    if newReservoirSize < minReservoirSize {
        newReservoirSize = minReservoirSize
    }
    g.reservoir.Reset()
    g.reservoir.Resize(newReservoirSize)
    return traceIDs
}
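
Note: groups.go depends on a weightedRandomSample type (newWeightedRandomSample, Sample, Len, Size, Pop, Values, Reset, Resize) that is defined elsewhere in this PR and not shown in this diff. The sketch below is only an illustration of what such a duration-weighted reservoir could look like, assuming the Efraimidis–Spirakis "A-Res" keying (key = u^(1/weight), keep the largest keys); the actual implementation and its method signatures may differ.

// Illustrative sketch only: a hypothetical duration-weighted reservoir with
// the surface used by groups.go. Not the implementation merged in this PR.
package sampling

import (
    "container/heap"
    "math"
    "math/rand"
)

type weightedRandomSample struct {
    rng  *rand.Rand
    size int // capacity of the reservoir
    h    sampleHeap
}

func newWeightedRandomSample(rng *rand.Rand, size int) *weightedRandomSample {
    return &weightedRandomSample{rng: rng, size: size}
}

// Sample reports whether the trace ID was admitted, evicting the
// lowest-keyed entry when the reservoir is full and the new key is larger.
func (s *weightedRandomSample) Sample(weight float64, traceID string) bool {
    key := math.Pow(s.rng.Float64(), 1/weight)
    if s.h.Len() < s.size {
        heap.Push(&s.h, sampleItem{key: key, traceID: traceID})
        return true
    }
    if key > s.h.items[0].key {
        s.h.items[0] = sampleItem{key: key, traceID: traceID}
        heap.Fix(&s.h, 0)
        return true
    }
    return false
}

func (s *weightedRandomSample) Size() int { return s.size }
func (s *weightedRandomSample) Len() int  { return s.h.Len() }

// Pop removes and returns the lowest-keyed trace ID.
func (s *weightedRandomSample) Pop() string {
    return heap.Pop(&s.h).(sampleItem).traceID
}

// Values returns the trace IDs currently held in the reservoir.
func (s *weightedRandomSample) Values() []string {
    ids := make([]string, 0, s.h.Len())
    for _, item := range s.h.items {
        ids = append(ids, item.traceID)
    }
    return ids
}

func (s *weightedRandomSample) Reset()          { s.h.items = s.h.items[:0] }
func (s *weightedRandomSample) Resize(size int) { s.size = size }

type sampleItem struct {
    key     float64
    traceID string
}

// sampleHeap is a min-heap ordered by key, so the entry most likely to be
// evicted or popped is always at the root.
type sampleHeap struct{ items []sampleItem }

func (h sampleHeap) Len() int            { return len(h.items) }
func (h sampleHeap) Less(i, j int) bool  { return h.items[i].key < h.items[j].key }
func (h sampleHeap) Swap(i, j int)       { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *sampleHeap) Push(x interface{}) { h.items = append(h.items, x.(sampleItem)) }
func (h *sampleHeap) Pop() interface{} {
    n := len(h.items)
    item := h.items[n-1]
    h.items = h.items[:n-1]
    return item
}

Because the key grows with the weight, longer-duration root transactions are more likely to survive in the reservoir, which matches the "weighted by duration" comment on traceGroup.reservoir.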
165 changes: 165 additions & 0 deletions x-pack/apm-server/sampling/groups_test.go
@@ -0,0 +1,165 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
    "fmt"
    "testing"

    "github.com/gofrs/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "github.com/elastic/apm-server/model"
)

func TestTraceGroups(t *testing.T) {
    const (
        maxGroups               = 100
        defaultSamplingFraction = 1.0
        ingestRateCoefficient   = 1.0
    )

    groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)
    for i := 0; i < maxGroups; i++ {
        transactionName := fmt.Sprintf("transaction_group_%d", i)
        for i := 0; i < minReservoirSize; i++ {
            admitted, err := groups.sampleTrace(&model.Transaction{
                Name:    transactionName,
                TraceID: uuid.Must(uuid.NewV4()).String(),
                ID:      uuid.Must(uuid.NewV4()).String(),
            })
            require.NoError(t, err)
            assert.True(t, admitted)
        }
    }

    admitted, err := groups.sampleTrace(&model.Transaction{
        Name:    "overflow",
        TraceID: uuid.Must(uuid.NewV4()).String(),
        ID:      uuid.Must(uuid.NewV4()).String(),
    })
    assert.Equal(t, errTooManyTraceGroups, err)
    assert.False(t, admitted)
}

func TestTraceGroupReservoirResize(t *testing.T) {
    const (
        maxGroups               = 1
        defaultSamplingFraction = 0.2
        ingestRateCoefficient   = 0.75
    )
    groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

    sendTransactions := func(n int) {
        for i := 0; i < n; i++ {
            groups.sampleTrace(&model.Transaction{
                TraceID: "0102030405060708090a0b0c0d0e0f10",
                ID:      "0102030405060708",
            })
        }
    }

    // All groups start out with a reservoir size of 1000.
    sendTransactions(10000)
    assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size

    // We sent 10000 initially, and we send 20000 each subsequent iteration.
    // The number of sampled trace IDs will converge on 4000 (0.2*20000).
    for i, expected := range []int{
        2000, // 0.2 * 10000 (initial ingest rate)
        3500, // 0.2 * (0.25*10000 + 0.75*20000)
        3875, // 0.2 * (0.25*17500 + 0.75*20000)
        3969, // etc.
        3992,
        3998,
        4000,
        4000,
    } {
        sendTransactions(20000)
        assert.Len(t, groups.finalizeSampledTraces(nil), expected, fmt.Sprintf("iteration %d", i))
    }
}

func TestTraceGroupReservoirResizeMinimum(t *testing.T) {
    const (
        maxGroups               = 1
        defaultSamplingFraction = 0.1
        ingestRateCoefficient   = 1.0
    )
    groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

    sendTransactions := func(n int) {
        for i := 0; i < n; i++ {
            groups.sampleTrace(&model.Transaction{
                TraceID: "0102030405060708090a0b0c0d0e0f10",
                ID:      "0102030405060708",
            })
        }
    }

    sendTransactions(10000)
    assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // initial reservoir size

    // The reservoir would normally be resized to fit the desired sampling
    // rate, but will never be resized to less than the minimum (1000).
    sendTransactions(1000)
    assert.Len(t, groups.finalizeSampledTraces(nil), 100)

    sendTransactions(10000)
    assert.Len(t, groups.finalizeSampledTraces(nil), 1000) // min reservoir size
}

func TestTraceGroupsRemoval(t *testing.T) {
    const (
        maxGroups               = 2
        defaultSamplingFraction = 0.5
        ingestRateCoefficient   = 1.0
    )
    groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

    for i := 0; i < 10000; i++ {
        _, err := groups.sampleTrace(&model.Transaction{Name: "many"})
        assert.NoError(t, err)
    }
    _, err := groups.sampleTrace(&model.Transaction{Name: "few"})
    assert.NoError(t, err)

    _, err = groups.sampleTrace(&model.Transaction{Name: "another"})
    assert.Equal(t, errTooManyTraceGroups, err)

    // Finalizing should remove the "few" trace group, since its reservoir
    // size is at the minimum, and the number of groups is at the maximum.
    groups.finalizeSampledTraces(nil)

    // We should now be able to add another trace group.
    _, err = groups.sampleTrace(&model.Transaction{Name: "another"})
    assert.NoError(t, err)
}

func BenchmarkTraceGroups(b *testing.B) {
    const (
        maxGroups               = 1000
        defaultSamplingFraction = 1.0
        ingestRateCoefficient   = 1.0
    )
    groups := newTraceGroups(maxGroups, defaultSamplingFraction, ingestRateCoefficient)

    b.RunParallel(func(pb *testing.PB) {
        // Transaction identifiers are different for each goroutine, simulating
        // multiple agents. This should demonstrate low contention.
        //
        // Duration is non-zero to ensure transactions have a non-zero chance of
        // being sampled.
        tx := model.Transaction{
            Duration: 1000,
            Name:     uuid.Must(uuid.NewV4()).String(),
        }
        for pb.Next() {
            groups.sampleTrace(&tx)
            tx.Duration += 1000
        }
    })
}
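
For context, the resize tests above trace out the EWMA update implemented in traceGroup.finalizeSampledTraces: ingestRate ← (1−λ)·ingestRate + λ·total, with the next reservoir capacity set to max(1000, round(samplingFraction·ingestRate)). The sketch below shows one way a tail-sampling processor loop might drive the two entry points in this PR, sampleTrace per root transaction and finalizeSampledTraces on a timer. The channel, interval, and publishTraceIDs sink are hypothetical stand-ins; the real processor wiring is not part of these files.

// Illustrative sketch only: driving traceGroups from a periodic loop.
// rootTxs and publishTraceIDs are hypothetical; the actual apm-server
// processor integrates these calls differently.
package sampling

import (
    "context"
    "time"

    "github.com/elastic/apm-server/model"
)

func runSampler(
    ctx context.Context,
    groups *traceGroups,
    interval time.Duration,
    rootTxs <-chan *model.Transaction,
    publishTraceIDs func([]string),
) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case tx := <-rootTxs:
            // Admit the root transaction to its group's reservoir.
            if _, err := groups.sampleTrace(tx); err == errTooManyTraceGroups {
                // Trace group limit hit: a real processor would apply a
                // fallback policy here (for example, sample unconditionally).
            }
        case <-ticker.C:
            // Flush: collect this interval's sampled trace IDs, resetting
            // and resizing each group's reservoir in the process.
            publishTraceIDs(groups.finalizeSampledTraces(nil))
        }
    }
}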