Skip to content

Commit

Permalink
tailsampling: when multiple policies choose to sample, only send to next consumer once
Browse files Browse the repository at this point in the history

fixes #1514
  • Loading branch information
chris-smith-zocdoc committed Sep 3, 2020
1 parent f3b5b45 commit a267df2
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 25 deletions.
65 changes: 40 additions & 25 deletions processor/samplingprocessor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,33 +166,44 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
}

trace.Decisions[i] = decision
}

switch decision {
case sampling.Sampled:
stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagSampledKey, "true")},
statCountTracesSampled.M(int64(1)),
)
decisionSampled++

trace.Lock()
traceBatches := trace.ReceivedBatches
trace.Unlock()

for j := 0; j < len(traceBatches); j++ {
tsp.nextConsumer.ConsumeTraces(policy.ctx, internaldata.OCToTraceData(traceBatches[j]))
}
case sampling.NotSampled:
stats.RecordWithTags(
policy.ctx,
[]tag.Mutator{tag.Insert(tagSampledKey, "false")},
statCountTracesSampled.M(int64(1)),
)
decisionNotSampled++
finalDecision := sampling.NotSampled
firstMatchingPolicy := tsp.policies[0]

for i, decision := range trace.Decisions {
if decision == sampling.Sampled {
finalDecision = sampling.Sampled
firstMatchingPolicy = tsp.policies[i]
break
}
}

switch finalDecision {
case sampling.Sampled:
_ = stats.RecordWithTags(
firstMatchingPolicy.ctx,
[]tag.Mutator{tag.Insert(tagSampledKey, "true")},
statCountTracesSampled.M(int64(1)),
)
decisionSampled++

trace.Lock()
traceBatches := trace.ReceivedBatches
trace.Unlock()

for j := 0; j < len(traceBatches); j++ {
_ = tsp.nextConsumer.ConsumeTraces(firstMatchingPolicy.ctx, internaldata.OCToTraceData(traceBatches[j]))
}
case sampling.NotSampled:
_ = stats.RecordWithTags(
firstMatchingPolicy.ctx,
[]tag.Mutator{tag.Insert(tagSampledKey, "false")},
statCountTracesSampled.M(int64(1)),
)
decisionNotSampled++
}

// Sampled or not, remove the batches
trace.Lock()
trace.ReceivedBatches = nil
Expand Down Expand Up @@ -296,8 +307,6 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e
actualData.Unlock()

switch actualDecision {
case sampling.Pending:
// All process for pending done above, keep the case so it doesn't go to default.
case sampling.Sampled:
// Forward the spans to the policy destinations
traceTd := prepareTraceBatch(spans, singleTrace, td)
Expand All @@ -316,6 +325,12 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e
zap.String("policy", policy.Name),
zap.Int("decision", int(actualDecision)))
}

// At this point the late arrival has been passed to nextConsumer. Need to break out of the policy loop
// so that it isn't sent to nextConsumer more than once when multiple policies chose to sample
if actualDecision == sampling.Sampled {
break
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,72 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span")
}

// TestSamplingMultiplePolicies verifies the fix for #1514: when more than one
// policy decides to sample the same trace, the batch must be forwarded to the
// next consumer exactly once, and a late-arriving span must be reported only
// to the first policy that sampled the trace.
func TestSamplingMultiplePolicies(t *testing.T) {
	const maxSize = 100
	const decisionWaitSeconds = 5
	// For this test explicitly control the timer calls and batcher, and set a mock
	// sampling policy evaluator per policy.
	msp := new(exportertest.SinkTraceExporter)
	mpe1 := &mockPolicyEvaluator{}
	mpe2 := &mockPolicyEvaluator{}
	mtt := &manualTTicker{}
	tsp := &tailSamplingSpanProcessor{
		ctx:             context.Background(),
		nextConsumer:    msp,
		maxNumTraces:    maxSize,
		logger:          zap.NewNop(),
		decisionBatcher: newSyncIDBatcher(decisionWaitSeconds),
		// Two policies so the test can drive the "multiple policies sample the
		// same trace" path.
		policies: []*Policy{
			{
				Name: "policy-1", Evaluator: mpe1, ctx: context.TODO(),
			},
			{
				Name: "policy-2", Evaluator: mpe2, ctx: context.TODO(),
			}},
		deleteChan:   make(chan traceKey, maxSize),
		policyTicker: mtt,
	}

	_, batches := generateIdsAndBatches(210)
	currItem := 0
	numSpansPerBatchWindow := 10
	// First evaluations shouldn't have anything to evaluate, until decision wait time passed.
	for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ {
		for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ {
			// Fix: ConsumeTraces returns an error that was previously dropped;
			// a consume failure should fail the test immediately.
			require.NoError(t, tsp.ConsumeTraces(context.Background(), batches[currItem]))
			require.True(t, mtt.Started, "Time ticker was expected to have started")
		}
		tsp.samplingPolicyOnTick()
		require.False(
			t,
			msp.SpansCount() != 0 || mpe1.EvaluationCount != 0 || mpe2.EvaluationCount != 0,
			"policy for initial items was evaluated before decision wait period",
		)
	}

	// Both policies will decide to sample
	mpe1.NextDecision = sampling.Sampled
	mpe2.NextDecision = sampling.Sampled
	tsp.samplingPolicyOnTick()
	require.False(
		t,
		msp.SpansCount() == 0 || mpe1.EvaluationCount == 0 || mpe2.EvaluationCount == 0,
		"policy should have been evaluated totalspans == %d and evaluationcount(1) == %d and evaluationcount(2) == %d",
		msp.SpansCount(),
		mpe1.EvaluationCount,
		mpe2.EvaluationCount,
	)

	// Even though both policies sampled, the exporter must see the batch once.
	require.Equal(t, numSpansPerBatchWindow, msp.SpansCount(), "nextConsumer should've been called with exactly 1 batch of spans")

	// Late span of a sampled trace should be sent directly down the pipeline exporter
	require.NoError(t, tsp.ConsumeTraces(context.Background(), batches[0]))
	expectedNumWithLateSpan := numSpansPerBatchWindow + 1
	require.Equal(t, expectedNumWithLateSpan, msp.SpansCount(), "late span was not accounted for")
	require.Equal(t, 1, mpe1.LateArrivingSpansCount, "1st policy was not notified of the late span")
	require.Equal(t, 0, mpe2.LateArrivingSpansCount, "2nd policy should not have been notified of the late span")
}

func generateIdsAndBatches(numIds int) ([][]byte, []pdata.Traces) {
traceIds := make([][]byte, numIds)
var tds []pdata.Traces
Expand Down

0 comments on commit a267df2

Please sign in to comment.