tailsampling: only send to next consumer once #1735

Merged · 6 commits · Sep 24, 2020
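In short: before this change, each policy that decided to sample forwarded the trace's batches itself, so a trace sampled by several policies was sent to the next consumer several times. The diff below evaluates all policies first (a new makeDecision helper) and then sends at most once, attributing the send to the context of the first matching policy. A minimal, self-contained sketch of the behavior the PR enforces; the type and function names here are hypothetical stand-ins, not the processor's real API:

package main

import "fmt"

type decision int

const (
	notSampled decision = iota
	sampled
)

type policy struct {
	name     string
	evaluate func() decision
}

// forwardOnce evaluates every policy first, remembers only the first policy
// that sampled, and then forwards each batch exactly once.
func forwardOnce(policies []policy, batches []string) {
	var matching *policy
	for i := range policies {
		if policies[i].evaluate() == sampled && matching == nil {
			matching = &policies[i] // first match supplies the context (policy-name tags)
		}
	}
	if matching == nil {
		return // no policy sampled: drop the batches
	}
	for _, b := range batches {
		// Before the fix this send ran inside every matching policy's case,
		// duplicating the data; now it runs once, after all evaluations.
		fmt.Printf("send %s downstream (attributed to %s)\n", b, matching.name)
	}
}

func main() {
	forwardOnce([]policy{
		{name: "policy-1", evaluate: func() decision { return sampled }},
		{name: "policy-2", evaluate: func() decision { return sampled }},
	}, []string{"batch-0", "batch-1"}) // both policies sample; each batch is still sent once
}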
114 changes: 70 additions & 44 deletions processor/samplingprocessor/tailsamplingprocessor/processor.go
@@ -138,80 +138,102 @@ func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEval
	}
}

+type policyMetrics struct {
+	idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
+}
+
func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
-	var idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
+	metrics := policyMetrics{}
+
	startTime := time.Now()
	batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch()
	batchLen := len(batch)
	tsp.logger.Debug("Sampling Policy Evaluation ticked")
	for _, id := range batch {
		d, ok := tsp.idToTrace.Load(traceKey(id.Bytes()))
		if !ok {
-			idNotFoundOnMapCount++
+			metrics.idNotFoundOnMapCount++
			continue
		}
		trace := d.(*sampling.TraceData)
		trace.DecisionTime = time.Now()
-		for i, policy := range tsp.policies {
-			policyEvaluateStartTime := time.Now()
-			decision, err := policy.Evaluator.Evaluate(id, trace)
-			stats.Record(
-				policy.ctx,
-				statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond)))
-			if err != nil {
-				trace.Decisions[i] = sampling.NotSampled
-				evaluateErrorCount++
-				tsp.logger.Error("Sampling policy error", zap.Error(err))
-				continue
+
+		decision, policy := tsp.makeDecision(id, trace, &metrics)
+
+		// Sampled or not, remove the batches
+		trace.Lock()
+		traceBatches := trace.ReceivedBatches
+		trace.ReceivedBatches = nil
+		trace.Unlock()
Contributor Author commented on lines +164 to +167:
Previously the lock was taken twice when a policy matched; I just combined them into a single acquisition.

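A runnable sketch of the locking change that comment describes; traceData here is a hypothetical trimmed-down stand-in for sampling.TraceData, not the real type:

package main

import (
	"fmt"
	"sync"
)

type traceData struct {
	sync.Mutex
	ReceivedBatches []string
}

// Before: two critical sections, one to read the batches and a second to clear them.
func takeBatchesBefore(t *traceData) []string {
	t.Lock()
	batches := t.ReceivedBatches
	t.Unlock()
	// ... forwarding happened between the two acquisitions ...
	t.Lock()
	t.ReceivedBatches = nil
	t.Unlock()
	return batches
}

// After: a single acquisition reads and clears; forwarding happens outside the lock.
func takeBatchesAfter(t *traceData) []string {
	t.Lock()
	batches := t.ReceivedBatches
	t.ReceivedBatches = nil
	t.Unlock()
	return batches
}

func main() {
	t := &traceData{ReceivedBatches: []string{"batch-0"}}
	fmt.Println(takeBatchesAfter(t), t.ReceivedBatches == nil) // [batch-0] true
}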

+		if decision == sampling.Sampled {
+			for j := 0; j < len(traceBatches); j++ {
+				_ = tsp.nextConsumer.ConsumeTraces(policy.ctx, internaldata.OCToTraceData(traceBatches[j]))
+			}
+		}
	}

+	stats.Record(tsp.ctx,
+		statOverallDecisionLatencyµs.M(int64(time.Since(startTime)/time.Microsecond)),
+		statDroppedTooEarlyCount.M(metrics.idNotFoundOnMapCount),
+		statPolicyEvaluationErrorCount.M(metrics.evaluateErrorCount),
+		statTracesOnMemoryGauge.M(int64(atomic.LoadUint64(&tsp.numTracesOnMap))))
+
+	tsp.logger.Debug("Sampling policy evaluation completed",
+		zap.Int("batch.len", batchLen),
+		zap.Int64("sampled", metrics.decisionSampled),
+		zap.Int64("notSampled", metrics.decisionNotSampled),
+		zap.Int64("droppedPriorToEvaluation", metrics.idNotFoundOnMapCount),
+		zap.Int64("policyEvaluationErrors", metrics.evaluateErrorCount),
+	)
+}
+
+func (tsp *tailSamplingSpanProcessor) makeDecision(id pdata.TraceID, trace *sampling.TraceData, metrics *policyMetrics) (sampling.Decision, *Policy) {
+	finalDecision := sampling.NotSampled
+	var matchingPolicy *Policy = nil
+
+	for i, policy := range tsp.policies {
+		policyEvaluateStartTime := time.Now()
+		decision, err := policy.Evaluator.Evaluate(id, trace)
+		stats.Record(
+			policy.ctx,
+			statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond)))
+
+		if err != nil {
+			trace.Decisions[i] = sampling.NotSampled
+			metrics.evaluateErrorCount++
+			tsp.logger.Debug("Sampling policy error", zap.Error(err))
+		} else {
			trace.Decisions[i] = decision

			switch decision {
			case sampling.Sampled:
-				stats.RecordWithTags(
+				// any single policy that decides to sample will cause the decision to be sampled
+				// the nextConsumer will get the context from the first matching policy
+				finalDecision = sampling.Sampled
+				if matchingPolicy == nil {
Contributor Author commented:
The only reason this is necessary is to get the matching context from the policy, which contains tags for the policy name.
+					matchingPolicy = policy
+				}
+
+				_ = stats.RecordWithTags(
					policy.ctx,
					[]tag.Mutator{tag.Insert(tagSampledKey, "true")},
					statCountTracesSampled.M(int64(1)),
				)
-				decisionSampled++
-
-				trace.Lock()
-				traceBatches := trace.ReceivedBatches
-				trace.Unlock()
+				metrics.decisionSampled++

-				for j := 0; j < len(traceBatches); j++ {
-					tsp.nextConsumer.ConsumeTraces(policy.ctx, internaldata.OCToTraceData(traceBatches[j]))
-				}
			case sampling.NotSampled:
-				stats.RecordWithTags(
+				_ = stats.RecordWithTags(
					policy.ctx,
					[]tag.Mutator{tag.Insert(tagSampledKey, "false")},
					statCountTracesSampled.M(int64(1)),
				)
-				decisionNotSampled++
+				metrics.decisionNotSampled++
			}
		}

-			// Sampled or not, remove the batches
-			trace.Lock()
-			trace.ReceivedBatches = nil
-			trace.Unlock()
-		}
	}

-	stats.Record(tsp.ctx,
-		statOverallDecisionLatencyµs.M(int64(time.Since(startTime)/time.Microsecond)),
-		statDroppedTooEarlyCount.M(idNotFoundOnMapCount),
-		statPolicyEvaluationErrorCount.M(evaluateErrorCount),
-		statTracesOnMemoryGauge.M(int64(atomic.LoadUint64(&tsp.numTracesOnMap))))
-
-	tsp.logger.Debug("Sampling policy evaluation completed",
-		zap.Int("batch.len", batchLen),
-		zap.Int64("sampled", decisionSampled),
-		zap.Int64("notSampled", decisionNotSampled),
-		zap.Int64("droppedPriorToEvaluation", idNotFoundOnMapCount),
-		zap.Int64("policyEvaluationErrors", evaluateErrorCount),
-	)
+
+	return finalDecision, matchingPolicy
}
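The author's comment above explains that the matching policy is returned only for its context, which carries the policy-name tags used when stats are recorded. A standalone OpenCensus illustration of that pattern; the key, measure, and tag names here are hypothetical, not the processor's real identifiers:

package main

import (
	"context"

	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
)

var (
	policyKey     = tag.MustNewKey("policy")
	sampledKey    = tag.MustNewKey("sampled")
	tracesSampled = stats.Int64("traces_sampled", "count of sampled traces", stats.UnitDimensionless)
)

func main() {
	// Each policy is given a context tagged with its name when it is built...
	ctx, _ := tag.New(context.Background(), tag.Insert(policyKey, "policy-1"))

	// ...so recording with that context attributes the data point to the policy,
	// which is why makeDecision returns the first matching policy alongside the decision.
	_ = stats.RecordWithTags(
		ctx,
		[]tag.Mutator{tag.Insert(sampledKey, "true")},
		tracesSampled.M(1),
	)
}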

// ConsumeTraceData is required by the SpanProcessor interface.
@@ -296,8 +318,6 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e
		actualData.Unlock()

		switch actualDecision {
-		case sampling.Pending:
-			// All process for pending done above, keep the case so it doesn't go to default.
		case sampling.Sampled:
			// Forward the spans to the policy destinations
			traceTd := prepareTraceBatch(spans, singleTrace, td)
@@ -316,6 +336,12 @@ func (tsp *tailSamplingSpanProcessor) processTraces(td consumerdata.TraceData) e
				zap.String("policy", policy.Name),
				zap.Int("decision", int(actualDecision)))
		}
+
+		// At this point the late arrival has been passed to nextConsumer. Need to break out of the policy loop
+		// so that it isn't sent to nextConsumer more than once when multiple policies chose to sample
+		if actualDecision == sampling.Sampled {
+			break
+		}
	}
}
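The hunk above is the late-arrival path: when spans arrive after the trace's decision was already made, the first policy whose stored decision was Sampled forwards them, and the new break prevents a second send (and a second notification), which is exactly what the new tests assert. A self-contained sketch of that control flow, with hypothetical names; the real code lives in processTraces:

package main

import "fmt"

type decision int

const (
	notSampled decision = iota
	sampled
)

type policy struct{ name string }

func (p *policy) onLateArrivingSpans(spans []string) {
	fmt.Printf("%s notified of %d late spans\n", p.name, len(spans))
}

// handleLateSpans mirrors the fixed loop: the first policy whose stored
// decision was Sampled forwards the late spans, then the loop breaks so the
// spans cannot be sent downstream a second time.
func handleLateSpans(policies []*policy, decisions []decision, spans []string) {
	for i, p := range policies {
		if decisions[i] == sampled {
			p.onLateArrivingSpans(spans)
			fmt.Println("forwarding late spans to nextConsumer") // happens at most once
			break
		}
	}
}

func main() {
	ps := []*policy{{name: "policy-1"}, {name: "policy-2"}}
	// Both policies sampled earlier; only policy-1 is notified and only one send occurs.
	handleLateSpans(ps, []decision{sampled, sampled}, []string{"late-span"})
}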
126 changes: 126 additions & 0 deletions processor/samplingprocessor/tailsamplingprocessor/processor_test.go
@@ -16,6 +16,7 @@ package tailsamplingprocessor

import (
	"context"
+	"errors"
	"sync"
	"testing"
	"time"
@@ -205,6 +206,131 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
	require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span")
}

func TestSamplingMultiplePolicies(t *testing.T) {
	const maxSize = 100
	const decisionWaitSeconds = 5
	// For this test explicitly control the timer calls and batcher, and set a mock
	// sampling policy evaluator.
	msp := new(exportertest.SinkTraceExporter)
	mpe1 := &mockPolicyEvaluator{}
	mpe2 := &mockPolicyEvaluator{}
	mtt := &manualTTicker{}
	tsp := &tailSamplingSpanProcessor{
		ctx:             context.Background(),
		nextConsumer:    msp,
		maxNumTraces:    maxSize,
		logger:          zap.NewNop(),
		decisionBatcher: newSyncIDBatcher(decisionWaitSeconds),
		policies: []*Policy{
			{
				Name: "policy-1", Evaluator: mpe1, ctx: context.TODO(),
			},
			{
				Name: "policy-2", Evaluator: mpe2, ctx: context.TODO(),
			}},
		deleteChan:   make(chan traceKey, maxSize),
		policyTicker: mtt,
	}

	_, batches := generateIdsAndBatches(210)
	currItem := 0
	numSpansPerBatchWindow := 10
	// First evaluations shouldn't have anything to evaluate, until decision wait time passed.
	for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ {
		for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ {
			tsp.ConsumeTraces(context.Background(), batches[currItem])
			require.True(t, mtt.Started, "Time ticker was expected to have started")
		}
		tsp.samplingPolicyOnTick()
		require.False(
			t,
			msp.SpansCount() != 0 || mpe1.EvaluationCount != 0 || mpe2.EvaluationCount != 0,
			"policy for initial items was evaluated before decision wait period",
		)
	}

	// Both policies will decide to sample
	mpe1.NextDecision = sampling.Sampled
	mpe2.NextDecision = sampling.Sampled
	tsp.samplingPolicyOnTick()
	require.False(
		t,
		msp.SpansCount() == 0 || mpe1.EvaluationCount == 0 || mpe2.EvaluationCount == 0,
		"policy should have been evaluated totalspans == %d and evaluationcount(1) == %d and evaluationcount(2) == %d",
		msp.SpansCount(),
		mpe1.EvaluationCount,
		mpe2.EvaluationCount,
	)

	require.Equal(t, numSpansPerBatchWindow, msp.SpansCount(), "nextConsumer should've been called with exactly 1 batch of spans")

	// Late span of a sampled trace should be sent directly down the pipeline exporter
	tsp.ConsumeTraces(context.Background(), batches[0])
	expectedNumWithLateSpan := numSpansPerBatchWindow + 1
	require.Equal(t, expectedNumWithLateSpan, msp.SpansCount(), "late span was not accounted for")
	require.Equal(t, 1, mpe1.LateArrivingSpansCount, "1st policy was not notified of the late span")
	require.Equal(t, 0, mpe2.LateArrivingSpansCount, "2nd policy should not have been notified of the late span")
}

func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
	const maxSize = 100
	const decisionWaitSeconds = 5
	// For this test explicitly control the timer calls and batcher, and set a mock
	// sampling policy evaluator.
	msp := new(exportertest.SinkTraceExporter)
	mpe := &mockPolicyEvaluator{}
	mtt := &manualTTicker{}
	tsp := &tailSamplingSpanProcessor{
		ctx:             context.Background(),
		nextConsumer:    msp,
		maxNumTraces:    maxSize,
		logger:          zap.NewNop(),
		decisionBatcher: newSyncIDBatcher(decisionWaitSeconds),
		policies:        []*Policy{{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}},
		deleteChan:      make(chan traceKey, maxSize),
		policyTicker:    mtt,
	}

	_, batches := generateIdsAndBatches(210)
	currItem := 0
	numSpansPerBatchWindow := 10
	// First evaluations shouldn't have anything to evaluate, until decision wait time passed.
	for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ {
		for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ {
			tsp.ConsumeTraces(context.Background(), batches[currItem])
			require.True(t, mtt.Started, "Time ticker was expected to have started")
		}
		tsp.samplingPolicyOnTick()
		require.False(
			t,
			msp.SpansCount() != 0 || mpe.EvaluationCount != 0,
			"policy for initial items was evaluated before decision wait period",
		)
	}

	// Now the first batch that waited the decision period.
	mpe.NextDecision = sampling.NotSampled
	tsp.samplingPolicyOnTick()
	require.EqualValues(t, 0, msp.SpansCount(), "exporter should have received zero spans")
	require.EqualValues(t, 4, mpe.EvaluationCount, "policy should have been evaluated 4 times")

	// Late span of a non-sampled trace should be ignored
	tsp.ConsumeTraces(context.Background(), batches[0])
	require.Equal(t, 0, msp.SpansCount())
	require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span")

	mpe.NextDecision = sampling.Unspecified
	mpe.NextError = errors.New("mock policy error")
	tsp.samplingPolicyOnTick()
	require.EqualValues(t, 0, msp.SpansCount(), "exporter should have received zero spans")
	require.EqualValues(t, 6, mpe.EvaluationCount, "policy should have been evaluated 6 times")

	// Late span of a non-sampled trace should be ignored
	tsp.ConsumeTraces(context.Background(), batches[0])
	require.Equal(t, 0, msp.SpansCount())
	require.Equal(t, 2, mpe.LateArrivingSpansCount, "policy was not notified of the late span")
}

func generateIdsAndBatches(numIds int) ([]pdata.TraceID, []pdata.Traces) {
	traceIds := make([]pdata.TraceID, numIds)
	var tds []pdata.Traces