diff --git a/defaults/defaults.go b/defaults/defaults.go
index 99dad5b8cb8..c388f6ff5bf 100644
--- a/defaults/defaults.go
+++ b/defaults/defaults.go
@@ -28,6 +28,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
 	"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
 	"github.com/open-telemetry/opentelemetry-service/processor/queued"
+	"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
 	"github.com/open-telemetry/opentelemetry-service/receiver"
 	"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
 	"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
@@ -72,6 +73,7 @@ func Components() (
 		&attributekeyprocessor.Factory{},
 		&queued.Factory{},
 		&nodebatcher.Factory{},
+		&tailsampling.Factory{},
 	)
 	if err != nil {
 		errs = append(errs, err)
diff --git a/defaults/defaults_test.go b/defaults/defaults_test.go
index e3238b4217f..f8e884dd850 100644
--- a/defaults/defaults_test.go
+++ b/defaults/defaults_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-service/processor/attributekeyprocessor"
 	"github.com/open-telemetry/opentelemetry-service/processor/nodebatcher"
 	"github.com/open-telemetry/opentelemetry-service/processor/queued"
+	"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
 	"github.com/open-telemetry/opentelemetry-service/receiver"
 	"github.com/open-telemetry/opentelemetry-service/receiver/jaegerreceiver"
 	"github.com/open-telemetry/opentelemetry-service/receiver/opencensusreceiver"
@@ -54,6 +55,7 @@ func TestDefaultComponents(t *testing.T) {
 		"attribute-key": &attributekeyprocessor.Factory{},
 		"queued-retry": &queued.Factory{},
 		"batch": &nodebatcher.Factory{},
+		"tail-sampling": &tailsampling.Factory{},
 	}
 	expectedExporters := map[string]exporter.Factory{
 		"opencensus": &opencensusexporter.Factory{},
diff --git a/internal/collector/sampling/policy.go b/internal/collector/sampling/policy.go
index 255bd9638e3..ff9e863689b 100644
--- a/internal/collector/sampling/policy.go
+++ b/internal/collector/sampling/policy.go
@@ -26,8 +26,8 @@ import (
 // TraceData stores the sampling related trace data.
 type TraceData struct {
 	sync.Mutex
-	// Decision gives the current status of the sampling decision for each policy.
-	Decision []Decision
+	// Decisions gives the current status of the sampling decision for each policy.
+	Decisions []Decision
 	// Arrival time the first span for the trace was received.
 	ArrivalTime time.Time
 	// Decisiontime time when sampling decision was taken.
diff --git a/processor/tailsampling/config.go b/processor/tailsampling/config.go
index 88000ab2063..9659636d008 100644
--- a/processor/tailsampling/config.go
+++ b/processor/tailsampling/config.go
@@ -26,12 +26,12 @@ type PolicyType string
 const (
 	// AlwaysSample samples all traces, typically used for debugging.
 	AlwaysSample PolicyType = "always-sample"
-	// NumericAttributeFilter sample traces that have a given numeric attribute in a specified
+	// NumericAttribute samples traces that have a given numeric attribute in a specified
 	// range, e.g.: attribute "http.status_code" >= 399 and <= 999.
-	NumericAttributeFilter PolicyType = "numeric-attribute-filter"
-	// StringAttributeFilter sample traces that a attribute, of type string, matching
+	NumericAttribute PolicyType = "numeric-attribute"
+	// StringAttribute samples traces that have an attribute, of type string, matching
 	// one of the listed values.
-	StringAttributeFilter PolicyType = "string-attribute-filter"
+	StringAttribute PolicyType = "string-attribute"
 	// RateLimiting allows all traces until the specified limits are satisfied.
 	RateLimiting PolicyType = "rate-limiting"
 )
@@ -39,19 +39,20 @@ const (
 // PolicyCfg holds the common configuration to all policies.
 type PolicyCfg struct {
 	// Name given to the instance of the policy to make easy to identify it in metrics and logs.
-	Name string
+	Name string `mapstructure:"name"`
 	// Type of the policy this will be used to match the proper configuration of the policy.
-	Type PolicyType
-	// Exporters hold the name of the exporters that the policy evaluator uses to make decisions
-	// about whether or not sending the traces.
-	Exporters []string
-	// Configuration holds the settings specific to the policy.
-	Configuration interface{}
+	Type PolicyType `mapstructure:"type"`
+	// Configs for numeric attribute filter sampling policy evaluator.
+	NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric-attribute"`
+	// Configs for string attribute filter sampling policy evaluator.
+	StringAttributeCfg StringAttributeCfg `mapstructure:"string-attribute"`
+	// Configs for rate limiting filter sampling policy evaluator.
+	RateLimitingCfg RateLimitingCfg `mapstructure:"rate-limiting"`
 }
-// NumericAttributeFilterCfg holds the configurable settings to create a numeric attribute filter
+// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter
 // sampling policy evaluator.
-type NumericAttributeFilterCfg struct {
+type NumericAttributeCfg struct {
 	// Tag that the filter is going to be matching against.
 	Key string `mapstructure:"key"`
 	// MinValue is the minimum value of the attribute to be considered a match.
@@ -60,15 +61,22 @@ type NumericAttributeFilterCfg struct {
 	MaxValue int64 `mapstructure:"max-value"`
 }
-// StringAttributeFilterCfg holds the configurable settings to create a string attribute filter
+// StringAttributeCfg holds the configurable settings to create a string attribute filter
 // sampling policy evaluator.
-type StringAttributeFilterCfg struct {
+type StringAttributeCfg struct {
 	// Tag that the filter is going to be matching against.
 	Key string `mapstructure:"key"`
 	// Values is the set of attribute values that if any is equal to the actual attribute value to be considered a match.
 	Values []string `mapstructure:"values"`
 }
+// RateLimitingCfg holds the configurable settings to create a rate limiting
+// sampling policy evaluator.
+type RateLimitingCfg struct {
+	// SpansPerSecond sets the limit on the maximum number of spans that can be processed each second.
+	SpansPerSecond int64 `mapstructure:"spans-per-second"`
+}
+
 // Config holds the configuration for tail-based sampling.
 type Config struct {
 	configmodels.ProcessorSettings `mapstructure:",squash"`
@@ -81,4 +89,7 @@ type Config struct {
 	// ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor
 	// per second. This helps with allocating data structures with closer to actual usage size.
 	ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
+	// PolicyCfgs sets the tail-based sampling policies which make a sampling decision
+	// for a given trace when requested.
+ PolicyCfgs []PolicyCfg `mapstructure:"policies"` } diff --git a/processor/tailsampling/config_test.go b/processor/tailsampling/config_test.go index 6aed9bc4569..205e42bdaa9 100644 --- a/processor/tailsampling/config_test.go +++ b/processor/tailsampling/config_test.go @@ -36,19 +36,38 @@ func TestLoadConfig(t *testing.T) { cfg, err := config.LoadConfigFile( t, path.Join(".", "testdata", "tail_sampling_config.yaml"), receivers, processors, exporters, ) - require.Nil(t, err) require.NotNil(t, cfg) - p0 := cfg.Processors["tail-sampling"] - assert.Equal(t, p0, + assert.Equal(t, cfg.Processors["tail-sampling"], &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "tail-sampling", NameVal: "tail-sampling", }, - DecisionWait: 31 * time.Second, - NumTraces: 20001, - ExpectedNewTracesPerSec: 100, + DecisionWait: 10 * time.Second, + NumTraces: 100, + ExpectedNewTracesPerSec: 10, + PolicyCfgs: []PolicyCfg{ + { + Name: "test-policy-1", + Type: AlwaysSample, + }, + { + Name: "test-policy-2", + Type: NumericAttribute, + NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100}, + }, + { + Name: "test-policy-3", + Type: StringAttribute, + StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}}, + }, + { + Name: "test-policy-4", + Type: RateLimiting, + RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35}, + }, + }, }) } diff --git a/processor/tailsampling/factory_test.go b/processor/tailsampling/factory_test.go index e28841aa98e..21cbab5f47f 100644 --- a/processor/tailsampling/factory_test.go +++ b/processor/tailsampling/factory_test.go @@ -33,7 +33,15 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { factory := &Factory{} - cfg := factory.CreateDefaultConfig() + cfg := factory.CreateDefaultConfig().(*Config) + // Manually set required fields + cfg.ExpectedNewTracesPerSec = 64 + cfg.PolicyCfgs = []PolicyCfg{ + { + Name: "test-policy", + Type: AlwaysSample, + }, + } tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) assert.NotNil(t, tp) diff --git a/processor/tailsampling/processor.go b/processor/tailsampling/processor.go index 0f1d29cf845..e74aa3c7a7a 100644 --- a/processor/tailsampling/processor.go +++ b/processor/tailsampling/processor.go @@ -17,6 +17,7 @@ package tailsampling import ( "context" "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -31,6 +32,7 @@ import ( "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" + "github.com/open-telemetry/opentelemetry-service/observability" "github.com/open-telemetry/opentelemetry-service/processor" ) @@ -41,8 +43,6 @@ type Policy struct { Name string // Evaluator that decides if a trace is sampled or not by this policy instance. Evaluator sampling.PolicyEvaluator - // Destination is the consumer of the traces selected to be sampled. - Destination consumer.TraceConsumer // ctx used to carry metric tags of each policy. ctx context.Context } @@ -52,7 +52,7 @@ type Policy struct { type traceKey string // tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling -// policies to sample traces. +// policy to sample traces. 
type tailSamplingSpanProcessor struct { ctx context.Context nextConsumer consumer.TraceConsumer @@ -67,6 +67,10 @@ type tailSamplingSpanProcessor struct { numTracesOnMap uint64 } +const ( + sourceFormat = "tail-sampling" +) + var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil) // NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given @@ -82,21 +86,58 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, return nil, err } + ctx := context.Background() + policies := []*Policy{} + for _, policyCfg := range cfg.PolicyCfgs { + policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat)) + if err != nil { + return nil, err + } + eval, err := getPolicyEvaluator(&policyCfg) + if err != nil { + return nil, err + } + policy := &Policy{ + Name: policyCfg.Name, + Evaluator: eval, + ctx: policyCtx, + } + policies = append(policies, policy) + } + tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), + ctx: ctx, nextConsumer: nextConsumer, maxNumTraces: cfg.NumTraces, logger: logger, decisionBatcher: inBatcher, + policies: policies, } - // TODO(#146): add policies to TailBasedCfg tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick} tsp.deleteChan = make(chan traceKey, cfg.NumTraces) return tsp, nil } +func getPolicyEvaluator(cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { + switch cfg.Type { + case AlwaysSample: + return sampling.NewAlwaysSample(), nil + case NumericAttribute: + nafCfg := cfg.NumericAttributeCfg + return sampling.NewNumericAttributeFilter(nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil + case StringAttribute: + safCfg := cfg.StringAttributeCfg + return sampling.NewStringAttributeFilter(safCfg.Key, safCfg.Values), nil + case RateLimiting: + rlfCfg := cfg.RateLimitingCfg + return sampling.NewRateLimiting(rlfCfg.SpansPerSecond), nil + default: + return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type) + } +} + func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { var idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64 startTime := time.Now() @@ -118,13 +159,13 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { policy.ctx, statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond))) if err != nil { - trace.Decision[i] = sampling.NotSampled + trace.Decisions[i] = sampling.NotSampled evaluateErrorCount++ tsp.logger.Error("Sampling policy error", zap.Error(err)) continue } - trace.Decision[i] = decision + trace.Decisions[i] = decision switch decision { case sampling.Sampled: @@ -140,7 +181,7 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { trace.Unlock() for j := 0; j < len(traceBatches); j++ { - policy.Destination.ConsumeTraceData(policy.ctx, traceBatches[j]) + tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceBatches[j]) } case sampling.NotSampled: stats.RecordWithTags( @@ -201,7 +242,7 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c initialDecisions[i] = sampling.Pending } initialTraceData := &sampling.TraceData{ - Decision: initialDecisions, + Decisions: initialDecisions, ArrivalTime: time.Now(), SpanCount: lenSpans, } @@ -227,13 +268,13 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c } } - for i, policyAndDests := range tsp.policies { + for i, policy := range tsp.policies { actualData.Lock() - actualDecision := 
actualData.Decision[i] + actualDecision := actualData.Decisions[i] // If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen // in between the transition from pending. if actualDecision == sampling.Pending { - // Add the spans to the trace, but only once for all policies, otherwise same spans will + // Add the spans to the trace, but only once for all policy, otherwise same spans will // be duplicated in the final trace. traceTd := prepareTraceBatch(spans, singleTrace, td) actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd) @@ -248,19 +289,19 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c case sampling.Sampled: // Forward the spans to the policy destinations traceTd := prepareTraceBatch(spans, singleTrace, td) - if err := policyAndDests.Destination.ConsumeTraceData(policyAndDests.ctx, traceTd); err != nil { + if err := tsp.nextConsumer.ConsumeTraceData(policy.ctx, traceTd); err != nil { tsp.logger.Warn("Error sending late arrived spans to destination", - zap.String("policy", policyAndDests.Name), + zap.String("policy", policy.Name), zap.Error(err)) } fallthrough // so OnLateArrivingSpans is also called for decision Sampled. case sampling.NotSampled: - policyAndDests.Evaluator.OnLateArrivingSpans(actualDecision, spans) + policy.Evaluator.OnLateArrivingSpans(actualDecision, spans) stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second))) default: tsp.logger.Warn("Encountered unexpected sampling decision", - zap.String("policy", policyAndDests.Name), + zap.String("policy", policy.Name), zap.Int("decision", int(actualDecision))) } } @@ -285,7 +326,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime t policiesLen := len(tsp.policies) stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) for j := 0; j < policiesLen; j++ { - if trace.Decision[j] == sampling.Pending { + if trace.Decisions[j] == sampling.Pending { policy := tsp.policies[j] if decision, err := policy.Evaluator.OnDroppedSpans([]byte(traceID), trace); err != nil { tsp.logger.Warn("OnDroppedSpans", diff --git a/processor/tailsampling/processor_test.go b/processor/tailsampling/processor_test.go index e5938dbba2a..542df7e804b 100644 --- a/processor/tailsampling/processor_test.go +++ b/processor/tailsampling/processor_test.go @@ -23,11 +23,11 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" + "github.com/open-telemetry/opentelemetry-service/processor" tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace" ) @@ -35,12 +35,15 @@ const ( defaultTestDecisionWait = 30 * time.Second ) +var testPolicy = []PolicyCfg{{Name: "test-policy", Type: AlwaysSample}} + func TestSequentialTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) cfg := Config{ DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIds)), ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), 
&exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -67,6 +70,7 @@ func TestConcurrentTraceArrival(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIds)), ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -105,6 +109,7 @@ func TestSequentialTraceMapSize(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(maxSize), ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -128,6 +133,7 @@ func TestConcurrentTraceMapSize(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(maxSize), ExpectedNewTracesPerSec: 64, + PolicyCfgs: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -154,23 +160,23 @@ func TestConcurrentTraceMapSize(t *testing.T) { } func TestSamplingPolicyTypicalPath(t *testing.T) { - t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test") const maxSize = 100 const decisionWaitSeconds = 5 + // For this test explicitly control the timer calls and batcher, and set a mock + // sampling policy evaluator. msp := &mockSpanProcessor{} mpe := &mockPolicyEvaluator{} - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(maxSize), - ExpectedNewTracesPerSec: 64, - } - sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) - tsp := sp.(*tailSamplingSpanProcessor) - - // For this test explicitly control the timer calls and batcher. mtt := &manualTTicker{} - tsp.policyTicker = mtt - tsp.decisionBatcher = newSyncIDBatcher(decisionWaitSeconds) + tsp := &tailSamplingSpanProcessor{ + ctx: context.Background(), + nextConsumer: msp, + maxNumTraces: maxSize, + logger: zap.NewNop(), + decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), + policies: []*Policy{{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}}, + deleteChan: make(chan traceKey, maxSize), + policyTicker: mtt, + } _, batches := generateIdsAndBatches(210) currItem := 0 @@ -314,7 +320,7 @@ type mockSpanProcessor struct { TotalSpans int } -var _ consumer.TraceConsumer = &mockSpanProcessor{} +var _ processor.TraceProcessor = &mockSpanProcessor{} func (p *mockSpanProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { batchSize := len(td.Spans) diff --git a/processor/tailsampling/testdata/tail_sampling_config.yaml b/processor/tailsampling/testdata/tail_sampling_config.yaml index 52d2fedc42d..f76ad4f96b3 100644 --- a/processor/tailsampling/testdata/tail_sampling_config.yaml +++ b/processor/tailsampling/testdata/tail_sampling_config.yaml @@ -6,9 +6,31 @@ exporters: processors: tail-sampling: - decision-wait: 31s - num-traces: 20001 - expected-new-traces-per-sec: 100 + decision-wait: 10s + num-traces: 100 + expected-new-traces-per-sec: 10 + policies: + [ + { + name: test-policy-1, + type: always-sample + }, + { + name: test-policy-2, + type: numeric-attribute, + numeric-attribute: {key: key1, min-value: 50, max-value: 100} + }, + { + name: test-policy-3, + type: string-attribute, + string-attribute: {key: key2, values: [value1, value2]} + }, + { + name: test-policy-4, + type: rate-limiting, + rate-limiting: {spans-per-second: 35} + } + ] pipelines: traces:
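
Reviewer note: as a quick illustration of how the new `policies` configuration is consumed, the sketch below builds the processor through the factory the same way `factory_test.go` does. It is only a sketch: the policy names and the numeric-attribute bounds are made up for the example, and the no-op trace exporter stands in for whatever consumer follows the processor in a real pipeline.

```go
package main

import (
	"log"

	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
	"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
)

func main() {
	factory := &tailsampling.Factory{}

	// Start from the default config and fill in the fields this change introduces.
	cfg := factory.CreateDefaultConfig().(*tailsampling.Config)
	cfg.ExpectedNewTracesPerSec = 64
	cfg.PolicyCfgs = []tailsampling.PolicyCfg{
		// Hypothetical policy names; any unique names work.
		{Name: "keep-everything", Type: tailsampling.AlwaysSample},
		{
			Name: "http-errors",
			Type: tailsampling.NumericAttribute,
			NumericAttributeCfg: tailsampling.NumericAttributeCfg{
				Key:      "http.status_code",
				MinValue: 400, // illustrative bounds
				MaxValue: 599,
			},
		},
	}

	// Each PolicyCfg is turned into an evaluator by getPolicyEvaluator; traces that
	// any policy samples are forwarded to the next consumer (a no-op exporter here).
	tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
	if err != nil {
		log.Fatalf("creating tail-sampling processor: %v", err)
	}
	_ = tp
}
```

In a deployed collector these fields are populated from the `policies` list in YAML via the new mapstructure tags, as exercised by `tail_sampling_config.yaml` above.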