diff --git a/beater/config/config_test.go b/beater/config/config_test.go index a556163c3da..cc4bdd7bc1f 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -283,6 +283,7 @@ func TestUnpackConfig(t *testing.T) { "sampling.keep_unsampled": false, "sampling.tail": map[string]interface{}{ "enabled": true, + "policies": []map[string]interface{}{{"sample_rate": 0.5}}, "interval": "2m", "ingest_rate_decay": 1.0, }, @@ -371,6 +372,7 @@ func TestUnpackConfig(t *testing.T) { KeepUnsampled: false, Tail: &TailSamplingConfig{ Enabled: true, + Policies: []TailSamplingPolicy{{SampleRate: 0.5}}, ESConfig: elasticsearch.DefaultConfig(), Interval: 2 * time.Minute, IngestRateDecayFactor: 1.0, @@ -564,7 +566,7 @@ func TestAgentConfig(t *testing.T) { } func TestNewConfig_ESConfig(t *testing.T) { - ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.enabled":true}`) + ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`) require.NoError(t, err) // no es config given diff --git a/beater/config/sampling.go b/beater/config/sampling.go index 2a2d560206f..2ff93a682b4 100644 --- a/beater/config/sampling.go +++ b/beater/config/sampling.go @@ -41,7 +41,12 @@ type SamplingConfig struct { type TailSamplingConfig struct { Enabled bool `config:"enabled"` - Policies []TailSamplingPolicy `config:"policies"` + // Policies holds tail-sampling policies. + // + // Policies must include at least one policy that matches all traces, to ensure + // that dropping non-matching traces is intentional. + Policies []TailSamplingPolicy `config:"policies"` + ESConfig *elasticsearch.Config `config:"elasticsearch"` Interval time.Duration `config:"interval" validate:"min=1s"` IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"` @@ -76,8 +81,30 @@ func (c *TailSamplingConfig) Unpack(in *common.Config) error { if err := in.Unpack(&cfg); err != nil { return errors.Wrap(err, "error unpacking tail sampling config") } + cfg.Enabled = in.Enabled() *c = TailSamplingConfig(cfg) c.esConfigured = in.HasField("elasticsearch") + return errors.Wrap(c.Validate(), "invalid tail sampling config") +} + +func (c *TailSamplingConfig) Validate() error { + if !c.Enabled { + return nil + } + if len(c.Policies) == 0 { + return errors.New("no policies specified") + } + var anyDefaultPolicy bool + for _, policy := range c.Policies { + if policy == (TailSamplingPolicy{SampleRate: policy.SampleRate}) { + // We have at least one default policy. + anyDefaultPolicy = true + break + } + } + if !anyDefaultPolicy { + return errors.New("no default (empty criteria) policy specified") + } return nil } diff --git a/beater/config/sampling_test.go b/beater/config/sampling_test.go new file mode 100644 index 00000000000..e2de09e66f1 --- /dev/null +++ b/beater/config/sampling_test.go @@ -0,0 +1,52 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestSamplingPoliciesValidation(t *testing.T) { + t.Run("MinimallyValid", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.policies": []map[string]interface{}{{ + "sample_rate": 0.5, + }}, + }), nil) + assert.NoError(t, err) + }) + t.Run("NoPolicies", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.enabled": true, + }), nil) + assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'") + }) + t.Run("NoDefaultPolicies", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.policies": []map[string]interface{}{{ + "service.name": "foo", + "sample_rate": 0.5, + }}, + }), nil) + assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'") + }) +}