Skip to content

Commit

Permalink
changes to append samplerate to all spans
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanna Yang committed Aug 1, 2024
1 parent fe7acca commit 401956f
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 15 deletions.
2 changes: 2 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type TraceData struct {
ReceivedBatches ptrace.Traces
// FinalDecision.
FinalDecision Decision
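// SampleRate is the sampling rate chosen together with FinalDecision; it is
// written to every released span of the trace as the "SampleRate" attribute.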
SampleRate int64
}

// Decision gives the status of sampling decision.
Expand Down
61 changes: 48 additions & 13 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type tailSamplingSpanProcessor struct {
policyTicker timeutils.TTicker
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache[bool]
sampledIDCache cache.Cache[int64]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
samplingRates map[string]int64
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
Expand Down Expand Up @@ -88,9 +89,9 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
if err != nil {
return nil, err
}
sampledDecisions := cache.NewNopDecisionCache[bool]()
sampledDecisions := cache.NewNopDecisionCache[int64]()
if cfg.DecisionCache.SampledCacheSize > 0 {
sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize)
sampledDecisions, err = cache.NewLRUDecisionCache[int64](cfg.DecisionCache.SampledCacheSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
if tsp.policies == nil {
policyNames := map[string]bool{}
tsp.policies = make([]*policy, len(cfg.PolicyCfgs))
tsp.samplingRates = make(map[string]int64, len(cfg.PolicyCfgs))
for i := range cfg.PolicyCfgs {
policyCfg := &cfg.PolicyCfgs[i]

Expand All @@ -137,6 +139,23 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)),
}
tsp.policies[i] = p
if policyCfg.Type == Probabilistic {
tsp.samplingRates[policyCfg.Name] = int64(100 / policyCfg.ProbabilisticCfg.SamplingPercentage)
} else if policyCfg.Type == And {
sampleRateSet := false
for j := range policyCfg.AndCfg.SubPolicyCfg {
subpolicyCfg := &policyCfg.AndCfg.SubPolicyCfg[j]
if subpolicyCfg.Type == Probabilistic {
tsp.samplingRates[policyCfg.Name] = int64(100 / subpolicyCfg.ProbabilisticCfg.SamplingPercentage)
sampleRateSet = true
}
}
if !sampleRateSet {
tsp.samplingRates[policyCfg.Name] = 1
}
} else {
tsp.samplingRates[policyCfg.Name] = 1
}
}
}

Expand Down Expand Up @@ -176,7 +195,7 @@ func withTickerFrequency(frequency time.Duration) Option {
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withSampledDecisionCache(c cache.Cache[bool]) Option {
func withSampledDecisionCache(c cache.Cache[int64]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.sampledIDCache = c
}
Expand Down Expand Up @@ -255,7 +274,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace := d.(*sampling.TraceData)
trace.DecisionTime = time.Now()

decision := tsp.makeDecision(id, trace, &metrics)
decision, samplingRate := tsp.makeDecision(id, trace, &metrics)
tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond))
tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount)
tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount)
Expand All @@ -266,11 +285,12 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace.Lock()
allSpans := trace.ReceivedBatches
trace.FinalDecision = decision
trace.SampleRate = samplingRate
trace.ReceivedBatches = ptrace.NewTraces()
trace.Unlock()

if decision == sampling.Sampled {
tsp.releaseSampledTrace(context.Background(), id, allSpans)
tsp.releaseSampledTrace(context.Background(), id, allSpans, samplingRate)
}
}

Expand All @@ -283,7 +303,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
)
}

func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision {
func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) (sampling.Decision, int64) {
finalDecision := sampling.NotSampled
samplingDecision := map[sampling.Decision]bool{
sampling.Error: false,
Expand All @@ -292,6 +312,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
sampling.InvertSampled: false,
sampling.InvertNotSampled: false,
}
samplingRate := int64(1_000_000_000_000)

ctx := context.Background()
// Check all policies before making a final decision
Expand All @@ -311,6 +332,9 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa

samplingDecision[decision] = true
}
if decision == sampling.Sampled {
samplingRate = min(tsp.samplingRates[p.name], samplingRate)
}
}

// InvertNotSampled takes precedence over any other decision
Expand All @@ -323,7 +347,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
finalDecision = sampling.Sampled
}

return finalDecision
return finalDecision, samplingRate
}

// ConsumeTraces is required by the processor.Traces interface.
Expand Down Expand Up @@ -361,10 +385,10 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
var newTraceIDs int64
for id, spans := range idToSpansAndScope {
// If the trace ID is in the sampled cache, short circuit the decision
if _, ok := tsp.sampledIDCache.Get(id); ok {
if cacheSamplingRate, ok := tsp.sampledIDCache.Get(id); ok {
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd, cacheSamplingRate)
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans)))
continue
}
Expand Down Expand Up @@ -408,6 +432,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
// The only thing we really care about here is the final decision.
actualData.Lock()
finalDecision := actualData.FinalDecision
samplingRate := actualData.SampleRate

if finalDecision == sampling.Unspecified {
// If the final decision hasn't been made, add the new spans under the lock.
Expand All @@ -421,7 +446,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
// Forward the spans to the policy destinations
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd, samplingRate)
case sampling.NotSampled:
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
default:
Expand Down Expand Up @@ -470,8 +495,18 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio
// releaseSampledTrace sends the trace data to the next consumer.
// It additionally stamps every span with a "SampleRate" attribute and stores
// the trace ID, with that sampling rate, in the cache of sampled trace IDs.
// It does not (yet) delete the spans from the internal map.
func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces) {
tsp.sampledIDCache.Put(id, true)
func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces, samplingRate int64) {
for i := 0; i < td.ResourceSpans().Len(); i++ {
rs := td.ResourceSpans().At(i)
for j := 0; j < rs.ScopeSpans().Len(); j++ {
ss := rs.ScopeSpans().At(j)
for k := 0; k < ss.Spans().Len(); k++ {
span := ss.Spans().At(k)
span.Attributes().PutInt("SampleRate", samplingRate)
}
}
}
tsp.sampledIDCache.Put(id, samplingRate)
if err := tsp.nextConsumer.ConsumeTraces(ctx, td); err != nil {
tsp.logger.Warn(
"Error sending spans to destination",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BenchmarkSampling(b *testing.B) {

for i := 0; i < b.N; i++ {
for i, id := range traceIDs {
_ = tsp.makeDecision(id, sampleBatches[i], metrics)
_, _ = tsp.makeDecision(id, sampleBatches[i], metrics)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
}

// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
c, err := cache.NewLRUDecisionCache[int64](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))
require.NoError(t, err)
Expand Down

0 comments on commit 401956f

Please sign in to comment.