Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sampling: introduce event processor #4241

Merged
merged 3 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
"time"

"github.com/pkg/errors"

"github.com/elastic/apm-server/publish"
"github.com/elastic/go-elasticsearch/v7"
)

// Config holds configuration for Processor.
//
// The local sampling, remote sampling and storage settings are grouped into
// embedded structs, so their fields are promoted and can be set directly on
// Config. Validate checks the fields of Config and of each embedded struct.
type Config struct {
// BeatID holds the unique ID of this apm-server.
// Validate rejects an empty BeatID.
BeatID string

// Reporter holds the publish.Reporter, for publishing tail-sampled trace events.
// Validate rejects a nil Reporter.
Reporter publish.Reporter

// LocalSamplingConfig holds the local reservoir sampling settings.
LocalSamplingConfig
// RemoteSamplingConfig holds the settings for publishing and subscribing
// to remote sampling decisions.
RemoteSamplingConfig
// StorageConfig holds the event storage settings.
StorageConfig
}

// LocalSamplingConfig holds Processor configuration related to local reservoir sampling.
type LocalSamplingConfig struct {
	// Interval holds the local sampling interval.
	//
	// This controls how long it takes for servers to become aware of each other's
	// sampled trace IDs, and so should be in the order of tens of seconds, or low
	// minutes. In order not to lose sampled trace events, Interval should be
	// no greater than half of the TTL.
	//
	// Interval must be greater than zero.
	Interval time.Duration

	// MaxTraceGroups holds the maximum number of trace groups to track.
	//
	// Once MaxTraceGroups is reached, any root transaction forming a new trace
	// group will be dropped.
	//
	// MaxTraceGroups must be greater than zero.
	MaxTraceGroups int

	// DefaultSampleRate is the default sample rate to assign to new trace groups.
	//
	// DefaultSampleRate must be in the range [0,1).
	DefaultSampleRate float64

	// IngestRateDecayFactor holds the ingest rate decay factor, used for calculating
	// the exponentially weighted moving average (EWMA) ingest rate for each trace
	// group.
	//
	// IngestRateDecayFactor must be in the range (0,1].
	IngestRateDecayFactor float64
}

// RemoteSamplingConfig holds Processor configuration related to publishing and
// subscribing to remote sampling decisions.
type RemoteSamplingConfig struct {
// Elasticsearch holds the Elasticsearch client to use for publishing
// and subscribing to remote sampling decisions.
//
// Validation rejects a nil client.
Elasticsearch *elasticsearch.Client

// SampledTracesIndex holds the name of the Elasticsearch index for
// storing and searching sampled trace IDs.
//
// Validation rejects an empty index name.
SampledTracesIndex string
}

// StorageConfig holds Processor configuration related to event storage.
type StorageConfig struct {
// StorageDir holds the directory in which event storage will be maintained.
//
// Validation rejects an empty directory.
StorageDir string

// StorageGCInterval holds the amount of time between storage garbage collections.
//
// Validation rejects a zero or negative interval.
StorageGCInterval time.Duration

// TTL holds the amount of time before events and sampling decisions
// are expired from local storage.
//
// Validation rejects a zero or negative TTL.
TTL time.Duration
}

// Validate checks that config is complete and well-formed. It returns an
// error describing the first problem found, or nil if there is none.
//
// The top-level fields are checked first, followed by the local sampling,
// remote sampling and storage sections, in that order. An error from a
// section is wrapped with a message naming that section.
func (config Config) Validate() error {
	switch {
	case config.BeatID == "":
		return errors.New("BeatID unspecified")
	case config.Reporter == nil:
		return errors.New("Reporter unspecified")
	}

	sections := []struct {
		context  string
		validate func() error
	}{
		{"invalid local sampling config", config.LocalSamplingConfig.validate},
		{"invalid remote sampling config", config.RemoteSamplingConfig.validate},
		{"invalid storage config", config.StorageConfig.validate},
	}
	for _, section := range sections {
		if err := section.validate(); err != nil {
			return errors.Wrap(err, section.context)
		}
	}
	return nil
}

// validate checks the local sampling settings, returning an error that
// describes the first invalid field, or nil if every field is acceptable.
//
// Interval and MaxTraceGroups must be positive, DefaultSampleRate must be
// in [0,1), and IngestRateDecayFactor must be in (0,1]. NaN is rejected for
// both floating-point fields.
func (config LocalSamplingConfig) validate() error {
	if config.Interval <= 0 {
		return errors.New("Interval unspecified or negative")
	}
	if config.MaxTraceGroups <= 0 {
		return errors.New("MaxTraceGroups unspecified or negative")
	}
	// The floating-point range checks are expressed as "not inside the valid
	// range" rather than "outside the valid range": every ordered comparison
	// with NaN is false, so only the negated form rejects NaN.
	if !(config.DefaultSampleRate >= 0 && config.DefaultSampleRate < 1) {
		// TODO(axw) allow sampling rate of 1.0 (100%), which would
		// cause the root transaction to be indexed, and a sampling
		// decision to be written to local storage, immediately.
		return errors.New("DefaultSampleRate unspecified or out of range [0,1)")
	}
	if !(config.IngestRateDecayFactor > 0 && config.IngestRateDecayFactor <= 1) {
		return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]")
	}
	return nil
}

// validate checks the remote sampling settings, returning an error for the
// first missing field: the Elasticsearch client, then the sampled traces
// index name. It returns nil when both are set.
func (config RemoteSamplingConfig) validate() error {
	switch {
	case config.Elasticsearch == nil:
		return errors.New("Elasticsearch unspecified")
	case config.SampledTracesIndex == "":
		return errors.New("SampledTracesIndex unspecified")
	}
	return nil
}

// validate checks the storage settings, returning an error for the first
// invalid field: an empty StorageDir, then a non-positive StorageGCInterval,
// then a non-positive TTL. It returns nil when all three are acceptable.
func (config StorageConfig) validate() error {
	switch {
	case config.StorageDir == "":
		return errors.New("StorageDir unspecified")
	case config.StorageGCInterval <= 0:
		return errors.New("StorageGCInterval unspecified or negative")
	case config.TTL <= 0:
		return errors.New("TTL unspecified or negative")
	}
	return nil
}
66 changes: 66 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/go-elasticsearch/v7"
)

// TestNewProcessorConfigInvalid checks that sampling.NewProcessor rejects an
// incomplete or out-of-range Config, one field at a time.
//
// The test starts from the zero Config. Each step asserts the error expected
// for the next invalid field and then sets that field to a valid value, so
// the order of the statements below is significant.
func TestNewProcessorConfigInvalid(t *testing.T) {
var config sampling.Config
// assertInvalidConfigError builds a processor from the current state of
// config (captured by reference) and requires that construction fails with
// expectedError, prefixed by "invalid tail-sampling config: ".
assertInvalidConfigError := func(expectedError string) {
t.Helper()
agg, err := sampling.NewProcessor(config)
require.Error(t, err)
require.Nil(t, agg)
assert.EqualError(t, err, "invalid tail-sampling config: "+expectedError)
}
// Top-level fields.
assertInvalidConfigError("BeatID unspecified")
config.BeatID = "beat"

assertInvalidConfigError("Reporter unspecified")
config.Reporter = func(ctx context.Context, req publish.PendingReq) error { return nil }

// Local sampling fields, set through the promoted fields of the embedded
// LocalSamplingConfig.
assertInvalidConfigError("invalid local sampling config: Interval unspecified or negative")
config.Interval = 1

assertInvalidConfigError("invalid local sampling config: MaxTraceGroups unspecified or negative")
config.MaxTraceGroups = 1

// DefaultSampleRate must lie in [0,1): try values below, at, and above the
// excluded upper bound.
for _, invalid := range []float64{-1, 1.0, 2.0} {
config.DefaultSampleRate = invalid
assertInvalidConfigError("invalid local sampling config: DefaultSampleRate unspecified or out of range [0,1)")
}
config.DefaultSampleRate = 0.5

// IngestRateDecayFactor must lie in (0,1]: try values below, at the excluded
// lower bound, and above the upper bound.
for _, invalid := range []float64{-1, 0, 2.0} {
config.IngestRateDecayFactor = invalid
assertInvalidConfigError("invalid local sampling config: IngestRateDecayFactor unspecified or out of range (0,1]")
}
config.IngestRateDecayFactor = 0.5

// Remote sampling fields.
assertInvalidConfigError("invalid remote sampling config: Elasticsearch unspecified")
config.Elasticsearch = &elasticsearch.Client{}

assertInvalidConfigError("invalid remote sampling config: SampledTracesIndex unspecified")
config.SampledTracesIndex = "sampled-traces"

// Storage fields.
assertInvalidConfigError("invalid storage config: StorageDir unspecified")
config.StorageDir = "tbs"

assertInvalidConfigError("invalid storage config: StorageGCInterval unspecified or negative")
config.StorageGCInterval = 1

assertInvalidConfigError("invalid storage config: TTL unspecified or negative")
// The config is complete at this point; this test does not go on to
// construct a processor from it.
config.TTL = 1
}
10 changes: 9 additions & 1 deletion x-pack/apm-server/sampling/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type traceGroups struct {
ingestRateDecayFactor float64

// maxGroups holds the maximum number of trace groups to maintain.
// Once this is reached, all trace events will be sampled.
// Once this is reached, no new trace group events will be sampled.
maxGroups int

mu sync.RWMutex
Expand Down Expand Up @@ -101,6 +101,9 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
group, ok := g.groups[key]
if ok {
defer g.mu.RUnlock()
if group.samplingFraction == 0 {
return false, nil
}
group.mu.Lock()
defer group.mu.Unlock()
group.total++
Expand All @@ -113,10 +116,15 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
group, ok = g.groups[key]
if ok {
// We've got a write lock on g.mu, no need to lock group too.
if group.samplingFraction == 0 {
return false, nil
}
group.total++
return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
} else if len(g.groups) == g.maxGroups {
return false, errTooManyTraceGroups
} else if g.defaultSamplingFraction == 0 {
return false, nil
}

group = &traceGroup{
Expand Down
Loading