diff --git a/processor/samplingprocessor/tailsamplingprocessor/processor.go b/processor/samplingprocessor/tailsamplingprocessor/processor.go index 1bf12c393f5..49a4ea80319 100644 --- a/processor/samplingprocessor/tailsamplingprocessor/processor.go +++ b/processor/samplingprocessor/tailsamplingprocessor/processor.go @@ -166,33 +166,44 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { } trace.Decisions[i] = decision + } - switch decision { - case sampling.Sampled: - stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - decisionSampled++ - - trace.Lock() - traceBatches := trace.ReceivedBatches - trace.Unlock() - - for j := 0; j < len(traceBatches); j++ { - tsp.nextConsumer.ConsumeTraces(policy.ctx, internaldata.OCToTraceData(traceBatches[j])) - } - case sampling.NotSampled: - stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "false")}, - statCountTracesSampled.M(int64(1)), - ) - decisionNotSampled++ + finalDecision := sampling.NotSampled + firstMatchingPolicy := tsp.policies[0] + + for i, decision := range trace.Decisions { + if decision == sampling.Sampled { + finalDecision = sampling.Sampled + firstMatchingPolicy = tsp.policies[i] + break } } + switch finalDecision { + case sampling.Sampled: + _ = stats.RecordWithTags( + firstMatchingPolicy.ctx, + []tag.Mutator{tag.Insert(tagSampledKey, "true")}, + statCountTracesSampled.M(int64(1)), + ) + decisionSampled++ + + trace.Lock() + traceBatches := trace.ReceivedBatches + trace.Unlock() + + for j := 0; j < len(traceBatches); j++ { + _ = tsp.nextConsumer.ConsumeTraces(firstMatchingPolicy.ctx, internaldata.OCToTraceData(traceBatches[j])) + } + case sampling.NotSampled: + _ = stats.RecordWithTags( + firstMatchingPolicy.ctx, + []tag.Mutator{tag.Insert(tagSampledKey, "false")}, + statCountTracesSampled.M(int64(1)), + ) + decisionNotSampled++ + } + // Sampled or not, remove the batches trace.Lock() trace.ReceivedBatches = nil @@ -296,8 +307,6 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e actualData.Unlock() switch actualDecision { - case sampling.Pending: - // All process for pending done above, keep the case so it doesn't go to default. case sampling.Sampled: // Forward the spans to the policy destinations traceTd := prepareTraceBatch(spans, singleTrace, td) @@ -316,6 +325,12 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e zap.String("policy", policy.Name), zap.Int("decision", int(actualDecision))) } + + // At this point the late arrival has been passed to nextConsumer. Need to break out of the policy loop + // so that it isn't sent to nextConsumer more than once when multiple policies chose to sample + if actualDecision == sampling.Sampled { + break + } } } diff --git a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go b/processor/samplingprocessor/tailsamplingprocessor/processor_test.go index dbbfa5b3a8f..50f37c07c56 100644 --- a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go +++ b/processor/samplingprocessor/tailsamplingprocessor/processor_test.go @@ -205,6 +205,72 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span") } +func TestSamplingMultiplePolicies(t *testing.T) { + const maxSize = 100 + const decisionWaitSeconds = 5 + // For this test explicitly control the timer calls and batcher, and set a mock + // sampling policy evaluator. + msp := new(exportertest.SinkTraceExporter) + mpe1 := &mockPolicyEvaluator{} + mpe2 := &mockPolicyEvaluator{} + mtt := &manualTTicker{} + tsp := &tailSamplingSpanProcessor{ + ctx: context.Background(), + nextConsumer: msp, + maxNumTraces: maxSize, + logger: zap.NewNop(), + decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), + policies: []*Policy{ + { + Name: "policy-1", Evaluator: mpe1, ctx: context.TODO(), + }, + { + Name: "policy-2", Evaluator: mpe2, ctx: context.TODO(), + }}, + deleteChan: make(chan traceKey, maxSize), + policyTicker: mtt, + } + + _, batches := generateIdsAndBatches(210) + currItem := 0 + numSpansPerBatchWindow := 10 + // First evaluations shouldn't have anything to evaluate, until decision wait time passed. + for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ { + for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ { + tsp.ConsumeTraces(context.Background(), batches[currItem]) + require.True(t, mtt.Started, "Time ticker was expected to have started") + } + tsp.samplingPolicyOnTick() + require.False( + t, + msp.SpansCount() != 0 || mpe1.EvaluationCount != 0 || mpe2.EvaluationCount != 0, + "policy for initial items was evaluated before decision wait period", + ) + } + + // Both policies will decide to sample + mpe1.NextDecision = sampling.Sampled + mpe2.NextDecision = sampling.Sampled + tsp.samplingPolicyOnTick() + require.False( + t, + msp.SpansCount() == 0 || mpe1.EvaluationCount == 0 || mpe2.EvaluationCount == 0, + "policy should have been evaluated totalspans == %d and evaluationcount(1) == %d and evaluationcount(2) == %d", + msp.SpansCount(), + mpe1.EvaluationCount, + mpe2.EvaluationCount, + ) + + require.Equal(t, numSpansPerBatchWindow, msp.SpansCount(), "nextConsumer should've been called with exactly 1 batch of spans") + + // Late span of a sampled trace should be sent directly down the pipeline exporter + tsp.ConsumeTraces(context.Background(), batches[0]) + expectedNumWithLateSpan := numSpansPerBatchWindow + 1 + require.Equal(t, expectedNumWithLateSpan, msp.SpansCount(), "late span was not accounted for") + require.Equal(t, 1, mpe1.LateArrivingSpansCount, "1st policy was not notified of the late span") + require.Equal(t, 0, mpe2.LateArrivingSpansCount, "2nd policy should not have been notified of the late span") +} + func generateIdsAndBatches(numIds int) ([][]byte, []pdata.Traces) { traceIds := make([][]byte, numIds) var tds []pdata.Traces