Skip to content

Commit

Permalink
sampling: introduce processor
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Sep 29, 2020
1 parent 5e05c3b commit 3529d05
Show file tree
Hide file tree
Showing 7 changed files with 1,271 additions and 1 deletion.
139 changes: 139 additions & 0 deletions x-pack/apm-server/sampling/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling

import (
"time"

"github.com/pkg/errors"

"github.com/elastic/apm-server/publish"
"github.com/elastic/go-elasticsearch/v7"
)

// Config holds configuration for Processor.
//
// Validate rejects a Config with an empty BeatID, a nil Reporter, or an
// embedded section that fails its own validation.
type Config struct {
// BeatID holds the unique ID of this apm-server. It must be non-empty.
BeatID string

// Reporter holds the publish.Reporter, for publishing tail-sampled trace events.
// It must be non-nil.
Reporter publish.Reporter

// LocalSamplingConfig holds the settings related to local reservoir sampling.
LocalSamplingConfig
// RemoteSamplingConfig holds the settings related to publishing and
// subscribing to remote sampling decisions.
RemoteSamplingConfig
// StorageConfig holds the settings related to event storage.
StorageConfig
}

// LocalSamplingConfig holds Processor configuration related to local reservoir sampling.
type LocalSamplingConfig struct {
// Interval holds the local sampling interval. It must be positive.
//
// This controls how long it takes for servers to become aware of each other's
// sampled trace IDs, and so should be in the order of tens of seconds, or low
// minutes. In order not to lose sampled trace events, Interval should be
// no greater than half of the TTL.
Interval time.Duration

// MaxTraceGroups holds the maximum number of trace groups to track.
// It must be positive.
//
// Once MaxTraceGroups is reached, any root transaction forming a new trace
// group will be dropped.
MaxTraceGroups int

// DefaultSampleRate is the default sample rate to assign to new trace groups.
// It must be in the range [0,1); a rate of 1.0 (100%) is currently rejected
// by validation.
DefaultSampleRate float64

// IngestRateDecayFactor holds the ingest rate decay factor, used for calculating
// the exponentially weighted moving average (EWMA) ingest rate for each trace
// group. It must be in the range (0,1].
IngestRateDecayFactor float64
}

// RemoteSamplingConfig holds Processor configuration related to publishing and
// subscribing to remote sampling decisions.
type RemoteSamplingConfig struct {
// Elasticsearch holds the Elasticsearch client to use for publishing
// and subscribing to remote sampling decisions. It must be non-nil.
Elasticsearch *elasticsearch.Client

// SampledTracesIndex holds the name of the Elasticsearch index for
// storing and searching sampled trace IDs. It must be non-empty.
SampledTracesIndex string
}

// StorageConfig holds Processor configuration related to event storage.
type StorageConfig struct {
// StorageDir holds the directory in which event storage will be maintained.
// It must be non-empty.
StorageDir string

// StorageGCInterval holds the amount of time between storage garbage collections.
// It must be positive.
StorageGCInterval time.Duration

// TTL holds the amount of time before events and sampling decisions
// are expired from local storage. It must be positive.
TTL time.Duration
}

// Validate reports the first problem found in the configuration, or nil if
// there is none.
//
// The top-level fields are checked first (BeatID, then Reporter), followed by
// the embedded local sampling, remote sampling and storage sections, in that
// order. An error from an embedded section is wrapped with a message naming
// the section it came from.
func (config Config) Validate() error {
	switch {
	case config.BeatID == "":
		return errors.New("BeatID unspecified")
	case config.Reporter == nil:
		return errors.New("Reporter unspecified")
	}

	// Each section is validated lazily, in order, so that only the first
	// failing section is reported.
	sections := []struct {
		check   func() error
		context string
	}{
		{config.LocalSamplingConfig.validate, "invalid local sampling config"},
		{config.RemoteSamplingConfig.validate, "invalid remote sampling config"},
		{config.StorageConfig.validate, "invalid storage config"},
	}
	for _, section := range sections {
		if err := section.check(); err != nil {
			return errors.Wrap(err, section.context)
		}
	}
	return nil
}

// validate checks that the local sampling settings are usable, returning an
// error that describes the first invalid field, or nil if all are valid.
//
// Fields are checked in declaration order: Interval and MaxTraceGroups must be
// positive, DefaultSampleRate must be in [0,1), and IngestRateDecayFactor must
// be in (0,1]. NaN is rejected for both floating-point fields.
func (config LocalSamplingConfig) validate() error {
	if config.Interval <= 0 {
		return errors.New("Interval unspecified or negative")
	}
	if config.MaxTraceGroups <= 0 {
		return errors.New("MaxTraceGroups unspecified or negative")
	}
	// The floating-point range checks below are deliberately written as
	// negated "in range" tests. Every ordered comparison involving NaN is
	// false, so the equivalent-looking "x < lo || x >= hi" form would let
	// NaN through; do not simplify these with De Morgan's law.
	if !(config.DefaultSampleRate >= 0 && config.DefaultSampleRate < 1) {
		// TODO(axw) allow sampling rate of 1.0 (100%), which would
		// cause the root transaction to be indexed, and a sampling
		// decision to be written to local storage, immediately.
		return errors.New("DefaultSampleRate unspecified or out of range [0,1)")
	}
	if !(config.IngestRateDecayFactor > 0 && config.IngestRateDecayFactor <= 1) {
		return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]")
	}
	return nil
}

// validate checks that the remote sampling settings are present: the
// Elasticsearch client must be non-nil and the sampled traces index name must
// be non-empty. It returns an error for the first missing setting, or nil.
func (config RemoteSamplingConfig) validate() error {
	var problem string
	switch {
	case config.Elasticsearch == nil:
		problem = "Elasticsearch unspecified"
	case len(config.SampledTracesIndex) == 0:
		problem = "SampledTracesIndex unspecified"
	default:
		return nil
	}
	return errors.New(problem)
}

// validate checks that the storage settings are usable: the storage directory
// must be non-empty, and both the garbage collection interval and the TTL must
// be positive. It returns an error for the first invalid setting, or nil.
func (config StorageConfig) validate() error {
	switch {
	case len(config.StorageDir) == 0:
		return errors.New("StorageDir unspecified")
	case !(config.StorageGCInterval > 0):
		return errors.New("StorageGCInterval unspecified or negative")
	case !(config.TTL > 0):
		return errors.New("TTL unspecified or negative")
	default:
		return nil
	}
}
66 changes: 66 additions & 0 deletions x-pack/apm-server/sampling/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package sampling_test

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/x-pack/apm-server/sampling"
"github.com/elastic/go-elasticsearch/v7"
)

// TestNewProcessorConfigInvalid checks that sampling.NewProcessor rejects
// incomplete or out-of-range configuration.
//
// It starts from the zero Config and repairs one field at a time, asserting
// before each repair that NewProcessor fails with the error for that field.
// The statement order therefore matters: each assertion relies on all of the
// previously repaired fields being valid.
func TestNewProcessorConfigInvalid(t *testing.T) {
var config sampling.Config
// assertInvalidConfigError asserts that NewProcessor fails for the current
// value of config, returning a nil processor and an error whose message is
// expectedError prefixed with "invalid tail-sampling config: ".
assertInvalidConfigError := func(expectedError string) {
t.Helper()
agg, err := sampling.NewProcessor(config)
require.Error(t, err)
require.Nil(t, agg)
assert.EqualError(t, err, "invalid tail-sampling config: "+expectedError)
}
// Top-level fields.
assertInvalidConfigError("BeatID unspecified")
config.BeatID = "beat"

assertInvalidConfigError("Reporter unspecified")
config.Reporter = func(ctx context.Context, req publish.PendingReq) error { return nil }

// Local sampling fields.
assertInvalidConfigError("invalid local sampling config: Interval unspecified or negative")
config.Interval = 1

assertInvalidConfigError("invalid local sampling config: MaxTraceGroups unspecified or negative")
config.MaxTraceGroups = 1

// DefaultSampleRate must be in [0,1): negative values, 1.0 and above are rejected.
for _, invalid := range []float64{-1, 1.0, 2.0} {
config.DefaultSampleRate = invalid
assertInvalidConfigError("invalid local sampling config: DefaultSampleRate unspecified or out of range [0,1)")
}
config.DefaultSampleRate = 0.5

// IngestRateDecayFactor must be in (0,1]: zero, negative values and values above 1 are rejected.
for _, invalid := range []float64{-1, 0, 2.0} {
config.IngestRateDecayFactor = invalid
assertInvalidConfigError("invalid local sampling config: IngestRateDecayFactor unspecified or out of range (0,1]")
}
config.IngestRateDecayFactor = 0.5

// Remote sampling fields.
assertInvalidConfigError("invalid remote sampling config: Elasticsearch unspecified")
config.Elasticsearch = &elasticsearch.Client{}

assertInvalidConfigError("invalid remote sampling config: SampledTracesIndex unspecified")
config.SampledTracesIndex = "sampled-traces"

// Storage fields.
assertInvalidConfigError("invalid storage config: StorageDir unspecified")
config.StorageDir = "tbs"

assertInvalidConfigError("invalid storage config: StorageGCInterval unspecified or negative")
config.StorageGCInterval = 1

assertInvalidConfigError("invalid storage config: TTL unspecified or negative")
// NOTE(review): the configuration is complete after this assignment, but the
// test does not go on to assert that NewProcessor now succeeds.
config.TTL = 1
}
10 changes: 9 additions & 1 deletion x-pack/apm-server/sampling/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type traceGroups struct {
ingestRateDecayFactor float64

// maxGroups holds the maximum number of trace groups to maintain.
// Once this is reached, all trace events will be sampled.
// Once this is reached, no new trace group events will be sampled.
maxGroups int

mu sync.RWMutex
Expand Down Expand Up @@ -101,6 +101,9 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
group, ok := g.groups[key]
if ok {
defer g.mu.RUnlock()
if group.samplingFraction == 0 {
return false, nil
}
group.mu.Lock()
defer group.mu.Unlock()
group.total++
Expand All @@ -113,10 +116,15 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) {
group, ok = g.groups[key]
if ok {
// We've got a write lock on g.mu, no need to lock group too.
if group.samplingFraction == 0 {
return false, nil
}
group.total++
return group.reservoir.Sample(tx.Duration, tx.TraceID), nil
} else if len(g.groups) == g.maxGroups {
return false, errTooManyTraceGroups
} else if g.defaultSamplingFraction == 0 {
return false, nil
}

group = &traceGroup{
Expand Down
Loading

0 comments on commit 3529d05

Please sign in to comment.