From bd87ad9891d95587d3f65b4482323615aacdb56a Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 15 Feb 2021 11:43:08 +0800 Subject: [PATCH 1/3] sampling: require a default policy --- changelogs/head.asciidoc | 1 + x-pack/apm-server/sampling/config.go | 10 ++++++++++ x-pack/apm-server/sampling/config_test.go | 6 +++++- x-pack/apm-server/sampling/processor_test.go | 3 +++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc index c3363a94aa2..cdbefa09104 100644 --- a/changelogs/head.asciidoc +++ b/changelogs/head.asciidoc @@ -27,6 +27,7 @@ https://github.com/elastic/apm-server/compare/7.11\...master[View commits] * OpenTelemetry Protocol (OTLP) over gRPC is now supported on the standard endpoint (8200) {pull}4677[4677] * Add initial support for APM central config and sourcemaps when running under Fleet {pull}4670[4670] * Data stream and ILM policy for tail-based sampling {pull}4707[4707] +* When tail-sampling is enabled, a default policy must be defined {pull}4729[4729] [float] ==== Deprecated diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go index ce713d93ee0..9ba7794f2f9 100644 --- a/x-pack/apm-server/sampling/config.go +++ b/x-pack/apm-server/sampling/config.go @@ -46,6 +46,9 @@ type LocalSamplingConfig struct { // Policies holds local tail-sampling policies. Policies are matched in the // order provided. Policies should therefore be ordered from most to least // specific. + // + // Policies must include at least one policy that matches all traces, to ensure + // that dropping non-matching traces is intentional. Policies []Policy // IngestRateDecayFactor holds the ingest rate decay factor, used for calculating @@ -173,10 +176,17 @@ func (config LocalSamplingConfig) validate() error { if len(config.Policies) == 0 { return errors.New("Policies unspecified") } + var anyDefaultPolicy bool for i, policy := range config.Policies { if err := policy.validate(); err != nil { return errors.Wrapf(err, "Policy %d invalid", i) } + if policy.PolicyCriteria == (PolicyCriteria{}) { + anyDefaultPolicy = true + } + } + if !anyDefaultPolicy { + return errors.New("Policies does not contain a default (empty criteria) policy") } if config.IngestRateDecayFactor <= 0 || config.IngestRateDecayFactor > 1 { return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]") diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go index 7519dbd06b2..9db7225416f 100644 --- a/x-pack/apm-server/sampling/config_test.go +++ b/x-pack/apm-server/sampling/config_test.go @@ -38,7 +38,11 @@ func TestNewProcessorConfigInvalid(t *testing.T) { config.MaxDynamicServices = 1 assertInvalidConfigError("invalid local sampling config: Policies unspecified") - config.Policies = []sampling.Policy{{}} + config.Policies = []sampling.Policy{{ + PolicyCriteria: sampling.PolicyCriteria{ServiceName: "foo"}, + }} + assertInvalidConfigError("invalid local sampling config: Policies does not contain a default (empty criteria) policy") + config.Policies[0].PolicyCriteria = sampling.PolicyCriteria{} for _, invalid := range []float64{-1, 1.0, 2.0} { config.Policies[0].SampleRate = invalid assertInvalidConfigError("invalid local sampling config: Policy 0 invalid: SampleRate unspecified or out of range [0,1)") diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go index ba412ba8ad3..71422558c05 100644 --- a/x-pack/apm-server/sampling/processor_test.go +++ b/x-pack/apm-server/sampling/processor_test.go @@ -292,6 +292,9 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) { }, { PolicyCriteria: sampling.PolicyCriteria{ServiceName: "service_name"}, SampleRate: 0.1, + }, { + PolicyCriteria: sampling.PolicyCriteria{}, + SampleRate: 0, }} config.FlushInterval = 10 * time.Millisecond published := make(chan string) From 532042975faa586290b3983f2adda513ba594dc5 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 15 Feb 2021 13:54:48 +0800 Subject: [PATCH 2/3] beater/config: validate policies here too --- beater/config/config_test.go | 4 ++- beater/config/sampling.go | 29 ++++++++++++++++++- beater/config/sampling_test.go | 52 ++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 beater/config/sampling_test.go diff --git a/beater/config/config_test.go b/beater/config/config_test.go index a556163c3da..cc4bdd7bc1f 100644 --- a/beater/config/config_test.go +++ b/beater/config/config_test.go @@ -283,6 +283,7 @@ func TestUnpackConfig(t *testing.T) { "sampling.keep_unsampled": false, "sampling.tail": map[string]interface{}{ "enabled": true, + "policies": []map[string]interface{}{{"sample_rate": 0.5}}, "interval": "2m", "ingest_rate_decay": 1.0, }, @@ -371,6 +372,7 @@ func TestUnpackConfig(t *testing.T) { KeepUnsampled: false, Tail: &TailSamplingConfig{ Enabled: true, + Policies: []TailSamplingPolicy{{SampleRate: 0.5}}, ESConfig: elasticsearch.DefaultConfig(), Interval: 2 * time.Minute, IngestRateDecayFactor: 1.0, @@ -564,7 +566,7 @@ func TestAgentConfig(t *testing.T) { } func TestNewConfig_ESConfig(t *testing.T) { - ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.enabled":true}`) + ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`) require.NoError(t, err) // no es config given diff --git a/beater/config/sampling.go b/beater/config/sampling.go index 2a2d560206f..2ff93a682b4 100644 --- a/beater/config/sampling.go +++ b/beater/config/sampling.go @@ -41,7 +41,12 @@ type SamplingConfig struct { type TailSamplingConfig struct { Enabled bool `config:"enabled"` - Policies []TailSamplingPolicy `config:"policies"` + // Policies holds tail-sampling policies. + // + // Policies must include at least one policy that matches all traces, to ensure + // that dropping non-matching traces is intentional. + Policies []TailSamplingPolicy `config:"policies"` + ESConfig *elasticsearch.Config `config:"elasticsearch"` Interval time.Duration `config:"interval" validate:"min=1s"` IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"` @@ -76,8 +81,30 @@ func (c *TailSamplingConfig) Unpack(in *common.Config) error { if err := in.Unpack(&cfg); err != nil { return errors.Wrap(err, "error unpacking tail sampling config") } + cfg.Enabled = in.Enabled() *c = TailSamplingConfig(cfg) c.esConfigured = in.HasField("elasticsearch") + return errors.Wrap(c.Validate(), "invalid tail sampling config") +} + +func (c *TailSamplingConfig) Validate() error { + if !c.Enabled { + return nil + } + if len(c.Policies) == 0 { + return errors.New("no policies specified") + } + var anyDefaultPolicy bool + for _, policy := range c.Policies { + if policy == (TailSamplingPolicy{SampleRate: policy.SampleRate}) { + // We have at least one default policy. + anyDefaultPolicy = true + break + } + } + if !anyDefaultPolicy { + return errors.New("no default (empty criteria) policy specified") + } return nil } diff --git a/beater/config/sampling_test.go b/beater/config/sampling_test.go new file mode 100644 index 00000000000..e2de09e66f1 --- /dev/null +++ b/beater/config/sampling_test.go @@ -0,0 +1,52 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestSamplingPoliciesValidation(t *testing.T) { + t.Run("MinimallyValid", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.policies": []map[string]interface{}{{ + "sample_rate": 0.5, + }}, + }), nil) + assert.NoError(t, err) + }) + t.Run("NoPolicies", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.enabled": true, + }), nil) + assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'") + }) + t.Run("NoDefaultPolicies", func(t *testing.T) { + _, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{ + "sampling.tail.policies": []map[string]interface{}{{ + "service.name": "foo", + "sample_rate": 0.5, + }}, + }), nil) + assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'") + }) +} From 73c5677e14879a4d9a4f09b96d4e0ed0b6fddac2 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 15 Feb 2021 13:00:28 +0800 Subject: [PATCH 3/3] beater: don't hang if runServer returns error If runServer returns an error without Stop being called, signal the "done" channel. --- beater/beater.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/beater/beater.go b/beater/beater.go index 35de9b57036..c9de6beb85c 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -199,14 +199,17 @@ func (bt *beater) start(ctx context.Context, cancelContext context.CancelFunc, b return nil, err } bt.stopServer = func() { - defer close(done) - defer closeTracer() if bt.config.ShutdownTimeout > 0 { time.AfterFunc(bt.config.ShutdownTimeout, cancelContext) } s.Stop() } s.Start() + go func() { + defer close(done) + defer closeTracer() + s.Wait() + }() } return done, nil } @@ -316,11 +319,18 @@ func (s *serverRunner) String() string { return "APMServer" } +// Stop stops the server. func (s *serverRunner) Stop() { s.stopOnce.Do(s.cancelRunServerContext) + s.Wait() +} + +// Wait waits for the server to stop. +func (s *serverRunner) Wait() { s.wg.Wait() } +// Start starts the server. func (s *serverRunner) Start() { s.wg.Add(1) go func() {