diff --git a/processor/cascadingfilterprocessor/README.md b/processor/cascadingfilterprocessor/README.md index 78d7ee449ecb..aad51a825f0b 100644 --- a/processor/cascadingfilterprocessor/README.md +++ b/processor/cascadingfilterprocessor/README.md @@ -47,6 +47,10 @@ of the provided values (either at resource of span level) (use `s` or `ms` as the suffix to indicate unit) - `properties: { name_pattern: `}: selects the span if its operation name matches the provided regular expression +To invert the decision (which is still a subject to rate limiting), additional property can be configured: +- `invert_match: ` (default=`false`): when set to `true`, the opposite decision is selected for the trace. E.g. +if trace matches a given string attribute and `invert_match=true`, then the trace is not selected + ## Limiting the number of spans There are two `spans_per_second` settings. The global one and the policy-one. @@ -79,39 +83,40 @@ processors: probabilistic_filtering_ratio: 0.1 policies: [ - { - name: test-policy-1, - }, - { - name: test-policy-2, - numeric_attribute: {key: key1, min_value: 50, max_value: 100} - }, - { - name: test-policy-3, - string_attribute: {key: key2, values: [value1, value2]} - }, - { - name: test-policy-4, - spans_per_second: 35, - }, - { - name: test-policy-5, - spans_per_second: 123, - numeric_attribute: {key: key1, min_value: 50, max_value: 100} - }, - { - name: test-policy-6, - spans_per_second: 50, - properties: {min_duration: 9s } - }, - { - name: test-policy-7, - properties: { - name_pattern: "foo.*", - min_number_of_spans: 10, - min_duration: 9s - } - }, + { + name: test-policy-1, + }, + { + name: test-policy-2, + numeric_attribute: { key: key1, min_value: 50, max_value: 100 } + }, + { + name: test-policy-3, + string_attribute: { key: key2, values: [ value1, value2 ] } + }, + { + name: test-policy-4, + spans_per_second: 35, + }, + { + name: test-policy-5, + spans_per_second: 123, + numeric_attribute: { key: key1, min_value: 50, max_value: 100 }, + invert_match: true + }, + { + name: test-policy-6, + spans_per_second: 50, + properties: { min_duration: 9s } + }, + { + name: test-policy-7, + properties: { + name_pattern: "foo.*", + min_number_of_spans: 10, + min_duration: 9s + } + }, { name: everything_else, spans_per_second: -1 diff --git a/processor/cascadingfilterprocessor/config/config.go b/processor/cascadingfilterprocessor/config/config.go index c3312ac83791..e3d21713c20a 100644 --- a/processor/cascadingfilterprocessor/config/config.go +++ b/processor/cascadingfilterprocessor/config/config.go @@ -32,6 +32,8 @@ type PolicyCfg struct { PropertiesCfg PropertiesCfg `mapstructure:"properties"` // SpansPerSecond specifies the rule budget that should never be exceeded for it SpansPerSecond int64 `mapstructure:"spans_per_second"` + // InvertMatch specifies if the match should be inverted. Default: false + InvertMatch bool `mapstructure:"invert_match"` } // PropertiesCfg holds the configurable settings to create a duration filter diff --git a/processor/cascadingfilterprocessor/config_test.go b/processor/cascadingfilterprocessor/config_test.go index 49d62f0930ab..517a3594d351 100644 --- a/processor/cascadingfilterprocessor/config_test.go +++ b/processor/cascadingfilterprocessor/config_test.go @@ -76,6 +76,7 @@ func TestLoadConfig(t *testing.T) { SpansPerSecond: 123, NumericAttributeCfg: &config.NumericAttributeCfg{ Key: "key1", MinValue: 50, MaxValue: 100}, + InvertMatch: true, }, { Name: "test-policy-6", diff --git a/processor/cascadingfilterprocessor/sampling/policy_factory.go b/processor/cascadingfilterprocessor/sampling/policy_factory.go index 8b2bd2da702c..2d88993ad2a6 100644 --- a/processor/cascadingfilterprocessor/sampling/policy_factory.go +++ b/processor/cascadingfilterprocessor/sampling/policy_factory.go @@ -46,6 +46,8 @@ type policyEvaluator struct { maxSpansPerSecond int64 spansInCurrentSecond int64 + invertMatch bool + logger *zap.Logger } @@ -124,5 +126,6 @@ func NewFilter(logger *zap.Logger, cfg *config.PolicyCfg) (PolicyEvaluator, erro currentSecond: 0, spansInCurrentSecond: 0, maxSpansPerSecond: cfg.SpansPerSecond, + invertMatch: cfg.InvertMatch, }, nil } diff --git a/processor/cascadingfilterprocessor/sampling/policy_filter.go b/processor/cascadingfilterprocessor/sampling/policy_filter.go index 036ca816c996..b1361a2dd1ee 100644 --- a/processor/cascadingfilterprocessor/sampling/policy_filter.go +++ b/processor/cascadingfilterprocessor/sampling/policy_filter.go @@ -156,9 +156,15 @@ func (pe *policyEvaluator) evaluateRules(_ pdata.TraceID, trace *TraceData) (Dec conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr { + if pe.invertMatch { + return NotSampled, nil + } return Sampled, nil } + if pe.invertMatch { + return Sampled, nil + } return NotSampled, nil } diff --git a/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go b/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go index 0335f6c05751..9668fe4933ad 100644 --- a/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go +++ b/processor/cascadingfilterprocessor/sampling/span_properties_filter_test.go @@ -46,6 +46,13 @@ func newSpanPropertiesFilter(operationNamePattern *string, minDuration *time.Dur }, nil } +func evaluate(t *testing.T, evaluator policyEvaluator, traces *TraceData, expectedDecision Decision) { + u, _ := uuid.NewRandom() + decision, err := evaluator.Evaluate(pdata.NewTraceID(u), traces) + assert.NoError(t, err) + assert.Equal(t, expectedDecision, decision) +} + func TestPartialSpanPropertiesFilter(t *testing.T) { opFilter, _ := newSpanPropertiesFilter(&operationNamePattern, nil, nil) durationFilter, _ := newSpanPropertiesFilter(nil, &minDuration, nil) @@ -74,15 +81,13 @@ func TestPartialSpanPropertiesFilter(t *testing.T) { for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { - u, _ := uuid.NewRandom() - decision, err := c.Evaluator.Evaluate(pdata.NewTraceID(u), matchingTraces) - assert.NoError(t, err) - assert.Equal(t, decision, Sampled) - - u, _ = uuid.NewRandom() - decision, err = c.Evaluator.Evaluate(pdata.NewTraceID(u), nonMatchingTraces) - assert.NoError(t, err) - assert.Equal(t, decision, NotSampled) + c.Evaluator.invertMatch = false + evaluate(t, c.Evaluator, matchingTraces, Sampled) + evaluate(t, c.Evaluator, nonMatchingTraces, NotSampled) + + c.Evaluator.invertMatch = true + evaluate(t, c.Evaluator, matchingTraces, NotSampled) + evaluate(t, c.Evaluator, nonMatchingTraces, Sampled) }) } } @@ -117,11 +122,17 @@ func TestSpanPropertiesFilter(t *testing.T) { for _, c := range cases { t.Run(c.Desc, func(t *testing.T) { + // Regular match filter, _ := newSpanPropertiesFilter(&operationNamePattern, &minDuration, &minNumberOfSpans) - u, _ := uuid.NewRandom() - decision, err := filter.Evaluate(pdata.NewTraceID(u), c.Trace) - assert.NoError(t, err) - assert.Equal(t, decision, c.Decision) + evaluate(t, filter, c.Trace, c.Decision) + + // Invert match + filter.invertMatch = true + invertDecision := Sampled + if c.Decision == Sampled { + invertDecision = NotSampled + } + evaluate(t, filter, c.Trace, invertDecision) }) } } diff --git a/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml b/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml index e8303c3e8d54..5c0f897adc1b 100644 --- a/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml +++ b/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml @@ -31,7 +31,8 @@ processors: { name: test-policy-5, spans_per_second: 123, - numeric_attribute: {key: key1, min_value: 50, max_value: 100} + numeric_attribute: {key: key1, min_value: 50, max_value: 100}, + invert_match: true }, { name: test-policy-6,