diff --git a/processor/tailsamplingprocessor/internal/sampling/policy.go b/processor/tailsamplingprocessor/internal/sampling/policy.go index 9441e8cfb6f0..091fed174e37 100644 --- a/processor/tailsamplingprocessor/internal/sampling/policy.go +++ b/processor/tailsamplingprocessor/internal/sampling/policy.go @@ -26,6 +26,8 @@ type TraceData struct { ReceivedBatches ptrace.Traces // FinalDecision. FinalDecision Decision + // SampleRate + SampleRate int64 } // Decision gives the status of sampling decision. diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index b0a58504001d..8c42dbd04126 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -55,9 +55,10 @@ type tailSamplingSpanProcessor struct { policyTicker timeutils.TTicker tickerFrequency time.Duration decisionBatcher idbatcher.Batcher - sampledIDCache cache.Cache[bool] + sampledIDCache cache.Cache[int64] deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 + samplingRates map[string]int64 } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -88,9 +89,9 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting if err != nil { return nil, err } - sampledDecisions := cache.NewNopDecisionCache[bool]() + sampledDecisions := cache.NewNopDecisionCache[int64]() if cfg.DecisionCache.SampledCacheSize > 0 { - sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize) + sampledDecisions, err = cache.NewLRUDecisionCache[int64](cfg.DecisionCache.SampledCacheSize) if err != nil { return nil, err } @@ -119,6 +120,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting if tsp.policies == nil { policyNames := map[string]bool{} tsp.policies = make([]*policy, len(cfg.PolicyCfgs)) + tsp.samplingRates = make(map[string]int64, len(cfg.PolicyCfgs)) for i := range cfg.PolicyCfgs { policyCfg := &cfg.PolicyCfgs[i] @@ -137,6 +139,23 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting attribute: metric.WithAttributes(attribute.String("policy", policyCfg.Name)), } tsp.policies[i] = p + if policyCfg.Type == Probabilistic { + tsp.samplingRates[policyCfg.Name] = int64(100 / policyCfg.ProbabilisticCfg.SamplingPercentage) + } else if policyCfg.Type == And { + sampleRateSet := false + for j := range policyCfg.AndCfg.SubPolicyCfg { + subpolicyCfg := &policyCfg.AndCfg.SubPolicyCfg[j] + if subpolicyCfg.Type == Probabilistic { + tsp.samplingRates[policyCfg.Name] = int64(100 / subpolicyCfg.ProbabilisticCfg.SamplingPercentage) + sampleRateSet = true + } + } + if !sampleRateSet { + tsp.samplingRates[policyCfg.Name] = 1 + } + } else { + tsp.samplingRates[policyCfg.Name] = 1 + } } } @@ -176,7 +195,7 @@ func withTickerFrequency(frequency time.Duration) Option { } // withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs. -func withSampledDecisionCache(c cache.Cache[bool]) Option { +func withSampledDecisionCache(c cache.Cache[int64]) Option { return func(tsp *tailSamplingSpanProcessor) { tsp.sampledIDCache = c } @@ -255,7 +274,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { trace := d.(*sampling.TraceData) trace.DecisionTime = time.Now() - decision := tsp.makeDecision(id, trace, &metrics) + decision, samplingRate := tsp.makeDecision(id, trace, &metrics) tsp.telemetry.ProcessorTailSamplingSamplingDecisionTimerLatency.Record(tsp.ctx, int64(time.Since(startTime)/time.Microsecond)) tsp.telemetry.ProcessorTailSamplingSamplingTraceDroppedTooEarly.Add(tsp.ctx, metrics.idNotFoundOnMapCount) tsp.telemetry.ProcessorTailSamplingSamplingPolicyEvaluationError.Add(tsp.ctx, metrics.evaluateErrorCount) @@ -266,11 +285,12 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { trace.Lock() allSpans := trace.ReceivedBatches trace.FinalDecision = decision + trace.SampleRate = samplingRate trace.ReceivedBatches = ptrace.NewTraces() trace.Unlock() if decision == sampling.Sampled { - tsp.releaseSampledTrace(context.Background(), id, allSpans) + tsp.releaseSampledTrace(context.Background(), id, allSpans, samplingRate) } } @@ -283,7 +303,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { ) } -func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) sampling.Decision { +func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sampling.TraceData, metrics *policyMetrics) (sampling.Decision, int64) { finalDecision := sampling.NotSampled samplingDecision := map[sampling.Decision]bool{ sampling.Error: false, @@ -292,6 +312,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa sampling.InvertSampled: false, sampling.InvertNotSampled: false, } + samplingRate := int64(1_000_000_000_000) ctx := context.Background() // Check all policies before making a final decision @@ -311,6 +332,9 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa samplingDecision[decision] = true } + if decision == sampling.Sampled { + samplingRate = min(tsp.samplingRates[p.name], samplingRate) + } } // InvertNotSampled takes precedence over any other decision @@ -323,7 +347,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa finalDecision = sampling.Sampled } - return finalDecision + return finalDecision, samplingRate } // ConsumeTraces is required by the processor.Traces interface. @@ -361,10 +385,10 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc var newTraceIDs int64 for id, spans := range idToSpansAndScope { // If the trace ID is in the sampled cache, short circuit the decision - if _, ok := tsp.sampledIDCache.Get(id); ok { + if cacheSamplingRate, ok := tsp.sampledIDCache.Get(id); ok { traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) - tsp.releaseSampledTrace(tsp.ctx, id, traceTd) + tsp.releaseSampledTrace(tsp.ctx, id, traceTd, cacheSamplingRate) tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans))) continue } @@ -408,6 +432,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc // The only thing we really care about here is the final decision. actualData.Lock() finalDecision := actualData.FinalDecision + samplingRate := actualData.SampleRate if finalDecision == sampling.Unspecified { // If the final decision hasn't been made, add the new spans under the lock. @@ -421,7 +446,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc // Forward the spans to the policy destinations traceTd := ptrace.NewTraces() appendToTraces(traceTd, resourceSpans, spans) - tsp.releaseSampledTrace(tsp.ctx, id, traceTd) + tsp.releaseSampledTrace(tsp.ctx, id, traceTd, samplingRate) case sampling.NotSampled: tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: @@ -470,8 +495,18 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio // releaseSampledTrace sends the trace data to the next consumer. // It additionally adds the trace ID to the cache of sampled trace IDs. // It does not (yet) delete the spans from the internal map. -func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces) { - tsp.sampledIDCache.Put(id, true) +func (tsp *tailSamplingSpanProcessor) releaseSampledTrace(ctx context.Context, id pcommon.TraceID, td ptrace.Traces, samplingRate int64) { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rs := td.ResourceSpans().At(i) + for j := 0; j < rs.ScopeSpans().Len(); j++ { + ss := rs.ScopeSpans().At(j) + for k := 0; k < ss.Spans().Len(); k++ { + span := ss.Spans().At(k) + span.Attributes().PutInt("SampleRate", samplingRate) + } + } + } + tsp.sampledIDCache.Put(id, samplingRate) if err := tsp.nextConsumer.ConsumeTraces(ctx, td); err != nil { tsp.logger.Warn( "Error sending spans to destination", diff --git a/processor/tailsamplingprocessor/processor_benchmarks_test.go b/processor/tailsamplingprocessor/processor_benchmarks_test.go index c7dc26a48060..5b983aa9c26f 100644 --- a/processor/tailsamplingprocessor/processor_benchmarks_test.go +++ b/processor/tailsamplingprocessor/processor_benchmarks_test.go @@ -44,7 +44,7 @@ func BenchmarkSampling(b *testing.B) { for i := 0; i < b.N; i++ { for i, id := range traceIDs { - _ = tsp.makeDecision(id, sampleBatches[i], metrics) + _, _ = tsp.makeDecision(id, sampleBatches[i], metrics) } } } diff --git a/processor/tailsamplingprocessor/processor_decisions_test.go b/processor/tailsamplingprocessor/processor_decisions_test.go index 5c232b8dd3a4..a6addf665be5 100644 --- a/processor/tailsamplingprocessor/processor_decisions_test.go +++ b/processor/tailsamplingprocessor/processor_decisions_test.go @@ -343,7 +343,7 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) { } // Use this instead of the default no-op cache - c, err := cache.NewLRUDecisionCache[bool](200) + c, err := cache.NewLRUDecisionCache[int64](200) require.NoError(t, err) p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c)) require.NoError(t, err)