diff --git a/x-pack/apm-server/sampling/config.go b/x-pack/apm-server/sampling/config.go
new file mode 100644
index 00000000000..ace7ffc538a
--- /dev/null
+++ b/x-pack/apm-server/sampling/config.go
@@ -0,0 +1,139 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package sampling
+
+import (
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/elastic/apm-server/publish"
+	"github.com/elastic/go-elasticsearch/v7"
+)
+
+// Config holds configuration for Processor.
+type Config struct {
+	// BeatID holds the unique ID of this apm-server.
+	BeatID string
+
+	// Reporter holds the publish.Reporter, for publishing tail-sampled trace events.
+	Reporter publish.Reporter
+
+	LocalSamplingConfig
+	RemoteSamplingConfig
+	StorageConfig
+}
+
+// LocalSamplingConfig holds Processor configuration related to local reservoir sampling.
+type LocalSamplingConfig struct {
+	// FlushInterval holds the local sampling interval.
+	//
+	// This controls how long it takes for servers to become aware of each other's
+	// sampled trace IDs, and so should be in the order of tens of seconds, or low
+	// minutes. In order not to lose sampled trace events, FlushInterval should be
+	// no greater than half of the TTL.
+	FlushInterval time.Duration
+
+	// MaxTraceGroups holds the maximum number of trace groups to track.
+	//
+	// Once MaxTraceGroups is reached, any root transaction forming a new trace
+	// group will be dropped.
+	MaxTraceGroups int
+
+	// DefaultSampleRate is the default sample rate to assign to new trace groups.
+	DefaultSampleRate float64
+
+	// IngestRateDecayFactor holds the ingest rate decay factor, used for calculating
+	// the exponentially weighted moving average (EWMA) ingest rate for each trace
+	// group.
+	IngestRateDecayFactor float64
+}
+
+// RemoteSamplingConfig holds Processor configuration related to publishing and
+// subscribing to remote sampling decisions.
+type RemoteSamplingConfig struct {
+	// Elasticsearch holds the Elasticsearch client to use for publishing
+	// and subscribing to remote sampling decisions.
+	Elasticsearch *elasticsearch.Client
+
+	// SampledTracesIndex holds the name of the Elasticsearch index for
+	// storing and searching sampled trace IDs.
+	SampledTracesIndex string
+}
+
+// StorageConfig holds Processor configuration related to event storage.
+type StorageConfig struct {
+	// StorageDir holds the directory in which event storage will be maintained.
+	StorageDir string
+
+	// StorageGCInterval holds the amount of time between storage garbage collections.
+	StorageGCInterval time.Duration
+
+	// TTL holds the amount of time before events and sampling decisions
+	// are expired from local storage.
+	TTL time.Duration
+}
+
+// Validate validates the configuration.
+func (config Config) Validate() error { + if config.BeatID == "" { + return errors.New("BeatID unspecified") + } + if config.Reporter == nil { + return errors.New("Reporter unspecified") + } + if err := config.LocalSamplingConfig.validate(); err != nil { + return errors.Wrap(err, "invalid local sampling config") + } + if err := config.RemoteSamplingConfig.validate(); err != nil { + return errors.Wrap(err, "invalid remote sampling config") + } + if err := config.StorageConfig.validate(); err != nil { + return errors.Wrap(err, "invalid storage config") + } + return nil +} + +func (config LocalSamplingConfig) validate() error { + if config.FlushInterval <= 0 { + return errors.New("FlushInterval unspecified or negative") + } + if config.MaxTraceGroups <= 0 { + return errors.New("MaxTraceGroups unspecified or negative") + } + if config.DefaultSampleRate < 0 || config.DefaultSampleRate >= 1 { + // TODO(axw) allow sampling rate of 1.0 (100%), which would + // cause the root transaction to be indexed, and a sampling + // decision to be written to local storage, immediately. + return errors.New("DefaultSampleRate unspecified or out of range [0,1)") + } + if config.IngestRateDecayFactor <= 0 || config.IngestRateDecayFactor > 1 { + return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]") + } + return nil +} + +func (config RemoteSamplingConfig) validate() error { + if config.Elasticsearch == nil { + return errors.New("Elasticsearch unspecified") + } + if config.SampledTracesIndex == "" { + return errors.New("SampledTracesIndex unspecified") + } + return nil +} + +func (config StorageConfig) validate() error { + if config.StorageDir == "" { + return errors.New("StorageDir unspecified") + } + if config.StorageGCInterval <= 0 { + return errors.New("StorageGCInterval unspecified or negative") + } + if config.TTL <= 0 { + return errors.New("TTL unspecified or negative") + } + return nil +} diff --git a/x-pack/apm-server/sampling/config_test.go b/x-pack/apm-server/sampling/config_test.go new file mode 100644 index 00000000000..6f9e5fc1343 --- /dev/null +++ b/x-pack/apm-server/sampling/config_test.go @@ -0,0 +1,66 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
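As a quick illustration of the configuration type defined in config.go above: a Config that passes Validate might be assembled as in the following sketch. The Elasticsearch client, the no-op reporter, and all field values are placeholders rather than recommended settings.

package main

import (
	"context"
	"log"
	"time"

	"github.com/elastic/apm-server/publish"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling"
	"github.com/elastic/go-elasticsearch/v7"
)

func main() {
	// Placeholder Elasticsearch client; a real server would reuse its configured output client.
	esClient, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatal(err)
	}
	// No-op reporter standing in for the server's publish pipeline.
	reporter := func(ctx context.Context, req publish.PendingReq) error { return nil }

	config := sampling.Config{
		BeatID:   "apm-server-01",
		Reporter: reporter,
		LocalSamplingConfig: sampling.LocalSamplingConfig{
			FlushInterval:         30 * time.Second, // no greater than half the TTL
			MaxTraceGroups:        1000,
			DefaultSampleRate:     0.1,  // must lie in [0,1)
			IngestRateDecayFactor: 0.25, // must lie in (0,1]
		},
		RemoteSamplingConfig: sampling.RemoteSamplingConfig{
			Elasticsearch:      esClient,
			SampledTracesIndex: "apm-sampled-traces",
		},
		StorageConfig: sampling.StorageConfig{
			StorageDir:        "tail_sampling",
			StorageGCInterval: 5 * time.Minute,
			TTL:               30 * time.Minute,
		},
	}
	if err := config.Validate(); err != nil {
		log.Fatalf("invalid tail-sampling config: %v", err)
	}
}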
+ +package sampling_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/x-pack/apm-server/sampling" + "github.com/elastic/go-elasticsearch/v7" +) + +func TestNewProcessorConfigInvalid(t *testing.T) { + var config sampling.Config + assertInvalidConfigError := func(expectedError string) { + t.Helper() + agg, err := sampling.NewProcessor(config) + require.Error(t, err) + require.Nil(t, agg) + assert.EqualError(t, err, "invalid tail-sampling config: "+expectedError) + } + assertInvalidConfigError("BeatID unspecified") + config.BeatID = "beat" + + assertInvalidConfigError("Reporter unspecified") + config.Reporter = func(ctx context.Context, req publish.PendingReq) error { return nil } + + assertInvalidConfigError("invalid local sampling config: FlushInterval unspecified or negative") + config.FlushInterval = 1 + + assertInvalidConfigError("invalid local sampling config: MaxTraceGroups unspecified or negative") + config.MaxTraceGroups = 1 + + for _, invalid := range []float64{-1, 1.0, 2.0} { + config.DefaultSampleRate = invalid + assertInvalidConfigError("invalid local sampling config: DefaultSampleRate unspecified or out of range [0,1)") + } + config.DefaultSampleRate = 0.5 + + for _, invalid := range []float64{-1, 0, 2.0} { + config.IngestRateDecayFactor = invalid + assertInvalidConfigError("invalid local sampling config: IngestRateDecayFactor unspecified or out of range (0,1]") + } + config.IngestRateDecayFactor = 0.5 + + assertInvalidConfigError("invalid remote sampling config: Elasticsearch unspecified") + config.Elasticsearch = &elasticsearch.Client{} + + assertInvalidConfigError("invalid remote sampling config: SampledTracesIndex unspecified") + config.SampledTracesIndex = "sampled-traces" + + assertInvalidConfigError("invalid storage config: StorageDir unspecified") + config.StorageDir = "tbs" + + assertInvalidConfigError("invalid storage config: StorageGCInterval unspecified or negative") + config.StorageGCInterval = 1 + + assertInvalidConfigError("invalid storage config: TTL unspecified or negative") + config.TTL = 1 +} diff --git a/x-pack/apm-server/sampling/groups.go b/x-pack/apm-server/sampling/groups.go index 1a825832890..c17662b5730 100644 --- a/x-pack/apm-server/sampling/groups.go +++ b/x-pack/apm-server/sampling/groups.go @@ -29,7 +29,7 @@ type traceGroups struct { ingestRateDecayFactor float64 // maxGroups holds the maximum number of trace groups to maintain. - // Once this is reached, all trace events will be sampled. + // Once this is reached, no new trace group events will be sampled. maxGroups int mu sync.RWMutex @@ -101,6 +101,9 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) { group, ok := g.groups[key] if ok { defer g.mu.RUnlock() + if group.samplingFraction == 0 { + return false, nil + } group.mu.Lock() defer group.mu.Unlock() group.total++ @@ -113,10 +116,15 @@ func (g *traceGroups) sampleTrace(tx *model.Transaction) (bool, error) { group, ok = g.groups[key] if ok { // We've got a write lock on g.mu, no need to lock group too. 
+ if group.samplingFraction == 0 { + return false, nil + } group.total++ return group.reservoir.Sample(tx.Duration, tx.TraceID), nil } else if len(g.groups) == g.maxGroups { return false, errTooManyTraceGroups + } else if g.defaultSamplingFraction == 0 { + return false, nil } group = &traceGroup{ diff --git a/x-pack/apm-server/sampling/processor.go b/x-pack/apm-server/sampling/processor.go new file mode 100644 index 00000000000..df3ba0ce8b1 --- /dev/null +++ b/x-pack/apm-server/sampling/processor.go @@ -0,0 +1,374 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package sampling + +import ( + "context" + "sync" + "time" + + "github.com/dgraph-io/badger/v2" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + logs "github.com/elastic/apm-server/log" + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/transform" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage" + "github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + badgerValueLogFileSize = 128 * 1024 * 1024 +) + +// ErrStopped is returned when calling ProcessTransformables on a stopped Processor. +var ErrStopped = errors.New("processor is stopped") + +// Processor is a tail-sampling event processor. +type Processor struct { + config Config + logger *logp.Logger + groups *traceGroups + + storageMu sync.RWMutex + db *badger.DB + storage *eventstorage.ShardedReadWriter + + stopMu sync.Mutex + stopping chan struct{} + stopped chan struct{} +} + +// NewProcessor returns a new Processor, for tail-sampling trace events. +func NewProcessor(config Config) (*Processor, error) { + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid tail-sampling config") + } + + logger := logp.NewLogger(logs.Sampling) + badgerOpts := badger.DefaultOptions(config.StorageDir) + badgerOpts.ValueLogFileSize = badgerValueLogFileSize + badgerOpts.Logger = eventstorage.LogpAdaptor{Logger: logger} + db, err := badger.Open(badgerOpts) + if err != nil { + return nil, err + } + + eventCodec := eventstorage.JSONCodec{} + storage := eventstorage.New(db, eventCodec, config.TTL) + readWriter := storage.NewShardedReadWriter() + + p := &Processor{ + config: config, + logger: logger, + groups: newTraceGroups(config.MaxTraceGroups, config.DefaultSampleRate, config.IngestRateDecayFactor), + db: db, + storage: readWriter, + stopping: make(chan struct{}), + stopped: make(chan struct{}), + } + return p, nil +} + +// ProcessTransformables tail-samples transactions and spans. +// +// Any events returned by the processor will be published immediately. +// This includes: +// +// - Non-trace events (errors, metricsets) +// - Trace events which are already known to have been tail-sampled +// - Transactions which are head-based unsampled +// +// All other trace events will either be dropped (e.g. known to not +// be tail-sampled), or stored for possible later publication. 
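The contract described in the comment above can be summarised with a small sketch of a hypothetical caller; the filterBatch helper and its package are illustrative only. Only the events returned by ProcessTransformables are meant to be indexed immediately; the rest have been written to local storage or dropped.

package pipeline

import (
	"context"

	"github.com/elastic/apm-server/transform"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling"
)

// filterBatch passes a batch of events through the tail-sampling processor and
// returns only those that should be published now. The remainder have either
// been stored locally (awaiting a sampling decision) or dropped.
func filterBatch(ctx context.Context, p *sampling.Processor, events []transform.Transformable) ([]transform.Transformable, error) {
	kept, err := p.ProcessTransformables(ctx, events)
	if err != nil {
		// ErrStopped is returned if the processor has already been stopped.
		return nil, err
	}
	return kept, nil
}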
+func (p *Processor) ProcessTransformables(ctx context.Context, events []transform.Transformable) ([]transform.Transformable, error) { + p.storageMu.RLock() + defer p.storageMu.RUnlock() + if p.storage == nil { + return nil, ErrStopped + } + for i := 0; i < len(events); i++ { + var drop bool + var err error + switch event := events[i].(type) { + case *model.Transaction: + drop, err = p.processTransaction(event) + case *model.Span: + drop, err = p.processSpan(event) + default: + continue + } + if err != nil { + return nil, err + } + if drop { + n := len(events) + events[i], events[n-1] = events[n-1], events[i] + events = events[:n-1] + i-- + } + } + return events, nil +} + +func (p *Processor) processTransaction(tx *model.Transaction) (bool, error) { + if tx.Sampled != nil && !*tx.Sampled { + // (Head-based) unsampled transactions are passed through + // by the tail sampler. + return false, nil + } + + traceSampled, err := p.storage.IsTraceSampled(tx.TraceID) + switch err { + case nil: + // Tail-sampling decision has been made, index or drop the transaction. + drop := !traceSampled + return drop, nil + case eventstorage.ErrNotFound: + // Tail-sampling decision has not yet been made. + break + default: + return false, err + } + + if tx.ParentID != "" { + // Non-root transaction: write to local storage while we wait + // for a sampling decision. + return true, p.storage.WriteTransaction(tx) + } + + // Root transaction: apply reservoir sampling. + reservoirSampled, err := p.groups.sampleTrace(tx) + if err == errTooManyTraceGroups { + // Too many trace groups, drop the transaction. + // + // TODO(axw) log a warning with a rate limit. + // TODO(axw) should we have an "other" bucket to capture, + // and capture them with the default rate? + // likely does not make sense to reservoir sample, + // except when there is a single logical trace group + // with high cardinality transaction names. + return true, nil + } else if err != nil { + return false, err + } + + if !reservoirSampled { + // Write the non-sampling decision to storage to avoid further + // writes for the trace ID, and then drop the transaction. + // + // This is a local optimisation only. To avoid creating network + // traffic and load on Elasticsearch for uninteresting root + // transactions, we do not propagate this to other APM Servers. + return true, p.storage.WriteTraceSampled(tx.TraceID, false) + } + + // The root transaction was admitted to the sampling reservoir, so we + // can proceed to write the transaction to storage and then drop it; + // we may index it later, after finalising the sampling decision. + return true, p.storage.WriteTransaction(tx) +} + +func (p *Processor) processSpan(span *model.Span) (bool, error) { + traceSampled, err := p.storage.IsTraceSampled(span.TraceID) + if err != nil { + if err == eventstorage.ErrNotFound { + // Tail-sampling decision has not yet been made, write span to local storage. + return true, p.storage.WriteSpan(span) + } + return false, err + } + // Tail-sampling decision has been made, index or drop the event. + drop := !traceSampled + return drop, nil +} + +// Stop stops the processor, flushing and closing the event storage. +func (p *Processor) Stop(ctx context.Context) error { + p.stopMu.Lock() + if p.storage == nil { + // Already fully stopped. + p.stopMu.Unlock() + return nil + } + select { + case <-p.stopping: + // already stopping + default: + close(p.stopping) + } + p.stopMu.Unlock() + + // Wait for Run to return. 
+ select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopped: + } + + // Lock storage before stopping, to prevent closing + // storage while ProcessTransformables is using it. + p.storageMu.Lock() + defer p.storageMu.Unlock() + + if err := p.storage.Flush(); err != nil { + return err + } + p.storage.Close() + if err := p.db.Close(); err != nil { + return err + } + p.storage = nil + return nil +} + +// Run runs the tail-sampling processor. This method is responsible for: +// +// - periodically making, and then publishing, local sampling decisions +// - subscribing to remote sampling decisions +// - reacting to both local and remote sampling decisions by reading +// related events from local storage, and then reporting them +// +// Run returns when a fatal error occurs or the Stop method is invoked. +func (p *Processor) Run() error { + p.storageMu.RLock() + defer p.storageMu.RUnlock() + defer func() { + p.stopMu.Lock() + defer p.stopMu.Unlock() + select { + case <-p.stopped: + default: + close(p.stopped) + } + }() + + // NOTE(axw) the user can configure the tail-sampling flush interval, + // but cannot directly control the bulk indexing flush interval. The + // bulk indexing is expected to complete soon after the tail-sampling + // flush interval. + bulkIndexerFlushInterval := 5 * time.Second + if bulkIndexerFlushInterval > p.config.FlushInterval { + bulkIndexerFlushInterval = p.config.FlushInterval + } + + pubsub, err := pubsub.New(pubsub.Config{ + BeatID: p.config.BeatID, + Client: p.config.Elasticsearch, + Index: p.config.SampledTracesIndex, + Logger: p.logger, + + // Issue pubsub subscriber search requests at twice the frequency + // of publishing, so each server observes each other's sampled + // trace IDs soon after they are published. + SearchInterval: p.config.FlushInterval / 2, + FlushInterval: bulkIndexerFlushInterval, + }) + if err != nil { + return err + } + + remoteSampledTraceIDs := make(chan string) + localSampledTraceIDs := make(chan string) + errgroup, ctx := errgroup.WithContext(context.Background()) + errgroup.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopping: + return context.Canceled + } + }) + errgroup.Go(func() error { + // This goroutine is responsible for periodically garbage + // collecting the Badger value log, using the recommended + // discard ratio of 0.5. + ticker := time.NewTicker(p.config.StorageGCInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + const discardRatio = 0.5 + if err := p.db.RunValueLogGC(discardRatio); err != nil && err != badger.ErrNoRewrite { + return err + } + } + } + }) + errgroup.Go(func() error { + return pubsub.SubscribeSampledTraceIDs(ctx, remoteSampledTraceIDs) + }) + errgroup.Go(func() error { + ticker := time.NewTicker(p.config.FlushInterval) + defer ticker.Stop() + var traceIDs []string + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + p.logger.Debug("finalizing local sampling reservoirs") + traceIDs = p.groups.finalizeSampledTraces(traceIDs) + if len(traceIDs) == 0 { + continue + } + if err := pubsub.PublishSampledTraceIDs(ctx, traceIDs...); err != nil { + return err + } + for _, traceID := range traceIDs { + select { + case <-ctx.Done(): + return ctx.Err() + case localSampledTraceIDs <- traceID: + } + } + traceIDs = traceIDs[:0] + } + } + }) + errgroup.Go(func() error { + // TODO(axw) pace the publishing over the flush interval? 
+ // Alternatively we can rely on backpressure from the reporter, + // removing the artificial one second timeout from publisher code + // and just waiting as long as it takes here. + var events model.Batch + for { + var traceID string + select { + case <-ctx.Done(): + return ctx.Err() + case traceID = <-remoteSampledTraceIDs: + p.logger.Debug("received remotely sampled trace ID") + case traceID = <-localSampledTraceIDs: + } + if err := p.storage.WriteTraceSampled(traceID, true); err != nil { + return err + } + if err := p.storage.ReadEvents(traceID, &events); err != nil { + return err + } + transformables := events.Transformables() + if len(transformables) > 0 { + p.logger.Debugf("reporting %d events", len(transformables)) + if err := p.config.Reporter(ctx, publish.PendingReq{ + Transformables: transformables, + Trace: true, + }); err != nil { + p.logger.With(logp.Error(err)).Warn("failed to report events") + } + } + events.Reset() + } + }) + if err := errgroup.Wait(); err != nil && err != context.Canceled { + return err + } + return nil +} diff --git a/x-pack/apm-server/sampling/processor_bench_test.go b/x-pack/apm-server/sampling/processor_bench_test.go new file mode 100644 index 00000000000..3089002c4e6 --- /dev/null +++ b/x-pack/apm-server/sampling/processor_bench_test.go @@ -0,0 +1,59 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package sampling_test + +import ( + "context" + cryptorand "crypto/rand" + "encoding/binary" + "encoding/hex" + "math/rand" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/transform" + "github.com/elastic/apm-server/x-pack/apm-server/sampling" +) + +func BenchmarkProcess(b *testing.B) { + processor, err := sampling.NewProcessor(newTempdirConfig(b)) + require.NoError(b, err) + go processor.Run() + defer processor.Stop(context.Background()) + + b.RunParallel(func(pb *testing.PB) { + var seed int64 + err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed) + assert.NoError(b, err) + rng := rand.New(rand.NewSource(seed)) + + var traceID [16]byte + for pb.Next() { + binary.LittleEndian.PutUint64(traceID[:8], rng.Uint64()) + binary.LittleEndian.PutUint64(traceID[8:], rng.Uint64()) + transactionID := traceID[:8] + spanID := traceID[8:] + transaction := &model.Transaction{ + TraceID: hex.EncodeToString(traceID[:]), + ID: hex.EncodeToString(transactionID), + } + spanParentID := hex.EncodeToString(transactionID) + span := &model.Span{ + TraceID: hex.EncodeToString(traceID[:]), + ID: hex.EncodeToString(spanID), + ParentID: spanParentID, + } + if _, err := processor.ProcessTransformables(context.Background(), []transform.Transformable{ + transaction, + span, span, span, + }); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/x-pack/apm-server/sampling/processor_test.go b/x-pack/apm-server/sampling/processor_test.go new file mode 100644 index 00000000000..c3c087e205b --- /dev/null +++ b/x-pack/apm-server/sampling/processor_test.go @@ -0,0 +1,454 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
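The benchmark above also shows the intended lifecycle: construct the processor, start Run in its own goroutine, and call Stop to flush and close local storage. A sketch of that wiring outside a test might look like the following; the runProcessor helper and the 30-second shutdown timeout are arbitrary placeholders.

package lifecycle

import (
	"context"
	"log"
	"time"

	"github.com/elastic/apm-server/x-pack/apm-server/sampling"
)

// runProcessor starts a tail-sampling processor and returns a function that
// stops it with a bounded deadline.
func runProcessor(config sampling.Config) (stop func() error, err error) {
	processor, err := sampling.NewProcessor(config)
	if err != nil {
		return nil, err
	}
	// Run blocks until a fatal error occurs or Stop is called.
	go func() {
		if err := processor.Run(); err != nil {
			log.Printf("tail-sampling processor exited: %v", err)
		}
	}()
	stop = func() error {
		// Flush and close event storage, waiting at most 30 seconds.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		return processor.Stop(ctx)
	}
	return stop, nil
}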
+
+package sampling_test
+
+import (
+	"context"
+	"io/ioutil"
+	"os"
+	"sort"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/dgraph-io/badger/v2"
+	"github.com/gofrs/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/apm-server/model"
+	"github.com/elastic/apm-server/publish"
+	"github.com/elastic/apm-server/transform"
+	"github.com/elastic/apm-server/x-pack/apm-server/sampling"
+	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
+	"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub/pubsubtest"
+)
+
+func TestProcessUnsampled(t *testing.T) {
+	processor, err := sampling.NewProcessor(newTempdirConfig(t))
+	require.NoError(t, err)
+	go processor.Run()
+	defer processor.Stop(context.Background())
+
+	transaction := &model.Transaction{
+		TraceID: "0102030405060708090a0b0c0d0e0f10",
+		ID:      "0102030405060708",
+		Sampled: newBool(false),
+	}
+	in := []transform.Transformable{transaction}
+	out, err := processor.ProcessTransformables(context.Background(), in)
+	require.NoError(t, err)
+
+	// Unsampled transaction should be reported immediately.
+	assert.Equal(t, in, out)
+}
+
+func TestProcessAlreadyTailSampled(t *testing.T) {
+	config := newTempdirConfig(t)
+
+	// Seed event storage with tail-sampling decisions, to show that
+	// subsequent events in the trace will be reported immediately.
+	traceID1 := "0102030405060708090a0b0c0d0e0f10"
+	traceID2 := "0102030405060708090a0b0c0d0e0f11"
+	withBadger(t, config.StorageDir, func(db *badger.DB) {
+		storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute)
+		writer := storage.NewReadWriter()
+		defer writer.Close()
+		assert.NoError(t, writer.WriteTraceSampled(traceID1, true))
+		assert.NoError(t, writer.Flush())
+
+		storage = eventstorage.New(db, eventstorage.JSONCodec{}, -1) // expire immediately
+		writer = storage.NewReadWriter()
+		defer writer.Close()
+		assert.NoError(t, writer.WriteTraceSampled(traceID2, true))
+		assert.NoError(t, writer.Flush())
+	})
+
+	processor, err := sampling.NewProcessor(config)
+	require.NoError(t, err)
+	go processor.Run()
+	defer processor.Stop(context.Background())
+
+	transaction1 := &model.Transaction{
+		TraceID: traceID1,
+		ID:      "0102030405060708",
+	}
+	span1 := &model.Span{
+		TraceID: traceID1,
+		ID:      "0102030405060709",
+	}
+	transaction2 := &model.Transaction{
+		TraceID: traceID2,
+		ID:      "0102030405060710",
+	}
+	span2 := &model.Span{
+		TraceID: traceID2,
+		ID:      "0102030405060711",
+	}
+
+	in := []transform.Transformable{transaction1, transaction2, span1, span2}
+	out, err := processor.ProcessTransformables(context.Background(), in)
+	require.NoError(t, err)
+
+	// Tail-sampling decisions have already been made. The first transaction and
+	// span should be reported immediately, whereas the second ones should be
+	// written to storage since they were received after the trace sampling
+	// entry expired.
+	assert.Equal(t, []transform.Transformable{transaction1, span1}, out)
+
+	// Stop the processor so we can access the database.
+ assert.NoError(t, processor.Stop(context.Background())) + withBadger(t, config.StorageDir, func(db *badger.DB) { + storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) + reader := storage.NewReadWriter() + defer reader.Close() + + var batch model.Batch + err := reader.ReadEvents(traceID1, &batch) + assert.NoError(t, err) + assert.Zero(t, batch) + + err = reader.ReadEvents(traceID2, &batch) + assert.NoError(t, err) + assert.Equal(t, model.Batch{ + Spans: []*model.Span{span2}, + Transactions: []*model.Transaction{transaction2}, + }, batch) + }) +} + +func TestProcessLocalTailSampling(t *testing.T) { + config := newTempdirConfig(t) + config.DefaultSampleRate = 0.5 + config.FlushInterval = 10 * time.Millisecond + published := make(chan string) + config.Elasticsearch = pubsubtest.Client(pubsubtest.PublisherChan(published), nil) + + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + + traceID1 := "0102030405060708090a0b0c0d0e0f10" + traceID2 := "0102030405060708090a0b0c0d0e0f11" + trace1Events := model.Batch{ + Transactions: []*model.Transaction{{ + TraceID: traceID1, + ID: "0102030405060708", + Duration: 123, + }}, + Spans: []*model.Span{{ + TraceID: traceID1, + ID: "0102030405060709", + Duration: 123, + }}, + } + trace2Events := model.Batch{ + Transactions: []*model.Transaction{{ + TraceID: traceID2, + ID: "0102030405060710", + Duration: 456, + }}, + Spans: []*model.Span{{ + TraceID: traceID2, + ID: "0102030405060711", + Duration: 456, + }}, + } + + in := append(trace1Events.Transformables(), trace2Events.Transformables()...) + out, err := processor.ProcessTransformables(context.Background(), in) + require.NoError(t, err) + assert.Empty(t, out) + + // We have configured 50% tail-sampling, so we expect a single trace ID + // to be published. Sampling is non-deterministic (weighted random), so + // we can't anticipate a specific trace ID. + + var sampledTraceID string + select { + case sampledTraceID = <-published: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for publication") + } + select { + case <-published: + t.Fatal("unexpected publication") + case <-time.After(50 * time.Millisecond): + } + + unsampledTraceID := traceID2 + sampledTraceEvents := trace1Events + unsampledTraceEvents := trace2Events + if sampledTraceID == traceID2 { + unsampledTraceID = traceID1 + unsampledTraceEvents = trace1Events + sampledTraceEvents = trace2Events + } + + // Stop the processor so we can access the database. + assert.NoError(t, processor.Stop(context.Background())) + withBadger(t, config.StorageDir, func(db *badger.DB) { + storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) + reader := storage.NewReadWriter() + defer reader.Close() + + sampled, err := reader.IsTraceSampled(sampledTraceID) + assert.NoError(t, err) + assert.True(t, sampled) + + sampled, err = reader.IsTraceSampled(unsampledTraceID) + assert.Equal(t, eventstorage.ErrNotFound, err) + assert.False(t, sampled) + + var batch model.Batch + err = reader.ReadEvents(sampledTraceID, &batch) + assert.NoError(t, err) + assert.Equal(t, sampledTraceEvents, batch) + + // Even though the trace is unsampled, the events will be + // available in storage until the TTL expires, as they're + // written there first. 
+ batch.Reset() + err = reader.ReadEvents(unsampledTraceID, &batch) + assert.NoError(t, err) + assert.Equal(t, unsampledTraceEvents, batch) + }) +} + +func TestProcessLocalTailSamplingUnsampled(t *testing.T) { + config := newTempdirConfig(t) + config.FlushInterval = time.Minute + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + + // Process root transactions until one is rejected. + traceIDs := make([]string, 10000) + for i := range traceIDs { + traceID := uuid.Must(uuid.NewV4()).String() + traceIDs[i] = traceID + tx := &model.Transaction{ + TraceID: traceID, + ID: traceID, + Duration: 1, + } + out, err := processor.ProcessTransformables(context.Background(), []transform.Transformable{tx}) + require.NoError(t, err) + assert.Empty(t, out) + } + + // Stop the processor so we can access the database. + assert.NoError(t, processor.Stop(context.Background())) + withBadger(t, config.StorageDir, func(db *badger.DB) { + storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) + reader := storage.NewReadWriter() + defer reader.Close() + + var anyUnsampled bool + for _, traceID := range traceIDs { + sampled, err := reader.IsTraceSampled(traceID) + if err == eventstorage.ErrNotFound { + // No sampling decision made yet. + } else { + assert.NoError(t, err) + assert.False(t, sampled) + anyUnsampled = true + break + } + } + assert.True(t, anyUnsampled) + }) +} + +func TestProcessRemoteTailSampling(t *testing.T) { + config := newTempdirConfig(t) + config.DefaultSampleRate = 0.5 + config.FlushInterval = 10 * time.Millisecond + + var published []string + var publisher pubsubtest.PublisherFunc = func(ctx context.Context, traceID string) error { + published = append(published, traceID) + return nil + } + subscriberChan := make(chan string) + subscriber := pubsubtest.SubscriberChan(subscriberChan) + config.Elasticsearch = pubsubtest.Client(publisher, subscriber) + + reported := make(chan []transform.Transformable) + config.Reporter = func(ctx context.Context, req publish.PendingReq) error { + select { + case <-ctx.Done(): + return ctx.Err() + case reported <- req.Transformables: + return nil + } + } + + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + + traceID1 := "0102030405060708090a0b0c0d0e0f10" + traceID2 := "0102030405060708090a0b0c0d0e0f11" + trace1Events := model.Batch{ + Spans: []*model.Span{{ + TraceID: traceID1, + ID: "0102030405060709", + Duration: 123, + }}, + } + + in := trace1Events.Transformables() + out, err := processor.ProcessTransformables(context.Background(), in) + require.NoError(t, err) + assert.Empty(t, out) + + subscriberChan <- traceID2 + subscriberChan <- traceID1 + + select { + case <-reported: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for reporting") + } + select { + case <-reported: + t.Fatal("unexpected reporting") + case <-time.After(50 * time.Millisecond): + } + + // Stop the processor so we can access the database. 
+ assert.NoError(t, processor.Stop(context.Background())) + assert.Empty(t, published) // remote decisions don't get republished + + withBadger(t, config.StorageDir, func(db *badger.DB) { + storage := eventstorage.New(db, eventstorage.JSONCodec{}, time.Minute) + reader := storage.NewReadWriter() + defer reader.Close() + + sampled, err := reader.IsTraceSampled(traceID1) + assert.NoError(t, err) + assert.True(t, sampled) + + sampled, err = reader.IsTraceSampled(traceID2) + assert.NoError(t, err) + assert.True(t, sampled) + + var batch model.Batch + err = reader.ReadEvents(traceID1, &batch) + assert.NoError(t, err) + assert.Equal(t, trace1Events, batch) + + batch = model.Batch{} + err = reader.ReadEvents(traceID2, &batch) + assert.NoError(t, err) + assert.Zero(t, batch) + }) +} + +func TestStorageGC(t *testing.T) { + if testing.Short() { + t.Skip("skipping slow test") + } + + config := newTempdirConfig(t) + config.TTL = 10 * time.Millisecond + config.FlushInterval = 10 * time.Millisecond + + writeBatch := func(n int) { + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + for i := 0; i < n; i++ { + traceID := uuid.Must(uuid.NewV4()).String() + out, err := processor.ProcessTransformables(context.Background(), []transform.Transformable{&model.Span{ + TraceID: traceID, + ID: traceID, + Duration: 123, + }}) + require.NoError(t, err) + assert.Empty(t, out) + } + } + + vlogFilenames := func() []string { + dir, _ := os.Open(config.StorageDir) + names, _ := dir.Readdirnames(-1) + defer dir.Close() + + var vlogs []string + for _, name := range names { + if strings.HasSuffix(name, ".vlog") { + vlogs = append(vlogs, name) + } + } + sort.Strings(vlogs) + return vlogs + } + + // Process spans until more than one value log file has been created, + // but the first one does not exist (has been garbage collected). 
+ for len(vlogFilenames()) < 2 { + writeBatch(50000) + } + + config.StorageGCInterval = 10 * time.Millisecond + processor, err := sampling.NewProcessor(config) + require.NoError(t, err) + go processor.Run() + defer processor.Stop(context.Background()) + + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + vlogs := vlogFilenames() + if len(vlogs) == 0 || vlogs[0] != "000000.vlog" { + // garbage collected + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("timed out waiting for value log garbage collection") +} + +func withBadger(tb testing.TB, storageDir string, f func(db *badger.DB)) { + badgerOpts := badger.DefaultOptions(storageDir) + badgerOpts.Logger = nil + db, err := badger.Open(badgerOpts) + require.NoError(tb, err) + f(db) + assert.NoError(tb, db.Close()) +} + +func newTempdirConfig(tb testing.TB) sampling.Config { + tempdir, err := ioutil.TempDir("", "samplingtest") + require.NoError(tb, err) + tb.Cleanup(func() { os.RemoveAll(tempdir) }) + return sampling.Config{ + BeatID: "local-apm-server", + Reporter: func(ctx context.Context, req publish.PendingReq) error { return nil }, + LocalSamplingConfig: sampling.LocalSamplingConfig{ + FlushInterval: time.Second, + MaxTraceGroups: 1000, + DefaultSampleRate: 0.1, + IngestRateDecayFactor: 0.9, + }, + RemoteSamplingConfig: sampling.RemoteSamplingConfig{ + Elasticsearch: pubsubtest.Client(nil, nil), + SampledTracesIndex: ".apm-sampled-traces", + }, + StorageConfig: sampling.StorageConfig{ + StorageDir: tempdir, + StorageGCInterval: time.Second, + TTL: 30 * time.Minute, + }, + } +} + +func newBool(v bool) *bool { + return &v +} diff --git a/x-pack/apm-server/sampling/pubsub/pubsubtest/client.go b/x-pack/apm-server/sampling/pubsub/pubsubtest/client.go new file mode 100644 index 00000000000..8fca3050aca --- /dev/null +++ b/x-pack/apm-server/sampling/pubsub/pubsubtest/client.go @@ -0,0 +1,180 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package pubsubtest + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "time" + + "github.com/elastic/go-elasticsearch/v7" + "github.com/elastic/go-elasticsearch/v7/esutil" +) + +// Publisher is an interface to pass to Client that responds to publish +// requests, consuming a trace ID sent by the requester. +type Publisher interface { + Publish(ctx context.Context, traceID string) error +} + +// PublisherChan is a Publisher implemented as a channel. +type PublisherChan chan<- string + +// Publish waits for traceID to be sent on c, or for ctx to be done. +func (c PublisherChan) Publish(ctx context.Context, traceID string) error { + select { + case <-ctx.Done(): + return ctx.Err() + case c <- traceID: + return nil + } +} + +// PublisherFunc is a Publisher implemented as a function. +type PublisherFunc func(context.Context, string) error + +// Publish calls f(ctx, traceID). +func (f PublisherFunc) Publish(ctx context.Context, traceID string) error { + return f(ctx, traceID) +} + +// Subscriber is an interface to pass to Client that responds to subscribe +// requests, returning a trace ID to send back to the requester. +type Subscriber interface { + Subscribe(ctx context.Context) (traceID string, err error) +} + +// SubscriberChan is a Subscriber implemented as a channel. 
+type SubscriberChan <-chan string + +// Subscribe waits for a trace ID to be received on c, or for ctx to be done. +func (c SubscriberChan) Subscribe(ctx context.Context) (string, error) { + select { + case <-ctx.Done(): + return "", ctx.Err() + case traceID, ok := <-c: + if !ok { + return "", errors.New("channel closed") + } + return traceID, nil + } +} + +// SubscriberFunc is a Subscriber implemented as a function. +type SubscriberFunc func(ctx context.Context) (string, error) + +// Subscribe calls f(ctx). +func (f SubscriberFunc) Subscribe(ctx context.Context) (string, error) { + return f(ctx) +} + +// Client returns a new elasticsearch.Client, suitable for use with pubsub, +// that responds to publish requests by calling pub (if non-nil) and subscribe +// requests by calling sub (if non-nil). If either function is nil, then the +// respective operation will be a no-op. +func Client(pub Publisher, sub Subscriber) *elasticsearch.Client { + client, err := elasticsearch.NewClient(elasticsearch.Config{ + Addresses: []string{"testing.invalid"}, + Transport: &channelClientRoundTripper{pub: pub, sub: sub}, + }) + if err != nil { + panic(err) + } + return client +} + +type channelClientRoundTripper struct { + pub Publisher + sub Subscriber + seqno int64 +} + +func (rt *channelClientRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + type traceIDDocument struct { + Observer struct { + ID string `json:"id"` + } `json:"observer"` + + Trace struct { + ID string `json:"id"` + } `json:"trace"` + } + type traceIDDocumentHit struct { + SeqNo int64 `json:"_seq_no,omitempty"` + PrimaryTerm int64 `json:"_primary_term,omitempty"` + Source traceIDDocument `json:"_source"` + } + + recorder := httptest.NewRecorder() + switch r.Method { + case "GET": + // Subscribe + var result struct { + Hits struct { + Hits []traceIDDocumentHit + } + } + if rt.sub != nil { + ctx, cancel := context.WithTimeout(r.Context(), 50*time.Millisecond) + defer cancel() + for { + var traceID string + err := ctx.Err() + if err == nil { + traceID, err = rt.sub.Subscribe(ctx) + } + if err == context.DeadlineExceeded { + break + } else if err != nil { + return nil, err + } + rt.seqno++ + hit := traceIDDocumentHit{SeqNo: rt.seqno, PrimaryTerm: 1} + hit.Source.Trace.ID = traceID + hit.Source.Observer.ID = "👀" + result.Hits.Hits = append(result.Hits.Hits, hit) + } + } + if err := json.NewEncoder(recorder).Encode(result); err != nil { + return nil, err + } + recorder.Flush() + case "POST": + // Publish + var results []map[string]esutil.BulkIndexerResponseItem + dec := json.NewDecoder(r.Body) + for { + var m map[string]interface{} + if err := dec.Decode(&m); err != nil { + if err == io.EOF { + break + } + return nil, err + } + var action string + for action = range m { + } + var doc traceIDDocument + if err := dec.Decode(&doc); err != nil { + return nil, err + } + if rt.pub != nil { + if err := rt.pub.Publish(r.Context(), doc.Trace.ID); err != nil { + return nil, err + } + } + result := esutil.BulkIndexerResponseItem{Status: 200} + results = append(results, map[string]esutil.BulkIndexerResponseItem{action: result}) + } + if err := json.NewEncoder(recorder).Encode(results); err != nil { + return nil, err + } + } + return recorder.Result(), nil +}
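Putting the test double together: a sketch of how a test might wire this fake client into the processor configuration, with one channel observing locally published trace IDs and another injecting simulated remote sampling decisions. Channel names and the trace ID are placeholders.

package main

import (
	"fmt"

	"github.com/elastic/apm-server/x-pack/apm-server/sampling"
	"github.com/elastic/apm-server/x-pack/apm-server/sampling/pubsub/pubsubtest"
)

func main() {
	published := make(chan string, 1) // locally sampled trace IDs arrive here
	remote := make(chan string, 1)    // send trace IDs here to simulate remote decisions

	var config sampling.Config
	config.Elasticsearch = pubsubtest.Client(
		pubsubtest.PublisherChan(published),
		pubsubtest.SubscriberChan(remote),
	)

	// Simulate another APM Server having sampled a trace.
	remote <- "0102030405060708090a0b0c0d0e0f10"
	fmt.Println("fake Elasticsearch client configured:", config.Elasticsearch != nil)
}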