Skip to content

Commit

Permalink
Finish up configs for tail sampling policies (#221)
Browse files Browse the repository at this point in the history
* Finish up configs for tail sampling policies

Fixes #146.

* Add tailsampling processor to defaults

* Restore multiple policies per instance of tail sampling processor

* Remove the -filter suffix
  • Loading branch information
songy23 authored and Paulo Janotti committed Aug 7, 2019
1 parent aeaee36 commit c4d4adf
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 58 deletions.
2 changes: 2 additions & 0 deletions defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
"github.com/open-telemetry/opentelemetry-service/processor/queued"
"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
"github.com/open-telemetry/opentelemetry-service/receiver"
"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
Expand Down Expand Up @@ -74,6 +75,7 @@ func Components() (
&attributekeyprocessor.Factory{},
&queued.Factory{},
&nodebatcher.Factory{},
&tailsampling.Factory{},
)
if err != nil {
errs = append(errs, err)
Expand Down
2 changes: 2 additions & 0 deletions defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
"github.com/open-telemetry/opentelemetry-service/processor/queued"
"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
"github.com/open-telemetry/opentelemetry-service/receiver"
"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
Expand All @@ -55,6 +56,7 @@ func TestDefaultComponents(t *testing.T) {
"attribute-key": &attributekeyprocessor.Factory{},
"queued-retry": &queued.Factory{},
"batch": &nodebatcher.Factory{},
"tail-sampling": &tailsampling.Factory{},
}
expectedExporters := map[string]exporter.Factory{
"opencensus": &opencensusexporter.Factory{},
Expand Down
4 changes: 2 additions & 2 deletions internal/collector/sampling/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
// TraceData stores the sampling related trace data.
type TraceData struct {
sync.Mutex
// Decision gives the current status of the sampling decision for each policy.
Decision []Decision
// Decisions gives the current status of the sampling decision for each policy.
Decisions []Decision
// Arrival time the first span for the trace was received.
ArrivalTime time.Time
// Decisiontime time when sampling decision was taken.
Expand Down
41 changes: 26 additions & 15 deletions processor/tailsampling/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,33 @@ type PolicyType string
const (
// AlwaysSample samples all traces, typically used for debugging.
AlwaysSample PolicyType = "always-sample"
// NumericAttributeFilter sample traces that have a given numeric attribute in a specified
// NumericAttribute sample traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
NumericAttributeFilter PolicyType = "numeric-attribute-filter"
// StringAttributeFilter sample traces that a attribute, of type string, matching
NumericAttribute PolicyType = "numeric-attribute"
// StringAttribute sample traces that a attribute, of type string, matching
// one of the listed values.
StringAttributeFilter PolicyType = "string-attribute-filter"
StringAttribute PolicyType = "string-attribute"
// RateLimiting allows all traces until the specified limits are satisfied.
RateLimiting PolicyType = "rate-limiting"
)

// PolicyCfg holds the common configuration to all policies.
type PolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
Name string
Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
Type PolicyType
// Exporters hold the name of the exporters that the policy evaluator uses to make decisions
// about whether or not sending the traces.
Exporters []string
// Configuration holds the settings specific to the policy.
Configuration interface{}
Type PolicyType `mapstructure:"type"`
// Configs for numeric attribute filter sampling policy evaluator.
NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric-attribute"`
// Configs for string attribute filter sampling policy evaluator.
StringAttributeCfg StringAttributeCfg `mapstructure:"string-attribute"`
// Configs for rate limiting filter sampling policy evaluator.
RateLimitingCfg RateLimitingCfg `mapstructure:"rate-limiting"`
}

// NumericAttributeFilterCfg holds the configurable settings to create a numeric attribute filter
// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
// sampling policy evaluator.
type NumericAttributeFilterCfg struct {
type NumericAttributeCfg struct {
// Tag that the filter is going to be matching against.
Key string `mapstructure:"key"`
// MinValue is the minimum value of the attribute to be considered a match.
Expand All @@ -60,15 +61,22 @@ type NumericAttributeFilterCfg struct {
MaxValue int64 `mapstructure:"max-value"`
}

// StringAttributeFilterCfg holds the configurable settings to create a string attribute filter
// StringAttributeCfg holds the configurable settings to create a string attribute filter
// sampling policy evaluator.
type StringAttributeFilterCfg struct {
type StringAttributeCfg struct {
// Tag that the filter is going to be matching against.
Key string `mapstructure:"key"`
// Values is the set of attribute values that if any is equal to the actual attribute value to be considered a match.
Values []string `mapstructure:"values"`
}

// RateLimitingCfg holds the configurable settings to create a rate limiting
// sampling policy evaluator.
type RateLimitingCfg struct {
// SpansPerSecond sets the limit on the maximum nuber of spans that can be processed each second.
SpansPerSecond int64 `mapstructure:"spans-per-second"`
}

// Config holds the configuration for tail-based sampling.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
Expand All @@ -81,4 +89,7 @@ type Config struct {
// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
// per second. This helps with allocating data structures with closer to actual usage size.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
// PolicyCfgs sets the tail-based sampling policy which makes a sampling decision
// for a given trace when requested.
PolicyCfgs []PolicyCfg `mapstructure:"policies"`
}
31 changes: 25 additions & 6 deletions processor/tailsampling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,38 @@ func TestLoadConfig(t *testing.T) {
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "tail_sampling_config.yaml"), receivers, processors, exporters,
)

require.Nil(t, err)
require.NotNil(t, cfg)

p0 := cfg.Processors["tail-sampling"]
assert.Equal(t, p0,
assert.Equal(t, cfg.Processors["tail-sampling"],
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "tail-sampling",
NameVal: "tail-sampling",
},
DecisionWait: 31 * time.Second,
NumTraces: 20001,
ExpectedNewTracesPerSec: 100,
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
PolicyCfgs: []PolicyCfg{
{
Name: "test-policy-1",
Type: AlwaysSample,
},
{
Name: "test-policy-2",
Type: NumericAttribute,
NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
},
{
Name: "test-policy-3",
Type: StringAttribute,
StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
},
{
Name: "test-policy-4",
Type: RateLimiting,
RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
},
},
})
}
10 changes: 9 additions & 1 deletion processor/tailsampling/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateProcessor(t *testing.T) {
factory := &Factory{}

cfg := factory.CreateDefaultConfig()
cfg := factory.CreateDefaultConfig().(*Config)
// Manually set required fields
cfg.ExpectedNewTracesPerSec = 64
cfg.PolicyCfgs = []PolicyCfg{
{
Name: "test-policy",
Type: AlwaysSample,
},
}

tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.NotNil(t, tp)
Expand Down
75 changes: 58 additions & 17 deletions processor/tailsampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package tailsampling
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/processor"
)

Expand All @@ -41,8 +43,6 @@ type Policy struct {
Name string
// Evaluator that decides if a trace is sampled or not by this policy instance.
Evaluator sampling.PolicyEvaluator
// Destination is the consumer of the traces selected to be sampled.
Destination consumer.TraceConsumer
// ctx used to carry metric tags of each policy.
ctx context.Context
}
Expand All @@ -52,7 +52,7 @@ type Policy struct {
type traceKey string

// tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling
// policies to sample traces.
// policy to sample traces.
type tailSamplingSpanProcessor struct {
ctx context.Context
nextConsumer consumer.TraceConsumer
Expand All @@ -67,6 +67,10 @@ type tailSamplingSpanProcessor struct {
numTracesOnMap uint64
}

const (
sourceFormat = "tail-sampling"
)

var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)

// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
Expand All @@ -82,21 +86,58 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer,
return nil, err
}

ctx := context.Background()
policies := []*Policy{}
for _, policyCfg := range cfg.PolicyCfgs {
policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
if err != nil {
return nil, err
}
eval, err := getPolicyEvaluator(&policyCfg)
if err != nil {
return nil, err
}
policy := &Policy{
Name: policyCfg.Name,
Evaluator: eval,
ctx: policyCtx,
}
policies = append(policies, policy)
}

tsp := &tailSamplingSpanProcessor{
ctx: context.Background(),
ctx: ctx,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
logger: logger,
decisionBatcher: inBatcher,
policies: policies,
}

// TODO(#146): add policies to TailBasedCfg
tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick}
tsp.deleteChan = make(chan traceKey, cfg.NumTraces)

return tsp, nil
}

func getPolicyEvaluator(cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(safCfg.Key, safCfg.Values), nil
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(rlfCfg.SpansPerSecond), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
}

func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
var idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
startTime := time.Now()
Expand All @@ -118,13 +159,13 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
policy.ctx,
statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond)))
if err != nil {
trace.Decision[i] = sampling.NotSampled
trace.Decisions[i] = sampling.NotSampled
evaluateErrorCount++
tsp.logger.Error("Sampling policy error", zap.Error(err))
continue
}

trace.Decision[i] = decision
trace.Decisions[i] = decision

switch decision {
case sampling.Sampled:
Expand All @@ -140,7 +181,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace.Unlock()

for j := 0; j < len(traceBatches); j++ {
policy.Destination.ConsumeTraceData(policy.ctx, traceBatches[j])
tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceBatches[j])
}
case sampling.NotSampled:
stats.RecordWithTags(
Expand Down Expand Up @@ -201,7 +242,7 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
initialDecisions[i] = sampling.Pending
}
initialTraceData := &sampling.TraceData{
Decision: initialDecisions,
Decisions: initialDecisions,
ArrivalTime: time.Now(),
SpanCount: lenSpans,
}
Expand All @@ -227,13 +268,13 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
}
}

for i, policyAndDests := range tsp.policies {
for i, policy := range tsp.policies {
actualData.Lock()
actualDecision := actualData.Decision[i]
actualDecision := actualData.Decisions[i]
// If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen
// in between the transition from pending.
if actualDecision == sampling.Pending {
// Add the spans to the trace, but only once for all policies, otherwise same spans will
// Add the spans to the trace, but only once for all policy, otherwise same spans will
// be duplicated in the final trace.
traceTd := prepareTraceBatch(spans, singleTrace, td)
actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd)
Expand All @@ -248,19 +289,19 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
case sampling.Sampled:
// Forward the spans to the policy destinations
traceTd := prepareTraceBatch(spans, singleTrace, td)
if err := policyAndDests.Destination.ConsumeTraceData(policyAndDests.ctx, traceTd); err != nil {
if err := tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceTd); err != nil {
tsp.logger.Warn("Error sending late arrived spans to destination",
zap.String("policy", policyAndDests.Name),
zap.String("policy", policy.Name),
zap.Error(err))
}
fallthrough // so OnLateArrivingSpans is also called for decision Sampled.
case sampling.NotSampled:
policyAndDests.Evaluator.OnLateArrivingSpans(actualDecision, spans)
policy.Evaluator.OnLateArrivingSpans(actualDecision, spans)
stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)))

default:
tsp.logger.Warn("Encountered unexpected sampling decision",
zap.String("policy", policyAndDests.Name),
zap.String("policy", policy.Name),
zap.Int("decision", int(actualDecision)))
}
}
Expand All @@ -285,7 +326,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime t
policiesLen := len(tsp.policies)
stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)))
for j := 0; j < policiesLen; j++ {
if trace.Decision[j] == sampling.Pending {
if trace.Decisions[j] == sampling.Pending {
policy := tsp.policies[j]
if decision, err := policy.Evaluator.OnDroppedSpans([]byte(traceID), trace); err != nil {
tsp.logger.Warn("OnDroppedSpans",
Expand Down
Loading

0 comments on commit c4d4adf

Please sign in to comment.