Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

cmd/trace-agent: combine sampler rates #494

Merged
merged 8 commits into from
Oct 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,40 +272,34 @@ func (a *Agent) Process(t model.Trace) {
go func() {
defer watchdog.LogOnPanic()

// All traces should go through either through the normal score sampler or
// the one dedicated to errors.
samplers := make([]*Sampler, 0, 2)
if traceContainsError(t) {
samplers = append(samplers, a.ErrorsScoreSampler)
} else {
samplers = append(samplers, a.ScoreSampler)
sampled, rate := a.sample(pt)
if !sampled {
return
}
if hasPriority {
// If Priority is defined, send to priority sampling, regardless of priority value.
// The sampler will keep or discard the trace, but we send everything so that it
// gets the big picture and can set the sampling rates accordingly.
samplers = append(samplers, a.PrioritySampler)
sampler.AddSampleRate(pt.Root, rate)

a.sampledTraceChan <- &writer.SampledTrace{
Trace: &pt.Trace,
Transactions: a.TransactionSampler.Extract(pt),
}
}()
}

// Trace sampling.
var sampledTrace writer.SampledTrace
func (a *Agent) sample(pt processedTrace) (sampled bool, rate float64) {
var sampledPriority, sampledScore bool
var ratePriority, rateScore float64

sampled := false
for _, s := range samplers {
// Consider trace as sampled if at least one of the samplers kept it.
sampled = s.Add(pt) || sampled
}
if sampled {
sampledTrace.Trace = &pt.Trace
}
if _, ok := pt.Root.Metrics[sampler.SamplingPriorityKey]; ok {
sampledPriority, ratePriority = a.PrioritySampler.Add(pt)
}

sampledTrace.Transactions = a.TransactionSampler.Extract(pt)
// TODO: attach to these transactions the client, pre-sampler and transaction sample rates.
if traceContainsError(pt.Trace) {
sampledScore, rateScore = a.ErrorsScoreSampler.Add(pt)
} else {
sampledScore, rateScore = a.ScoreSampler.Add(pt)
}

if !sampledTrace.Empty() {
a.sampledTraceChan <- &sampledTrace
}
}()
return sampledScore || sampledPriority, sampler.CombineRates(ratePriority, rateScore)
}

// dieFunc is used by watchdog to kill the agent; replaced in tests.
Expand Down
143 changes: 142 additions & 1 deletion cmd/trace-agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,24 @@ import (
"time"

log "github.com/cihub/seelog"
"github.com/stretchr/testify/assert"

"github.com/DataDog/datadog-trace-agent/config"
"github.com/DataDog/datadog-trace-agent/info"
"github.com/DataDog/datadog-trace-agent/model"
"github.com/DataDog/datadog-trace-agent/obfuscate"
"github.com/DataDog/datadog-trace-agent/sampler"
"github.com/DataDog/datadog-trace-agent/testutil"
"github.com/stretchr/testify/assert"
)

type mockSamplerEngine struct {
engine sampler.Engine
}

func newMockSampler(wantSampled bool, wantRate float64) *Sampler {
return &Sampler{engine: testutil.NewMockEngine(wantSampled, wantRate)}
}

func TestWatchdog(t *testing.T) {
if testing.Short() {
return
Expand Down Expand Up @@ -210,6 +218,139 @@ func TestProcess(t *testing.T) {
})
}

func TestSampling(t *testing.T) {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
for name, tt := range map[string]struct {
// hasErrors will be true if the input trace should have errors
// hasPriority will be true if the input trace should have sampling priority set
hasErrors, hasPriority bool

// scoreRate, scoreErrorRate, priorityRate are the rates used by the mock samplers
scoreRate, scoreErrorRate, priorityRate float64

// scoreSampled, scoreErrorSampled, prioritySampled are the sample decisions of the mock samplers
scoreSampled, scoreErrorSampled, prioritySampled bool

// wantRate and wantSampled are the expected result
wantRate float64
wantSampled bool
}{
"score and priority rate": {
hasPriority: true,
scoreRate: 0.5,
priorityRate: 0.6,
wantRate: sampler.CombineRates(0.5, 0.6),
},
"score only rate": {
scoreRate: 0.5,
priorityRate: 0.1,
wantRate: 0.5,
},
"error and priority rate": {
hasErrors: true,
hasPriority: true,
scoreErrorRate: 0.8,
priorityRate: 0.2,
wantRate: sampler.CombineRates(0.8, 0.2),
},
gbbr marked this conversation as resolved.
Show resolved Hide resolved
"score not sampled decision": {
scoreSampled: false,
wantSampled: false,
},
"score sampled decision": {
scoreSampled: true,
wantSampled: true,
},
"score sampled priority not sampled": {
hasPriority: true,
scoreSampled: true,
prioritySampled: false,
wantSampled: true,
},
"score not sampled priority sampled": {
hasPriority: true,
scoreSampled: false,
prioritySampled: true,
wantSampled: true,
},
"score sampled priority sampled": {
hasPriority: true,
scoreSampled: true,
prioritySampled: true,
wantSampled: true,
},
"score and priority not sampled": {
hasPriority: true,
scoreSampled: false,
prioritySampled: false,
wantSampled: false,
},
"error not sampled decision": {
hasErrors: true,
scoreErrorSampled: false,
wantSampled: false,
},
"error sampled decision": {
hasErrors: true,
scoreErrorSampled: true,
wantSampled: true,
},
"error sampled priority not sampled": {
hasErrors: true,
hasPriority: true,
scoreErrorSampled: true,
prioritySampled: false,
wantSampled: true,
},
"error not sampled priority sampled": {
hasErrors: true,
hasPriority: true,
scoreErrorSampled: false,
prioritySampled: true,
wantSampled: true,
},
"error sampled priority sampled": {
hasErrors: true,
hasPriority: true,
scoreErrorSampled: true,
prioritySampled: true,
wantSampled: true,
},
"error and priority not sampled": {
hasErrors: true,
hasPriority: true,
scoreErrorSampled: false,
prioritySampled: false,
wantSampled: false,
},
} {
t.Run(name, func(t *testing.T) {
a := &Agent{
ScoreSampler: newMockSampler(tt.scoreSampled, tt.scoreRate),
ErrorsScoreSampler: newMockSampler(tt.scoreErrorSampled, tt.scoreErrorRate),
PrioritySampler: newMockSampler(tt.prioritySampled, tt.priorityRate),
}
root := &model.Span{
Service: "serv1",
Start: time.Now().UnixNano(),
Duration: (100 * time.Millisecond).Nanoseconds(),
Metrics: map[string]float64{},
}

if tt.hasErrors {
root.Error = 1
}
pt := processedTrace{Trace: model.Trace{root}, Root: root}
if tt.hasPriority {
pt.Root.Metrics[sampler.SamplingPriorityKey] = 1
}

sampled, rate := a.sample(pt)
assert.EqualValues(t, tt.wantRate, rate)
assert.EqualValues(t, tt.wantSampled, sampled)
})
}
}

func BenchmarkAgentTraceProcessing(b *testing.B) {
c := config.New()
c.Endpoints[0].APIKey = "test"
Expand Down
10 changes: 4 additions & 6 deletions cmd/trace-agent/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ func (s *Sampler) Run() {
}

// Add samples a trace and returns true if trace was sampled (should be kept), false otherwise
func (s *Sampler) Add(t processedTrace) bool {
func (s *Sampler) Add(t processedTrace) (sampled bool, rate float64) {
atomic.AddUint64(&s.totalTraceCount, 1)

if s.engine.Sample(t.Trace, t.Root, t.Env) {
sampled, rate = s.engine.Sample(t.Trace, t.Root, t.Env)
if sampled {
atomic.AddUint64(&s.keptTraceCount, 1)
return true
}

return false
return sampled, rate
}

// Stop stops the sampler
Expand Down
27 changes: 22 additions & 5 deletions sampler/coresampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Engine interface {
// Stop the sampler.
Stop()
// Sample a trace.
Sample(trace model.Trace, root *model.Span, env string) bool
Sample(trace model.Trace, root *model.Span, env string) (sampled bool, samplingRate float64)
gbbr marked this conversation as resolved.
Show resolved Hide resolved
// GetState returns information about the sampler.
GetState() interface{}
// GetType returns the type of the sampler.
Expand Down Expand Up @@ -147,9 +147,9 @@ func (s *Sampler) RunAdjustScoring() {

// GetSampleRate returns the sample rate to apply to a trace.
func (s *Sampler) GetSampleRate(trace model.Trace, root *model.Span, signature Signature) float64 {
sampleRate := s.GetSignatureSampleRate(signature) * s.extraRate
rate := s.GetSignatureSampleRate(signature) * s.extraRate

return sampleRate
return rate
}

// GetMaxTPSSampleRate returns an extra sample rate to apply if we are above maxTPS.
Expand All @@ -176,6 +176,23 @@ func GetTraceAppliedSampleRate(root *model.Span) float64 {
}

// SetTraceAppliedSampleRate sets the currently applied sample rate in the trace data to allow chained up sampling.
func SetTraceAppliedSampleRate(root *model.Span, sampleRate float64) {
root.SetMetric(model.SpanSampleRateMetricKey, sampleRate)
func SetTraceAppliedSampleRate(root *model.Span, rate float64) {
root.SetMetric(model.SpanSampleRateMetricKey, rate)
}

// CombineRates merges two rates from Sampler1, Sampler2. Both samplers law are independant,
// and {sampled} = {sampled by Sampler1} or {sampled by Sampler2}
func CombineRates(rate1 float64, rate2 float64) float64 {
if rate1 >= 1 || rate2 >= 1 {
return 1
}
return rate1 + rate2 - rate1*rate2
gbbr marked this conversation as resolved.
Show resolved Hide resolved
}

// AddSampleRate adds a new sampling rate to the trace sampling rate. Previous and new sampling rate must be independant
// and the sampling decisions sequential.
func AddSampleRate(root *model.Span, rate float64) {
initialRate := GetTraceAppliedSampleRate(root)
newRate := initialRate * rate
SetTraceAppliedSampleRate(root, newRate)
}
29 changes: 29 additions & 0 deletions sampler/coresampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-trace-agent/model"
log "github.com/cihub/seelog"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -53,3 +54,31 @@ func TestSamplerLoop(t *testing.T) {
assert.Fail(t, "Sampler took more than 1 second to close")
}
}

func TestCombineRates(t *testing.T) {
var combineRatesTests = []struct {
rate1, rate2 float64
expected float64
}{
{0.1, 1.0, 1.0},
{0.3, 0.2, 0.44},
{0.0, 0.5, 0.5},
}
for _, tt := range combineRatesTests {
assert.Equal(t, tt.expected, CombineRates(tt.rate1, tt.rate2))
assert.Equal(t, tt.expected, CombineRates(tt.rate2, tt.rate1))
}
}

func TestAddSampleRate(t *testing.T) {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
assert := assert.New(t)
tID := randomTraceID()

root := model.Span{TraceID: tID, SpanID: 1, ParentID: 0, Start: 123, Duration: 100000, Service: "mcnulty", Type: "web"}

AddSampleRate(&root, 0.4)
assert.Equal(0.4, root.Metrics["_sample_rate"], "sample rate should be 40%%")

AddSampleRate(&root, 0.5)
assert.Equal(0.2, root.Metrics["_sample_rate"], "sample rate should be 20%% (50%% of 40%%)")
}
Loading