diff --git a/ddtrace/tracer/span.go b/ddtrace/tracer/span.go index f349538320..db37c2e04f 100644 --- a/ddtrace/tracer/span.go +++ b/ddtrace/tracer/span.go @@ -344,6 +344,7 @@ func (s *span) finish(finishTime int64) { } s.finished = true + keep := true if t, ok := internal.GetGlobalTracer().(*tracer); ok { // we have an active tracer feats := t.features.Load() @@ -358,19 +359,12 @@ func (s *span) finish(finishTime int64) { } if feats.DropP0s { // the agent supports dropping p0's in the client - if shouldDrop(s) { - // ...and this span can be dropped - atomic.AddUint64(&t.droppedP0Spans, 1) - if s == s.context.trace.root { - atomic.AddUint64(&t.droppedP0Traces, 1) - } - return - } + keep = shouldKeep(s) } } - if s.context.drop { - // not sampled by local sampler - return + if keep { + // a single kept span keeps the whole trace. + s.context.trace.keep() } s.context.finish() } @@ -401,20 +395,21 @@ func newAggregableSpan(s *span, cfg *config) *aggregableSpan { } } -// shouldDrop reports whether it's fine to drop the span s. -func shouldDrop(s *span) bool { +// shouldKeep reports whether the trace should be kept. +// a single span being kept implies the whole trace being kept. +func shouldKeep(s *span) bool { if p, ok := s.context.samplingPriority(); ok && p > 0 { // positive sampling priorities stay - return false + return true } if atomic.LoadInt64(&s.context.errors) > 0 { // traces with any span containing an error get kept - return false + return true } if v, ok := s.Metrics[ext.EventSampleRate]; ok { - return !sampledByRate(s.TraceID, v) + return sampledByRate(s.TraceID, v) } - return true + return false } // shouldComputeStats mentions whether this span needs to have stats computed for. diff --git a/ddtrace/tracer/span_test.go b/ddtrace/tracer/span_test.go index b9943774d9..f8d509f21a 100644 --- a/ddtrace/tracer/span_test.go +++ b/ddtrace/tracer/span_test.go @@ -107,26 +107,26 @@ func TestShouldDrop(t *testing.T) { rate float64 want bool }{ - {1, 0, 0, false}, - {2, 1, 0, false}, - {0, 1, 0, false}, - {0, 0, 1, false}, - {0, 0, 0.5, false}, - {0, 0, 0.00001, true}, - {0, 0, 0, true}, + {1, 0, 0, true}, + {2, 1, 0, true}, + {0, 1, 0, true}, + {0, 0, 1, true}, + {0, 0, 0.5, true}, + {0, 0, 0.00001, false}, + {0, 0, 0, false}, } { t.Run("", func(t *testing.T) { s := newSpan("", "", "", 1, 1, 0) s.SetTag(ext.SamplingPriority, tt.prio) s.SetTag(ext.EventSampleRate, tt.rate) atomic.StoreInt64(&s.context.errors, tt.errors) - assert.Equal(t, shouldDrop(s), tt.want) + assert.Equal(t, shouldKeep(s), tt.want) }) } t.Run("none", func(t *testing.T) { s := newSpan("", "", "", 1, 1, 0) - assert.Equal(t, shouldDrop(s), true) + assert.Equal(t, shouldKeep(s), false) }) } diff --git a/ddtrace/tracer/spancontext.go b/ddtrace/tracer/spancontext.go index 3d1cb4ddfa..d88fbd4ba0 100644 --- a/ddtrace/tracer/spancontext.go +++ b/ddtrace/tracer/spancontext.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) @@ -25,7 +26,6 @@ type spanContext struct { trace *trace // reference to the trace that this span belongs too span *span // reference to the span that hosts this context - drop bool // when true, the span will not be sent to the agent errors int64 // number of spans with errors in this trace // the below group should propagate cross-process @@ -52,7 +52,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext { } if parent != nil { context.trace = parent.trace - context.drop = parent.drop context.origin = parent.origin context.errors = parent.errors parent.ForeachBaggageItem(func(k, v string) bool { @@ -128,16 +127,30 @@ func (c *spanContext) baggageItem(key string) string { // finish marks this span as finished in the trace. func (c *spanContext) finish() { c.trace.finishedOne(c.span) } +// samplingDecision is the decision to send a trace to the agent or not. +type samplingDecision int64 + +const ( + // decisionNone is the default state of a trace. + // If no decision is made about the trace, the trace won't be sent to the agent. + decisionNone samplingDecision = iota + // decisionDrop prevents the trace from being sent to the agent. + decisionDrop + // decisionKeep ensures the trace will be sent to the agent. + decisionKeep +) + // trace contains shared context information about a trace, such as sampling // priority, the root reference and a buffer of the spans which are part of the // trace, if these exist. type trace struct { - mu sync.RWMutex // guards below fields - spans []*span // all the spans that are part of this trace - finished int // the number of finished spans - full bool // signifies that the span buffer is full - priority *float64 // sampling priority - locked bool // specifies if the sampling priority can be altered + mu sync.RWMutex // guards below fields + spans []*span // all the spans that are part of this trace + finished int // the number of finished spans + full bool // signifies that the span buffer is full + priority *float64 // sampling priority + locked bool // specifies if the sampling priority can be altered + samplingDecision samplingDecision // samplingDecision indicates whether to send the trace to the agent. // root specifies the root of the trace, if known; it is nil when a span // context is extracted from a carrier, at which point there are no spans in @@ -164,21 +177,33 @@ func newTrace() *trace { return &trace{spans: make([]*span, 0, traceStartSize)} } -func (t *trace) samplingPriority() (p int, ok bool) { - t.mu.RLock() - defer t.mu.RUnlock() +func (t *trace) samplingPriorityLocked() (p int, ok bool) { if t.priority == nil { return 0, false } return int(*t.priority), true } +func (t *trace) samplingPriority() (p int, ok bool) { + t.mu.RLock() + defer t.mu.RUnlock() + return t.samplingPriorityLocked() +} + func (t *trace) setSamplingPriority(p float64) { t.mu.Lock() defer t.mu.Unlock() t.setSamplingPriorityLocked(p) } +func (t *trace) keep() { + atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionKeep)) +} + +func (t *trace) drop() { + atomic.CompareAndSwapInt64((*int64)(&t.samplingDecision), int64(decisionNone), int64(decisionDrop)) +} + func (t *trace) setSamplingPriorityLocked(p float64) { if t.locked { return @@ -246,11 +271,23 @@ func (t *trace) finishedOne(s *span) { if len(t.spans) != t.finished { return } - if tr, ok := internal.GetGlobalTracer().(*tracer); ok { - // we have a tracer that can receive completed traces. - tr.pushTrace(t.spans) - atomic.AddInt64(&tr.spansFinished, int64(len(t.spans))) + defer func() { + t.spans = nil + t.finished = 0 // important, because a buffer can be used for several flushes + }() + tr, ok := internal.GetGlobalTracer().(*tracer) + if !ok { + return + } + // we have a tracer that can receive completed traces. + atomic.AddInt64(&tr.spansFinished, int64(len(t.spans))) + sd := samplingDecision(atomic.LoadInt64((*int64)(&t.samplingDecision))) + if sd != decisionKeep { + if p, ok := t.samplingPriorityLocked(); ok && p == ext.PriorityAutoReject { + atomic.AddUint64(&tr.droppedP0Spans, uint64(len(t.spans))) + atomic.AddUint64(&tr.droppedP0Traces, 1) + } + return } - t.spans = nil - t.finished = 0 // important, because a buffer can be used for several flushes + tr.pushTrace(t.spans) } diff --git a/ddtrace/tracer/spancontext_test.go b/ddtrace/tracer/spancontext_test.go index 3d5ed72193..da0c4ae3c8 100644 --- a/ddtrace/tracer/spancontext_test.go +++ b/ddtrace/tracer/spancontext_test.go @@ -304,11 +304,8 @@ func TestSpanContextParent(t *testing.T) { baggage: map[string]string{"A": "A", "B": "B"}, hasBaggage: 1, trace: newTrace(), - drop: true, - }, - "nil-trace": &spanContext{ - drop: true, }, + "nil-trace": &spanContext{}, "priority": &spanContext{ baggage: map[string]string{"A": "A", "B": "B"}, hasBaggage: 1, @@ -317,6 +314,14 @@ func TestSpanContextParent(t *testing.T) { priority: func() *float64 { v := new(float64); *v = 2; return v }(), }, }, + "sampling_decision": &spanContext{ + baggage: map[string]string{"A": "A", "B": "B"}, + hasBaggage: 1, + trace: &trace{ + spans: []*span{newBasicSpan("abc")}, + samplingDecision: decisionKeep, + }, + }, "origin": &spanContext{ trace: &trace{spans: []*span{newBasicSpan("abc")}}, origin: "synthetics", @@ -334,8 +339,8 @@ func TestSpanContextParent(t *testing.T) { assert.Contains(ctx.trace.spans, s) if parentCtx.trace != nil { assert.Equal(ctx.trace.priority, parentCtx.trace.priority) + assert.Equal(ctx.trace.samplingDecision, parentCtx.trace.samplingDecision) } - assert.Equal(parentCtx.drop, ctx.drop) assert.Equal(parentCtx.baggage, ctx.baggage) assert.Equal(parentCtx.origin, ctx.origin) }) diff --git a/ddtrace/tracer/tracer.go b/ddtrace/tracer/tracer.go index ba2c8576aa..06cc12f731 100644 --- a/ddtrace/tracer/tracer.go +++ b/ddtrace/tracer/tracer.go @@ -487,7 +487,7 @@ func (t *tracer) sample(span *span) { } sampler := t.config.sampler if !sampler.Sample(span) { - span.context.drop = true + span.context.trace.drop() return } if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 { diff --git a/ddtrace/tracer/tracer_test.go b/ddtrace/tracer/tracer_test.go index 6be3ab10df..9f2b40cc07 100644 --- a/ddtrace/tracer/tracer_test.go +++ b/ddtrace/tracer/tracer_test.go @@ -238,6 +238,66 @@ func TestTracerStartSpan(t *testing.T) { }) } +func TestSamplingDecision(t *testing.T) { + t.Run("sampled", func(t *testing.T) { + tracer, _, _, stop := startTestTracer(t) + defer stop() + tracer.prioritySampling.defaultRate = 0 + tracer.config.serviceName = "test_service" + span := tracer.StartSpan("name_1").(*span) + child := tracer.StartSpan("name_2", ChildOf(span.context)) + child.Finish() + span.Finish() + assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, decisionKeep, span.context.trace.samplingDecision) + }) + + t.Run("dropped", func(t *testing.T) { + tracer, _, _, stop := startTestTracer(t) + defer stop() + tracer.features.DropP0s = true + tracer.prioritySampling.defaultRate = 0 + tracer.config.serviceName = "test_service" + span := tracer.StartSpan("name_1").(*span) + child := tracer.StartSpan("name_2", ChildOf(span.context)) + child.Finish() + span.Finish() + assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, decisionNone, span.context.trace.samplingDecision) + }) + + t.Run("events_sampled", func(t *testing.T) { + tracer, _, _, stop := startTestTracer(t) + defer stop() + tracer.features.DropP0s = true + tracer.prioritySampling.defaultRate = 0 + tracer.config.serviceName = "test_service" + span := tracer.StartSpan("name_1").(*span) + child := tracer.StartSpan("name_2", ChildOf(span.context)) + child.SetTag(ext.EventSampleRate, 1) + child.Finish() + span.Finish() + assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, decisionKeep, span.context.trace.samplingDecision) + }) + + t.Run("client_dropped", func(t *testing.T) { + tracer, _, _, stop := startTestTracer(t) + defer stop() + tracer.features.DropP0s = true + tracer.config.sampler = NewRateSampler(0) + tracer.prioritySampling.defaultRate = 0 + tracer.config.serviceName = "test_service" + span := tracer.StartSpan("name_1").(*span) + child := tracer.StartSpan("name_2", ChildOf(span.context)) + child.SetTag(ext.EventSampleRate, 1) + child.Finish() + span.Finish() + assert.Equal(t, float64(ext.PriorityAutoReject), span.Metrics[keySamplingPriority]) + assert.Equal(t, decisionDrop, span.context.trace.samplingDecision) + }) +} + func TestTracerRuntimeMetrics(t *testing.T) { t.Run("on", func(t *testing.T) { tp := new(testLogger)