Finish up configs for tail sampling policies #221

Merged: 4 commits, Aug 7, 2019
2 changes: 2 additions & 0 deletions defaults/defaults.go
@@ -28,6 +28,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
"github.com/open-telemetry/opentelemetry-service/processor/queued"
"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
"github.com/open-telemetry/opentelemetry-service/receiver"
"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
@@ -72,6 +73,7 @@ func Components() (
&attributekeyprocessor.Factory{},
&queued.Factory{},
&nodebatcher.Factory{},
&tailsampling.Factory{},
)
if err != nil {
errs = append(errs, err)
2 changes: 2 additions & 0 deletions defaults/defaults_test.go
@@ -33,6 +33,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
"github.com/open-telemetry/opentelemetry-service/processor/queued"
"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
"github.com/open-telemetry/opentelemetry-service/receiver"
"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
@@ -54,6 +55,7 @@ func TestDefaultComponents(t *testing.T) {
"attribute-key": &attributekeyprocessor.Factory{},
"queued-retry": &queued.Factory{},
"batch": &nodebatcher.Factory{},
"tail-sampling": &tailsampling.Factory{},
}
expectedExporters := map[string]exporter.Factory{
"opencensus": &opencensusexporter.Factory{},
4 changes: 2 additions & 2 deletions internal/collector/sampling/policy.go
@@ -26,8 +26,8 @@ import (
// TraceData stores the sampling related trace data.
type TraceData struct {
sync.Mutex
- // Decision gives the current status of the sampling decision for each policy.
- Decision []Decision
+ // Decisions gives the current status of the sampling decision for each policy.
+ Decisions []Decision
// Arrival time the first span for the trace was received.
ArrivalTime time.Time
// Decisiontime time when sampling decision was taken.
41 changes: 26 additions & 15 deletions processor/tailsampling/config.go
@@ -26,32 +26,33 @@ type PolicyType string
const (
// AlwaysSample samples all traces, typically used for debugging.
AlwaysSample PolicyType = "always-sample"
- // NumericAttributeFilter sample traces that have a given numeric attribute in a specified
+ // NumericAttribute samples traces that have a given numeric attribute in a specified
// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
- NumericAttributeFilter PolicyType = "numeric-attribute-filter"
- // StringAttributeFilter sample traces that a attribute, of type string, matching
+ NumericAttribute PolicyType = "numeric-attribute"
+ // StringAttribute samples traces that have an attribute, of type string, matching
// one of the listed values.
- StringAttributeFilter PolicyType = "string-attribute-filter"
+ StringAttribute PolicyType = "string-attribute"
// RateLimiting allows all traces until the specified limits are satisfied.
RateLimiting PolicyType = "rate-limiting"
)

// PolicyCfg holds the common configuration to all policies.
type PolicyCfg struct {
// Name given to the instance of the policy to make easy to identify it in metrics and logs.
- Name string
+ Name string `mapstructure:"name"`
// Type of the policy this will be used to match the proper configuration of the policy.
- Type PolicyType
- // Exporters hold the name of the exporters that the policy evaluator uses to make decisions
- // about whether or not sending the traces.
- Exporters []string
- // Configuration holds the settings specific to the policy.
- Configuration interface{}
+ Type PolicyType `mapstructure:"type"`
+ // Configs for numeric attribute filter sampling policy evaluator.
+ NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric-attribute"`
+ // Configs for string attribute filter sampling policy evaluator.
+ StringAttributeCfg StringAttributeCfg `mapstructure:"string-attribute"`
+ // Configs for rate limiting filter sampling policy evaluator.
+ RateLimitingCfg RateLimitingCfg `mapstructure:"rate-limiting"`
}

- // NumericAttributeFilterCfg holds the configurable settings to create a numeric attribute filter
+ // NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
// sampling policy evaluator.
- type NumericAttributeFilterCfg struct {
+ type NumericAttributeCfg struct {
// Tag that the filter is going to be matching against.
Key string `mapstructure:"key"`
// MinValue is the minimum value of the attribute to be considered a match.
@@ -60,15 +61,22 @@ type NumericAttributeFilterCfg struct {
MaxValue int64 `mapstructure:"max-value"`
}

- // StringAttributeFilterCfg holds the configurable settings to create a string attribute filter
+ // StringAttributeCfg holds the configurable settings to create a string attribute filter
// sampling policy evaluator.
- type StringAttributeFilterCfg struct {
+ type StringAttributeCfg struct {
// Tag that the filter is going to be matching against.
Key string `mapstructure:"key"`
// Values is the set of attribute values that if any is equal to the actual attribute value to be considered a match.
Values []string `mapstructure:"values"`
}

+ // RateLimitingCfg holds the configurable settings to create a rate limiting
+ // sampling policy evaluator.
+ type RateLimitingCfg struct {
+ // SpansPerSecond sets the limit on the maximum number of spans that can be processed each second.
+ SpansPerSecond int64 `mapstructure:"spans-per-second"`
+ }

// Config holds the configuration for tail-based sampling.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
@@ -81,4 +89,7 @@ type Config struct {
// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
// per second. This helps with allocating data structures with closer to actual usage size.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
+ // PolicyCfgs sets the tail-based sampling policies, each of which makes a sampling decision
+ // for a given trace when requested.
+ PolicyCfgs []PolicyCfg `mapstructure:"policies"`
}
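
Taken together, these structs define the processor's YAML surface. A configuration exercising all four policy types would look roughly like the sketch below. Field names come from the mapstructure tags in this diff; the `decision-wait` and `num-traces` keys are assumptions (their tags fall outside the visible hunks), and the values mirror what TestLoadConfig in the next file expects, so this approximates the contents of testdata/tail_sampling_config.yaml:

```yaml
processors:
  tail-sampling:
    decision-wait: 10s                # key assumed; DecisionWait's tag is not shown in the diff
    num-traces: 100                   # key assumed; NumTraces' tag is not shown in the diff
    expected-new-traces-per-sec: 10
    policies:
      - name: test-policy-1
        type: always-sample
      - name: test-policy-2
        type: numeric-attribute
        numeric-attribute:
          key: key1
          min-value: 50
          max-value: 100
      - name: test-policy-3
        type: string-attribute
        string-attribute:
          key: key2
          values: [value1, value2]
      - name: test-policy-4
        type: rate-limiting
        rate-limiting:
          spans-per-second: 35
```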
31 changes: 25 additions & 6 deletions processor/tailsampling/config_test.go
@@ -36,19 +36,38 @@ func TestLoadConfig(t *testing.T) {
cfg, err := config.LoadConfigFile(
t, path.Join(".", "testdata", "tail_sampling_config.yaml"), receivers, processors, exporters,
)

require.Nil(t, err)
require.NotNil(t, cfg)

- p0 := cfg.Processors["tail-sampling"]
- assert.Equal(t, p0,
+ assert.Equal(t, cfg.Processors["tail-sampling"],
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "tail-sampling",
NameVal: "tail-sampling",
},
- DecisionWait: 31 * time.Second,
- NumTraces: 20001,
- ExpectedNewTracesPerSec: 100,
+ DecisionWait: 10 * time.Second,
+ NumTraces: 100,
+ ExpectedNewTracesPerSec: 10,
+ PolicyCfgs: []PolicyCfg{
+ {
+ Name: "test-policy-1",
+ Type: AlwaysSample,
+ },
+ {
+ Name: "test-policy-2",
+ Type: NumericAttribute,
+ NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100},
+ },
+ {
+ Name: "test-policy-3",
+ Type: StringAttribute,
+ StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}},
+ },
+ {
+ Name: "test-policy-4",
+ Type: RateLimiting,
+ RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35},
+ },
+ },
})
}
10 changes: 9 additions & 1 deletion processor/tailsampling/factory_test.go
@@ -33,7 +33,15 @@ func TestCreateDefaultConfig(t *testing.T) {
func TestCreateProcessor(t *testing.T) {
factory := &Factory{}

- cfg := factory.CreateDefaultConfig()
+ cfg := factory.CreateDefaultConfig().(*Config)
+ // Manually set required fields
+ cfg.ExpectedNewTracesPerSec = 64
+ cfg.PolicyCfgs = []PolicyCfg{
+ {
+ Name: "test-policy",
+ Type: AlwaysSample,
+ },
+ }

tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.NotNil(t, tp)
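
For reference, creating the processor through the factory from outside the package follows the same steps the test exercises. A minimal sketch, assuming only the identifiers visible in this diff (the `tailsampling.` qualifiers and the "always" policy name are illustrative):

```go
package main

import (
	"log"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
	"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
)

func main() {
	factory := &tailsampling.Factory{}

	// Mirror the test above: the default config leaves the expected trace
	// rate and the policy list empty, so set both before creating the processor.
	cfg := factory.CreateDefaultConfig().(*tailsampling.Config)
	cfg.ExpectedNewTracesPerSec = 64
	cfg.PolicyCfgs = []tailsampling.PolicyCfg{
		{Name: "always", Type: tailsampling.AlwaysSample},
	}

	tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
	if err != nil {
		log.Fatalf("failed to create tail sampling processor: %v", err)
	}
	_ = tp // ready to be wired into a pipeline as a consumer of trace data
}
```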
75 changes: 58 additions & 17 deletions processor/tailsampling/processor.go
@@ -17,6 +17,7 @@ package tailsampling
import (
"context"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
@@ -31,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-service/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/observability"
"github.com/open-telemetry/opentelemetry-service/processor"
)

@@ -41,8 +43,6 @@ type Policy struct {
Name string
// Evaluator that decides if a trace is sampled or not by this policy instance.
Evaluator sampling.PolicyEvaluator
- // Destination is the consumer of the traces selected to be sampled.
- Destination consumer.TraceConsumer
// ctx used to carry metric tags of each policy.
ctx context.Context
}
@@ -52,7 +52,7 @@
type traceKey string

// tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling
- // policies to sample traces.
+ // policy to sample traces.
type tailSamplingSpanProcessor struct {
ctx context.Context
nextConsumer consumer.TraceConsumer
@@ -67,6 +67,10 @@ type tailSamplingSpanProcessor struct {
numTracesOnMap uint64
}

+ const (
+ sourceFormat = "tail-sampling"
+ )

var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)

// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
@@ -82,21 +86,58 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer,
return nil, err
}

+ ctx := context.Background()
+ policies := []*Policy{}
+ for _, policyCfg := range cfg.PolicyCfgs {
+ policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat))
+ if err != nil {
+ return nil, err
+ }
+ eval, err := getPolicyEvaluator(&policyCfg)
+ if err != nil {
+ return nil, err
+ }
+ policy := &Policy{
+ Name: policyCfg.Name,
+ Evaluator: eval,
+ ctx: policyCtx,
+ }
+ policies = append(policies, policy)
+ }

tsp := &tailSamplingSpanProcessor{
- ctx: context.Background(),
+ ctx: ctx,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
logger: logger,
decisionBatcher: inBatcher,
+ policies: policies,
}

- // TODO(#146): add policies to TailBasedCfg
tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick}
tsp.deleteChan = make(chan traceKey, cfg.NumTraces)

return tsp, nil
}

+ func getPolicyEvaluator(cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
+ switch cfg.Type {
+ case AlwaysSample:
+ return sampling.NewAlwaysSample(), nil
+ case NumericAttribute:
+ nafCfg := cfg.NumericAttributeCfg
+ return sampling.NewNumericAttributeFilter(nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
+ case StringAttribute:
+ safCfg := cfg.StringAttributeCfg
+ return sampling.NewStringAttributeFilter(safCfg.Key, safCfg.Values), nil
+ case RateLimiting:
+ rlfCfg := cfg.RateLimitingCfg
+ return sampling.NewRateLimiting(rlfCfg.SpansPerSecond), nil
+ default:
+ return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
+ }
+ }

func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
var idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64
startTime := time.Now()
@@ -118,13 +159,13 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
policy.ctx,
statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond)))
if err != nil {
- trace.Decision[i] = sampling.NotSampled
+ trace.Decisions[i] = sampling.NotSampled
evaluateErrorCount++
tsp.logger.Error("Sampling policy error", zap.Error(err))
continue
}

- trace.Decision[i] = decision
+ trace.Decisions[i] = decision

switch decision {
case sampling.Sampled:
Expand All @@ -140,7 +181,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() {
trace.Unlock()

for j := 0; j < len(traceBatches); j++ {
- policy.Destination.ConsumeTraceData(policy.ctx, traceBatches[j])
+ tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceBatches[j])
}
case sampling.NotSampled:
stats.RecordWithTags(
@@ -201,7 +242,7 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
initialDecisions[i] = sampling.Pending
}
initialTraceData := &sampling.TraceData{
- Decision: initialDecisions,
+ Decisions: initialDecisions,
ArrivalTime: time.Now(),
SpanCount: lenSpans,
}
@@ -227,13 +268,13 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
}
}

- for i, policyAndDests := range tsp.policies {
+ for i, policy := range tsp.policies {
actualData.Lock()
- actualDecision := actualData.Decision[i]
+ actualDecision := actualData.Decisions[i]
// If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen
// in between the transition from pending.
if actualDecision == sampling.Pending {
- // Add the spans to the trace, but only once for all policies, otherwise same spans will
+ // Add the spans to the trace, but only once for all policy, otherwise same spans will
// be duplicated in the final trace.
traceTd := prepareTraceBatch(spans, singleTrace, td)
actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd)
@@ -248,19 +289,19 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c
case sampling.Sampled:
// Forward the spans to the policy destinations
traceTd := prepareTraceBatch(spans, singleTrace, td)
- if err := policyAndDests.Destination.ConsumeTraceData(policyAndDests.ctx, traceTd); err != nil {
+ if err := tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceTd); err != nil {
tsp.logger.Warn("Error sending late arrived spans to destination",
zap.String("policy", policyAndDests.Name),
zap.String("policy", policy.Name),
zap.Error(err))
}
fallthrough // so OnLateArrivingSpans is also called for decision Sampled.
case sampling.NotSampled:
- policyAndDests.Evaluator.OnLateArrivingSpans(actualDecision, spans)
+ policy.Evaluator.OnLateArrivingSpans(actualDecision, spans)
stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second)))

default:
tsp.logger.Warn("Encountered unexpected sampling decision",
zap.String("policy", policyAndDests.Name),
zap.String("policy", policy.Name),
zap.Int("decision", int(actualDecision)))
}
}
@@ -285,7 +326,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime t
policiesLen := len(tsp.policies)
stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)))
for j := 0; j < policiesLen; j++ {
- if trace.Decision[j] == sampling.Pending {
+ if trace.Decisions[j] == sampling.Pending {
policy := tsp.policies[j]
if decision, err := policy.Evaluator.OnDroppedSpans([]byte(traceID), trace); err != nil {
tsp.logger.Warn("OnDroppedSpans",
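
One contract this diff uses but never shows is sampling.PolicyEvaluator, the interface returned by getPolicyEvaluator and driven from samplingPolicyOnTick, ConsumeTraceData, and dropTrace. Reconstructed from those call sites, it looks roughly like the sketch below; the authoritative definition lives in internal/collector/sampling and may differ in names and signatures:

```go
package sampling

import (
	tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
)

// PolicyEvaluator is a sketch reconstructed from the call sites in this PR,
// not a copy of the real file. The span type is assumed to be the OpenCensus
// proto span used elsewhere in the service.
type PolicyEvaluator interface {
	// OnLateArrivingSpans notifies the evaluator that spans arrived after a
	// decision was already taken for their trace (see the fallthrough in
	// ConsumeTraceData above).
	OnLateArrivingSpans(earlyDecision Decision, spans []*tracepb.Span) error

	// Evaluate inspects the accumulated trace data and returns this policy's
	// decision; invoked for each policy from samplingPolicyOnTick.
	Evaluate(traceID []byte, trace *TraceData) (Decision, error)

	// OnDroppedSpans runs when a trace is evicted by dropTrace while its
	// decision is still Pending.
	OnDroppedSpans(traceID []byte, trace *TraceData) (Decision, error)
}
```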