Skip to content

Commit

Permalink
Record {transaction,sample}.sample_rate (#804)
Browse files Browse the repository at this point in the history
* Record {transaction,sample}.sample_rate

Introduce the new ExtendedSampler interface, which
Samplers may implement to return the effective
sampling rate. This is implemented by the built-in
ratioSampler.

When starting a root transaction we now call the
ExtendedSampler method if implemented, and store
the effective sampling rate in the transaction's
tracestate under our "es" vendor key.

When receiving tracestate, we parse our "es" vendor
value and extract the sample rate.

When encoding transactions and spans we record the
sample rate (from tracestate) in the transaction and
span events.
  • Loading branch information
axw authored Sep 4, 2020
1 parent b34fe04 commit 489947b
Show file tree
Hide file tree
Showing 16 changed files with 409 additions and 10 deletions.
2 changes: 2 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ func (t *Tracer) updateRemoteConfig(logger WarningLogger, old, attrs map[string]
} else {
updates = append(updates, func(cfg *instrumentationConfig) {
cfg.sampler = sampler
cfg.extendedSampler, _ = sampler.(ExtendedSampler)
})
}
default:
Expand Down Expand Up @@ -479,6 +480,7 @@ type instrumentationConfigValues struct {
recording bool
captureBody CaptureBodyMode
captureHeaders bool
extendedSampler ExtendedSampler
maxSpans int
sampler Sampler
spanFramesMinDuration time.Duration
Expand Down
8 changes: 8 additions & 0 deletions model/marshal_fastjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ type Transaction struct {
// it to true.
Sampled *bool `json:"sampled,omitempty"`

// SampleRate holds the sample rate in effect when the trace was started,
// if known. This is used by the server to aggregate transaction metrics.
SampleRate *float64 `json:"sample_rate,omitempty"`

// SpanCount holds statistics on spans within a transaction.
SpanCount SpanCount `json:"span_count"`
}
Expand Down Expand Up @@ -254,6 +258,10 @@ type Span struct {
// ParentID holds the ID of the span's parent (span or transaction).
ParentID SpanID `json:"parent_id,omitempty"`

// SampleRate holds the sample rate in effect when the trace was started,
// if known. This is used by the server to aggregate span metrics.
SampleRate *float64 `json:"sample_rate,omitempty"`

// Context holds contextual information relating to the span.
Context *SpanContext `json:"context,omitempty"`

Expand Down
6 changes: 6 additions & 0 deletions modelwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (w *modelWriter) buildModelTransaction(out *model.Transaction, tx *Transact
if !sampled {
out.Sampled = &notSampled
}
if tx.traceContext.State.haveSampleRate {
out.SampleRate = &tx.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(td.parentSpan)
out.Name = truncateString(td.Name)
Expand Down Expand Up @@ -137,6 +140,9 @@ func (w *modelWriter) buildModelSpan(out *model.Span, span *Span, sd *SpanData)
out.ID = model.SpanID(span.traceContext.Span)
out.TraceID = model.TraceID(span.traceContext.Trace)
out.TransactionID = model.SpanID(span.transactionID)
if span.traceContext.State.haveSampleRate {
out.SampleRate = &span.traceContext.State.sampleRate
}

out.ParentID = model.SpanID(sd.parentID)
out.Name = truncateString(sd.Name)
Expand Down
2 changes: 1 addition & 1 deletion module/apmgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func testClientSpan(t *testing.T, traceparentHeaders ...string) {
}
assert.Equal(t, clientSpans[0].TraceID, serverTransactions[1].TraceID)
assert.Equal(t, clientSpans[0].ID, serverTransactions[1].ParentID)
assert.Equal(t, "server_span", serverSpans[0].Name) // no tracestate
assert.Equal(t, "es=s:1", serverSpans[0].Name) // automatically created tracestate
assert.Equal(t, "vendor=tracestate", serverSpans[1].Name)

traceparentValue := apmhttp.FormatTraceparentHeader(apm.TraceContext{
Expand Down
3 changes: 2 additions & 1 deletion module/apmot/harness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,6 @@ func (harnessAPIProbe) SameSpanContext(span opentracing.Span, sc opentracing.Spa
if !ok {
return false
}
return ctx1.traceContext == ctx2.traceContext
return ctx1.traceContext.Trace == ctx2.traceContext.Trace &&
ctx1.traceContext.Span == ctx2.traceContext.Span
}
50 changes: 46 additions & 4 deletions sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,37 @@ type Sampler interface {
Sample(TraceContext) bool
}

// ExtendedSampler may be implemented by Samplers, providing
// a method for sampling and returning an extended SampleResult.
//
// TODO(axw) in v2.0.0, replace the Sampler interface with this.
type ExtendedSampler interface {
// SampleExtended indicates whether or not a transaction
// should be sampled, and the sampling rate in effect at
// the time. This method will be invoked by calls to
// Tracer.StartTransaction for the root of a trace, so it
// must be goroutine-safe, and should avoid synchronization
// as far as possible.
SampleExtended(SampleParams) SampleResult
}

// SampleParams holds parameters for SampleExtended.
type SampleParams struct {
// TraceContext holds the newly-generated TraceContext
// for the root transaction which is being sampled.
TraceContext TraceContext
}

// SampleResult holds information about a sampling decision.
type SampleResult struct {
// Sampled holds the sampling decision.
Sampled bool

// SampleRate holds the sample rate in effect at the
// time of the sampling decision.
SampleRate float64
}

// NewRatioSampler returns a new Sampler with the given ratio
//
// A ratio of 1.0 samples 100% of transactions, a ratio of 0.5
Expand All @@ -51,16 +82,27 @@ func NewRatioSampler(r float64) Sampler {
x.SetUint64(math.MaxUint64)
x.Mul(&x, big.NewFloat(r))
ceil, _ := x.Uint64()
return ratioSampler{ceil}
return ratioSampler{r, ceil}
}

type ratioSampler struct {
ceil uint64
ratio float64
ceil uint64
}

// Sample samples the transaction according to the configured
// ratio and pseudo-random source.
func (s ratioSampler) Sample(c TraceContext) bool {
v := binary.BigEndian.Uint64(c.Span[:])
return v > 0 && v-1 < s.ceil
return s.SampleExtended(SampleParams{TraceContext: c}).Sampled
}

// SampleExtended samples the transaction according to the configured
// ratio and pseudo-random source.
func (s ratioSampler) SampleExtended(args SampleParams) SampleResult {
v := binary.BigEndian.Uint64(args.TraceContext.Span[:])
result := SampleResult{
Sampled: v > 0 && v-1 < s.ceil,
SampleRate: s.ratio,
}
return result
}
20 changes: 20 additions & 0 deletions sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,23 @@ func TestRatioSamplerNever(t *testing.T) {
Span: apm.SpanID{255, 255, 255, 255, 255, 255, 255, 255},
}))
}

func TestRatioSamplerExtended(t *testing.T) {
s := apm.NewRatioSampler(0.5).(apm.ExtendedSampler)

result := s.SampleExtended(apm.SampleParams{
TraceContext: apm.TraceContext{Span: apm.SpanID{255, 0, 0, 0, 0, 0, 0, 0}},
})
assert.Equal(t, apm.SampleResult{
Sampled: false,
SampleRate: 0.5,
}, result)

result = s.SampleExtended(apm.SampleParams{
TraceContext: apm.TraceContext{Span: apm.SpanID{1, 0, 0, 0, 0, 0, 0, 0}},
})
assert.Equal(t, apm.SampleResult{
Sampled: true,
SampleRate: 0.5,
}, result)
}
22 changes: 22 additions & 0 deletions span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,25 @@ func TestTracerStartSpanIDSpecified(t *testing.T) {
require.Len(t, spans, 1)
assert.Equal(t, model.SpanID(spanID), spans[0].ID)
}

func TestSpanSampleRate(t *testing.T) {
tracer := apmtest.NewRecordingTracer()
defer tracer.Close()
tracer.SetSampler(apm.NewRatioSampler(0.5555))

tx := tracer.StartTransactionOptions("name", "type", apm.TransactionOptions{
// Use a known transaction ID for deterministic sampling.
TransactionID: apm.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
})
s1 := tx.StartSpan("name", "type", nil)
s2 := tx.StartSpan("name", "type", s1)
s2.End()
s1.End()
tx.End()
tracer.Flush(nil)

payloads := tracer.Payloads()
assert.Equal(t, 0.556, *payloads.Transactions[0].SampleRate)
assert.Equal(t, 0.556, *payloads.Spans[0].SampleRate)
assert.Equal(t, 0.556, *payloads.Spans[1].SampleRate)
}
78 changes: 76 additions & 2 deletions tracecontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ import (
"encoding/hex"
"fmt"
"regexp"
"strconv"
"strings"
"unicode"

"github.com/pkg/errors"
)

const (
elasticTracestateVendorKey = "es"
)

var (
errZeroTraceID = errors.New("zero trace-id is invalid")
errZeroSpanID = errors.New("zero span-id is invalid")
Expand Down Expand Up @@ -152,6 +158,13 @@ func (o TraceOptions) WithRecorded(recorded bool) TraceOptions {
// TraceState holds vendor-specific state for a trace.
type TraceState struct {
head *TraceStateEntry

// Fields related to parsing the Elastic ("es") tracestate entry.
//
// These must not be modified after NewTraceState returns.
parseElasticTracestateError error
haveSampleRate bool
sampleRate float64
}

// NewTraceState returns a TraceState based on entries.
Expand All @@ -167,9 +180,55 @@ func NewTraceState(entries ...TraceStateEntry) TraceState {
}
last = &e
}
for _, e := range entries {
if e.Key != elasticTracestateVendorKey {
continue
}
out.parseElasticTracestateError = out.parseElasticTracestate(e)
break
}
return out
}

// parseElasticTracestate parses an Elastic ("es") tracestate entry.
//
// Per https://github.com/elastic/apm/blob/master/specs/agents/tracing-distributed-tracing.md,
// the "es" tracestate value format is: "key:value;key:value...". Unknown keys are ignored.
func (s *TraceState) parseElasticTracestate(e TraceStateEntry) error {
if err := e.Validate(); err != nil {
return err
}
value := e.Value
for value != "" {
kv := value
end := strings.IndexRune(value, ';')
if end >= 0 {
kv = value[:end]
value = value[end+1:]
} else {
value = ""
}
sep := strings.IndexRune(kv, ':')
if sep == -1 {
return errors.New("malformed 'es' tracestate entry")
}
k, v := kv[:sep], kv[sep+1:]
switch k {
case "s":
sampleRate, err := strconv.ParseFloat(v, 64)
if err != nil {
return err
}
if sampleRate < 0 || sampleRate > 1 {
return fmt.Errorf("sample rate %q out of range", v)
}
s.sampleRate = sampleRate
s.haveSampleRate = true
}
}
return nil
}

// String returns s as a comma-separated list of key-value pairs.
func (s TraceState) String() string {
if s.head == nil {
Expand Down Expand Up @@ -199,8 +258,16 @@ func (s TraceState) Validate() error {
if i == 32 {
return errors.New("tracestate contains more than the maximum allowed number of entries, 32")
}
if err := e.Validate(); err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
if e.Key == elasticTracestateVendorKey {
// s.parseElasticTracestateError holds a general e.Validate error if any
// occurred, or any other error specific to the Elastic tracestate format.
if err := s.parseElasticTracestateError; err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
}
} else {
if err := e.Validate(); err != nil {
return errors.Wrapf(err, "invalid tracestate entry at position %d", i)
}
}
if prev, ok := recorded[e.Key]; ok {
return fmt.Errorf("duplicate tracestate key %q at positions %d and %d", e.Key, prev, i)
Expand Down Expand Up @@ -261,3 +328,10 @@ func (e *TraceStateEntry) validateValue() error {
}
return nil
}

func formatElasticTracestateValue(sampleRate float64) string {
// 0 -> "s:0"
// 1 -> "s:1"
// 0.5555 -> "s:0.555" (any rounding should be applied prior)
return fmt.Sprintf("s:%.3g", sampleRate)
}
11 changes: 11 additions & 0 deletions tracecontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,14 @@ func TestTraceStateInvalidValueCharacter(t *testing.T) {
`invalid tracestate entry at position 0: invalid value for key "oy": value contains invalid character '\x00'`)
}
}

func TestTraceStateInvalidElasticEntry(t *testing.T) {
ts := apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "foo"})
assert.EqualError(t, ts.Validate(), `invalid tracestate entry at position 0: malformed 'es' tracestate entry`)

ts = apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:foo"})
assert.EqualError(t, ts.Validate(), `invalid tracestate entry at position 0: strconv.ParseFloat: parsing "foo": invalid syntax`)

ts = apm.NewTraceState(apm.TraceStateEntry{Key: "es", Value: "s:1.5"})
assert.EqualError(t, ts.Validate(), `invalid tracestate entry at position 0: sample rate "1.5" out of range`)
}
2 changes: 2 additions & 0 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func newTracer(opts TracerOptions) *Tracer {
})
t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) {
cfg.sampler = opts.sampler
cfg.extendedSampler, _ = opts.sampler.(ExtendedSampler)
})
t.setLocalInstrumentationConfig(envSpanFramesMinDuration, func(cfg *instrumentationConfigValues) {
cfg.spanFramesMinDuration = opts.spanFramesMinDuration
Expand Down Expand Up @@ -664,6 +665,7 @@ func (t *Tracer) SetRecording(r bool) {
func (t *Tracer) SetSampler(s Sampler) {
t.setLocalInstrumentationConfig(envTransactionSampleRate, func(cfg *instrumentationConfigValues) {
cfg.sampler = s
cfg.extendedSampler, _ = s.(ExtendedSampler)
})
}

Expand Down
26 changes: 24 additions & 2 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,30 @@ func (t *Tracer) StartTransactionOptions(name, transactionType string, opts Tran
}

if root {
sampler := instrumentationConfig.sampler
if sampler == nil || sampler.Sample(tx.traceContext) {
var result SampleResult
if instrumentationConfig.extendedSampler != nil {
result = instrumentationConfig.extendedSampler.SampleExtended(SampleParams{
TraceContext: tx.traceContext,
})
if !result.Sampled {
// Special case: for unsampled transactions we
// report a sample rate of 0, so that we do not
// count them in aggregations in the server.
// This is necessary to avoid overcounting, as
// we will scale the sampled transactions.
result.SampleRate = 0
}
sampleRate := round(1000*result.SampleRate) / 1000
tx.traceContext.State = NewTraceState(TraceStateEntry{
Key: elasticTracestateVendorKey,
Value: formatElasticTracestateValue(sampleRate),
})
} else if instrumentationConfig.sampler != nil {
result.Sampled = instrumentationConfig.sampler.Sample(tx.traceContext)
} else {
result.Sampled = true
}
if result.Sampled {
o := tx.traceContext.Options.WithRecorded(true)
tx.traceContext.Options = o
}
Expand Down
Loading

0 comments on commit 489947b

Please sign in to comment.