From 360aa05d8e7ef9d0c892d5935f05d691445ef486 Mon Sep 17 00:00:00 2001 From: songy23 Date: Thu, 1 Aug 2019 17:17:55 -0700 Subject: [PATCH] Finish up configs for tail sampling policies Fixes #146. Note one additional change is now each tail sampling processor can only accept one policy, because one processor can only have one next consumer. If users need more than one policies they can define multiple tail sampling processors in the config. --- internal/collector/sampling/policy.go | 4 +- processor/tailsampling/config.go | 29 ++- processor/tailsampling/config_test.go | 62 +++++- processor/tailsampling/factory_test.go | 8 +- processor/tailsampling/processor.go | 201 ++++++++++-------- processor/tailsampling/processor_test.go | 44 ++-- .../testdata/tail_sampling_config.yaml | 41 +++- 7 files changed, 253 insertions(+), 136 deletions(-) diff --git a/internal/collector/sampling/policy.go b/internal/collector/sampling/policy.go index 255bd9638e3..4427048a4f2 100644 --- a/internal/collector/sampling/policy.go +++ b/internal/collector/sampling/policy.go @@ -26,8 +26,8 @@ import ( // TraceData stores the sampling related trace data. type TraceData struct { sync.Mutex - // Decision gives the current status of the sampling decision for each policy. - Decision []Decision + // Decision gives the current status of the sampling decision. + Decision Decision // Arrival time the first span for the trace was received. ArrivalTime time.Time // Decisiontime time when sampling decision was taken. diff --git a/processor/tailsampling/config.go b/processor/tailsampling/config.go index 88000ab2063..cff8b2e981a 100644 --- a/processor/tailsampling/config.go +++ b/processor/tailsampling/config.go @@ -32,21 +32,22 @@ const ( // StringAttributeFilter sample traces that a attribute, of type string, matching // one of the listed values. StringAttributeFilter PolicyType = "string-attribute-filter" - // RateLimiting allows all traces until the specified limits are satisfied. - RateLimiting PolicyType = "rate-limiting" + // RateLimitingFilter allows all traces until the specified limits are satisfied. + RateLimitingFilter PolicyType = "rate-limiting-filter" ) // PolicyCfg holds the common configuration to all policies. type PolicyCfg struct { // Name given to the instance of the policy to make easy to identify it in metrics and logs. - Name string + Name string `mapstructure:"name"` // Type of the policy this will be used to match the proper configuration of the policy. - Type PolicyType - // Exporters hold the name of the exporters that the policy evaluator uses to make decisions - // about whether or not sending the traces. - Exporters []string - // Configuration holds the settings specific to the policy. - Configuration interface{} + Type PolicyType `mapstructure:"type"` + // Configs for numeric attribute filter sampling policy evaluator. + NumericAttributeFilterCfg NumericAttributeFilterCfg `mapstructure:"numeric-attribute-filter"` + // Configs for string attribute filter sampling policy evaluator. + StringAttributeFilterCfg StringAttributeFilterCfg `mapstructure:"string-attribute-filter"` + // Configs for rate limiting filter sampling policy evaluator. + RateLimitingFilterCfg RateLimitingFilterCfg `mapstructure:"rate-limiting-filter"` } // NumericAttributeFilterCfg holds the configurable settings to create a numeric attribute filter @@ -69,6 +70,13 @@ type StringAttributeFilterCfg struct { Values []string `mapstructure:"values"` } +// RateLimitingFilterCfg holds the configurable settings to create a rate limiting +// sampling policy evaluator. +type RateLimitingFilterCfg struct { + // SpansPerSecond sets the limit on the maximum nuber of spans that can be processed each second. + SpansPerSecond int64 `mapstructure:"spans-per-second"` +} + // Config holds the configuration for tail-based sampling. type Config struct { configmodels.ProcessorSettings `mapstructure:",squash"` @@ -81,4 +89,7 @@ type Config struct { // ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor // per second. This helps with allocating data structures with closer to actual usage size. ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"` + // Policy sets the tail-based sampling policy which makes a sampling decision + // for a given trace when requested. + PolicyCfg PolicyCfg `mapstructure:"policy"` } diff --git a/processor/tailsampling/config_test.go b/processor/tailsampling/config_test.go index 6aed9bc4569..a433ffea231 100644 --- a/processor/tailsampling/config_test.go +++ b/processor/tailsampling/config_test.go @@ -36,19 +36,69 @@ func TestLoadConfig(t *testing.T) { cfg, err := config.LoadConfigFile( t, path.Join(".", "testdata", "tail_sampling_config.yaml"), receivers, processors, exporters, ) - require.Nil(t, err) require.NotNil(t, cfg) - p0 := cfg.Processors["tail-sampling"] - assert.Equal(t, p0, + assert.Equal(t, cfg.Processors["tail-sampling"], &Config{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "tail-sampling", NameVal: "tail-sampling", }, - DecisionWait: 31 * time.Second, - NumTraces: 20001, - ExpectedNewTracesPerSec: 100, + DecisionWait: 10 * time.Second, + NumTraces: 100, + ExpectedNewTracesPerSec: 10, + PolicyCfg: PolicyCfg{ + Name: "test-policy-1", + Type: AlwaysSample, + }, + }) + + assert.Equal(t, cfg.Processors["tail-sampling/2"], + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "tail-sampling", + NameVal: "tail-sampling/2", + }, + DecisionWait: 20 * time.Second, + NumTraces: 200, + ExpectedNewTracesPerSec: 20, + PolicyCfg: PolicyCfg{ + Name: "test-policy-2", + Type: NumericAttributeFilter, + NumericAttributeFilterCfg: NumericAttributeFilterCfg{Key: "key1", MinValue: 50, MaxValue: 100}, + }, + }) + + assert.Equal(t, cfg.Processors["tail-sampling/3"], + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "tail-sampling", + NameVal: "tail-sampling/3", + }, + DecisionWait: 30 * time.Second, + NumTraces: 300, + ExpectedNewTracesPerSec: 30, + PolicyCfg: PolicyCfg{ + Name: "test-policy-3", + Type: StringAttributeFilter, + StringAttributeFilterCfg: StringAttributeFilterCfg{Key: "key2", Values: []string{"value1", "value2"}}, + }, + }) + + assert.Equal(t, cfg.Processors["tail-sampling/4"], + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "tail-sampling", + NameVal: "tail-sampling/4", + }, + DecisionWait: 40 * time.Second, + NumTraces: 400, + ExpectedNewTracesPerSec: 40, + PolicyCfg: PolicyCfg{ + Name: "test-policy-4", + Type: RateLimitingFilter, + RateLimitingFilterCfg: RateLimitingFilterCfg{SpansPerSecond: 35}, + }, }) } diff --git a/processor/tailsampling/factory_test.go b/processor/tailsampling/factory_test.go index e28841aa98e..b5731cd9d7c 100644 --- a/processor/tailsampling/factory_test.go +++ b/processor/tailsampling/factory_test.go @@ -33,7 +33,13 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { factory := &Factory{} - cfg := factory.CreateDefaultConfig() + cfg := factory.CreateDefaultConfig().(*Config) + // Manually set required fields + cfg.ExpectedNewTracesPerSec = 64 + cfg.PolicyCfg = PolicyCfg{ + Name: "test-policy", + Type: AlwaysSample, + } tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) assert.NotNil(t, tp) diff --git a/processor/tailsampling/processor.go b/processor/tailsampling/processor.go index 551f985edbd..30de97e2ecf 100644 --- a/processor/tailsampling/processor.go +++ b/processor/tailsampling/processor.go @@ -17,6 +17,7 @@ package tailsampling import ( "context" "errors" + "fmt" "runtime" "sync" "sync/atomic" @@ -31,6 +32,7 @@ import ( "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" + "github.com/open-telemetry/opentelemetry-service/observability" "github.com/open-telemetry/opentelemetry-service/processor" ) @@ -41,8 +43,6 @@ type Policy struct { Name string // Evaluator that decides if a trace is sampled or not by this policy instance. Evaluator sampling.PolicyEvaluator - // Destination is the consumer of the traces selected to be sampled. - Destination consumer.TraceConsumer // ctx used to carry metric tags of each policy. ctx context.Context } @@ -52,13 +52,13 @@ type Policy struct { type traceKey string // tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling -// policies to sample traces. +// policy to sample traces. type tailSamplingSpanProcessor struct { ctx context.Context nextConsumer consumer.TraceConsumer start sync.Once maxNumTraces uint64 - policies []*Policy + policy *Policy logger *zap.Logger idToTrace sync.Map policyTicker tTicker @@ -86,21 +86,54 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, return nil, err } + ctx := context.Background() + policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, cfg.PolicyCfg.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat)) + if err != nil { + return nil, err + } + eval, err := getPolicyEvaluator(&cfg.PolicyCfg) + if err != nil { + return nil, err + } + policy := &Policy{ + Name: cfg.PolicyCfg.Name, + Evaluator: eval, + ctx: policyCtx, + } + tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), + ctx: ctx, nextConsumer: nextConsumer, maxNumTraces: cfg.NumTraces, logger: logger, decisionBatcher: inBatcher, + policy: policy, } - // TODO(#146): add policies to TailBasedCfg tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick} tsp.deleteChan = make(chan traceKey, cfg.NumTraces) return tsp, nil } +func getPolicyEvaluator(cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { + switch cfg.Type { + case AlwaysSample: + return sampling.NewAlwaysSample(), nil + case NumericAttributeFilter: + nafCfg := cfg.NumericAttributeFilterCfg + return sampling.NewNumericAttributeFilter(nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil + case StringAttributeFilter: + safCfg := cfg.StringAttributeFilterCfg + return sampling.NewStringAttributeFilter(safCfg.Key, safCfg.Values), nil + case RateLimitingFilter: + rlfCfg := cfg.RateLimitingFilterCfg + return sampling.NewRateLimiting(rlfCfg.SpansPerSecond), nil + default: + return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type) + } +} + func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { var idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64 startTime := time.Now() @@ -115,45 +148,43 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { } trace := d.(*sampling.TraceData) trace.DecisionTime = time.Now() - for i, policy := range tsp.policies { - policyEvaluateStartTime := time.Now() - decision, err := policy.Evaluator.Evaluate(id, trace) - stats.Record( - policy.ctx, - statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond))) - if err != nil { - trace.Decision[i] = sampling.NotSampled - evaluateErrorCount++ - tsp.logger.Error("Sampling policy error", zap.Error(err)) - continue - } + policyEvaluateStartTime := time.Now() + decision, err := tsp.policy.Evaluator.Evaluate(id, trace) + stats.Record( + tsp.policy.ctx, + statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond))) + if err != nil { + trace.Decision = sampling.NotSampled + evaluateErrorCount++ + tsp.logger.Error("Sampling policy error", zap.Error(err)) + continue + } - trace.Decision[i] = decision + trace.Decision = decision - switch decision { - case sampling.Sampled: - stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - decisionSampled++ + switch decision { + case sampling.Sampled: + stats.RecordWithTags( + tsp.policy.ctx, + []tag.Mutator{tag.Insert(tagSampledKey, "true")}, + statCountTracesSampled.M(int64(1)), + ) + decisionSampled++ - trace.Lock() - traceBatches := trace.ReceivedBatches - trace.Unlock() + trace.Lock() + traceBatches := trace.ReceivedBatches + trace.Unlock() - for j := 0; j < len(traceBatches); j++ { - policy.Destination.ConsumeTraceData(policy.ctx, traceBatches[j]) - } - case sampling.NotSampled: - stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "false")}, - statCountTracesSampled.M(int64(1)), - ) - decisionNotSampled++ + for j := 0; j < len(traceBatches); j++ { + tsp.nextConsumer.ConsumeTraceData(tsp.policy.ctx, traceBatches[j]) } + case sampling.NotSampled: + stats.RecordWithTags( + tsp.policy.ctx, + []tag.Mutator{tag.Insert(tagSampledKey, "false")}, + statCountTracesSampled.M(int64(1)), + ) + decisionNotSampled++ } // Sampled or not, remove the batches @@ -199,13 +230,8 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c singleTrace := len(idToSpans) == 1 for id, spans := range idToSpans { lenSpans := int64(len(spans)) - lenPolicies := len(tsp.policies) - initialDecisions := make([]sampling.Decision, lenPolicies, lenPolicies) - for i := 0; i < lenPolicies; i++ { - initialDecisions[i] = sampling.Pending - } initialTraceData := &sampling.TraceData{ - Decision: initialDecisions, + Decision: sampling.Pending, ArrivalTime: time.Now(), SpanCount: lenSpans, } @@ -231,42 +257,40 @@ func (tsp *tailSamplingSpanProcessor) ConsumeTraceData(ctx context.Context, td c } } - for i, policyAndDests := range tsp.policies { - actualData.Lock() - actualDecision := actualData.Decision[i] - // If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen - // in between the transition from pending. - if actualDecision == sampling.Pending { - // Add the spans to the trace, but only once for all policies, otherwise same spans will - // be duplicated in the final trace. - traceTd := prepareTraceBatch(spans, singleTrace, td) - actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd) - actualData.Unlock() - break - } + actualData.Lock() + actualDecision := actualData.Decision + // If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen + // in between the transition from pending. + if actualDecision == sampling.Pending { + // Add the spans to the trace, but only once for all policy, otherwise same spans will + // be duplicated in the final trace. + traceTd := prepareTraceBatch(spans, singleTrace, td) + actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd) actualData.Unlock() - - switch actualDecision { - case sampling.Pending: - // All process for pending done above, keep the case so it doesn't go to default. - case sampling.Sampled: - // Forward the spans to the policy destinations - traceTd := prepareTraceBatch(spans, singleTrace, td) - if err := policyAndDests.Destination.ConsumeTraceData(policyAndDests.ctx, traceTd); err != nil { - tsp.logger.Warn("Error sending late arrived spans to destination", - zap.String("policy", policyAndDests.Name), - zap.Error(err)) - } - fallthrough // so OnLateArrivingSpans is also called for decision Sampled. - case sampling.NotSampled: - policyAndDests.Evaluator.OnLateArrivingSpans(actualDecision, spans) - stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second))) - - default: - tsp.logger.Warn("Encountered unexpected sampling decision", - zap.String("policy", policyAndDests.Name), - zap.Int("decision", int(actualDecision))) + break + } + actualData.Unlock() + + switch actualDecision { + case sampling.Pending: + // All process for pending done above, keep the case so it doesn't go to default. + case sampling.Sampled: + // Forward the spans to the policy destinations + traceTd := prepareTraceBatch(spans, singleTrace, td) + if err := tsp.nextConsumer.ConsumeTraceData(tsp.policy.ctx, traceTd); err != nil { + tsp.logger.Warn("Error sending late arrived spans to destination", + zap.String("policy", tsp.policy.Name), + zap.Error(err)) } + fallthrough // so OnLateArrivingSpans is also called for decision Sampled. + case sampling.NotSampled: + tsp.policy.Evaluator.OnLateArrivingSpans(actualDecision, spans) + stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second))) + + default: + tsp.logger.Warn("Encountered unexpected sampling decision", + zap.String("policy", tsp.policy.Name), + zap.Int("decision", int(actualDecision))) } } @@ -286,17 +310,14 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime t tsp.logger.Error("Attempt to delete traceID not on table") return } - policiesLen := len(tsp.policies) stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) - for j := 0; j < policiesLen; j++ { - if trace.Decision[j] == sampling.Pending { - policy := tsp.policies[j] - if decision, err := policy.Evaluator.OnDroppedSpans([]byte(traceID), trace); err != nil { - tsp.logger.Warn("OnDroppedSpans", - zap.String("policy", policy.Name), - zap.Int("decision", int(decision)), - zap.Error(err)) - } + if trace.Decision == sampling.Pending { + policy := tsp.policy + if decision, err := policy.Evaluator.OnDroppedSpans([]byte(traceID), trace); err != nil { + tsp.logger.Warn("OnDroppedSpans", + zap.String("policy", policy.Name), + zap.Int("decision", int(decision)), + zap.Error(err)) } } } diff --git a/processor/tailsampling/processor_test.go b/processor/tailsampling/processor_test.go index 4e0bad2495a..9d869a4d569 100644 --- a/processor/tailsampling/processor_test.go +++ b/processor/tailsampling/processor_test.go @@ -23,11 +23,11 @@ import ( tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" + "github.com/open-telemetry/opentelemetry-service/processor" tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace" ) @@ -35,12 +35,15 @@ const ( defaultTestDecisionWait = 30 * time.Second ) +var testPolicy = PolicyCfg{Name: "test-policy", Type: AlwaysSample} + func TestSequentialTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) cfg := Config{ DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIds)), ExpectedNewTracesPerSec: 64, + PolicyCfg: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -67,6 +70,7 @@ func TestConcurrentTraceArrival(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(2 * len(traceIds)), ExpectedNewTracesPerSec: 64, + PolicyCfg: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -105,6 +109,7 @@ func TestSequentialTraceMapSize(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(maxSize), ExpectedNewTracesPerSec: 64, + PolicyCfg: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -128,6 +133,7 @@ func TestConcurrentTraceMapSize(t *testing.T) { DecisionWait: defaultTestDecisionWait, NumTraces: uint64(maxSize), ExpectedNewTracesPerSec: 64, + PolicyCfg: testPolicy, } sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) @@ -154,23 +160,23 @@ func TestConcurrentTraceMapSize(t *testing.T) { } func TestSamplingPolicyTypicalPath(t *testing.T) { - t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test") const maxSize = 100 const decisionWaitSeconds = 5 + // For this test explicitly control the timer calls and batcher, and set a mock + // sampling policy evaluator. msp := &mockSpanProcessor{} mpe := &mockPolicyEvaluator{} - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(maxSize), - ExpectedNewTracesPerSec: 64, - } - sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) - tsp := sp.(*tailSamplingSpanProcessor) - - // For this test explicitly control the timer calls and batcher. mtt := &manualTTicker{} - tsp.policyTicker = mtt - tsp.decisionBatcher = newSyncIDBatcher(decisionWaitSeconds) + tsp := &tailSamplingSpanProcessor{ + ctx: context.Background(), + nextConsumer: msp, + maxNumTraces: maxSize, + logger: zap.NewNop(), + decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), + policy: &Policy{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}, + deleteChan: make(chan traceKey, maxSize), + policyTicker: mtt, + } _, batches := generateIdsAndBatches(210) currItem := 0 @@ -240,16 +246,6 @@ func generateIdsAndBatches(numIds int) ([][]byte, []consumerdata.TraceData) { return traceIds, tds } -func newTestPolicy() []*Policy { - return []*Policy{ - { - Name: "test", - Evaluator: sampling.NewAlwaysSample(), - Destination: &mockSpanProcessor{}, - }, - } -} - type mockPolicyEvaluator struct { NextDecision sampling.Decision NextError error @@ -324,7 +320,7 @@ type mockSpanProcessor struct { TotalSpans int } -var _ consumer.TraceConsumer = &mockSpanProcessor{} +var _ processor.TraceProcessor = &mockSpanProcessor{} func (p *mockSpanProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error { batchSize := len(td.Spans) diff --git a/processor/tailsampling/testdata/tail_sampling_config.yaml b/processor/tailsampling/testdata/tail_sampling_config.yaml index 52d2fedc42d..e861e395df8 100644 --- a/processor/tailsampling/testdata/tail_sampling_config.yaml +++ b/processor/tailsampling/testdata/tail_sampling_config.yaml @@ -6,12 +6,45 @@ exporters: processors: tail-sampling: - decision-wait: 31s - num-traces: 20001 - expected-new-traces-per-sec: 100 + decision-wait: 10s + num-traces: 100 + expected-new-traces-per-sec: 10 + policy: + name: test-policy-1 + type: always-sample + tail-sampling/2: + decision-wait: 20s + num-traces: 200 + expected-new-traces-per-sec: 20 + policy: + name: test-policy-2 + type: numeric-attribute-filter + numeric-attribute-filter: + key: key1 + min-value: 50 + max-value: 100 + tail-sampling/3: + decision-wait: 30s + num-traces: 300 + expected-new-traces-per-sec: 30 + policy: + name: test-policy-3 + type: string-attribute-filter + string-attribute-filter: + key: key2 + values: [value1, value2] + tail-sampling/4: + decision-wait: 40s + num-traces: 400 + expected-new-traces-per-sec: 40 + policy: + name: test-policy-4 + type: rate-limiting-filter + rate-limiting-filter: + spans-per-second: 35 pipelines: traces: receivers: [examplereceiver] - processors: [tail-sampling] + processors: [tail-sampling, tail-sampling/2, tail-sampling/3, tail-sampling/4] exporters: [exampleexporter]