From a6e40160c342192751332fb96044e9aa272a8a5c Mon Sep 17 00:00:00 2001 From: songy23 Date: Fri, 26 Jul 2019 15:22:23 -0700 Subject: [PATCH] Add factory and update config for tail sampling processor Updates #146 --- .../processor/tailsampling/factory.go | 58 +++++++++++++++++++ .../processor/tailsampling/factory_test.go | 41 +++++++++++++ .../processor/tailsampling/processor.go | 50 ++++++++-------- .../processor/tailsampling/processor_test.go | 50 +++++++++++----- service/builder/sampling_builder.go | 6 ++ 5 files changed, 167 insertions(+), 38 deletions(-) create mode 100644 internal/collector/processor/tailsampling/factory.go create mode 100644 internal/collector/processor/tailsampling/factory_test.go diff --git a/internal/collector/processor/tailsampling/factory.go b/internal/collector/processor/tailsampling/factory.go new file mode 100644 index 00000000000..fa4c6baca87 --- /dev/null +++ b/internal/collector/processor/tailsampling/factory.go @@ -0,0 +1,58 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tailsampling + +import ( + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-service/config/configerror" + "github.com/open-telemetry/opentelemetry-service/config/configmodels" + "github.com/open-telemetry/opentelemetry-service/consumer" + "github.com/open-telemetry/opentelemetry-service/processor" + "github.com/open-telemetry/opentelemetry-service/service/builder" +) + +const ( + // The value of "type" Tail Sampling in configuration. + typeStr = "tail-sampling" +) + +// Factory is the factory for Tail Sampling processor. +type Factory struct { +} + +// Type gets the type of the config created by this factory. +func (f *Factory) Type() string { + return typeStr +} + +// CreateTraceProcessor creates a trace processor based on this config. +func (f *Factory) CreateTraceProcessor( + logger *zap.Logger, + nextConsumer consumer.TraceConsumer, + cfg configmodels.Processor, +) (processor.TraceProcessor, error) { + tCfg := cfg.(*builder.TailBasedCfg) + return NewTraceProcessor(logger, nextConsumer, *tCfg) +} + +// CreateMetricsProcessor creates a metrics processor based on this config. +func (f *Factory) CreateMetricsProcessor( + logger *zap.Logger, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (processor.MetricsProcessor, error) { + return nil, configerror.ErrDataTypeIsNotSupported +} diff --git a/internal/collector/processor/tailsampling/factory_test.go b/internal/collector/processor/tailsampling/factory_test.go new file mode 100644 index 00000000000..dcfb40c9442 --- /dev/null +++ b/internal/collector/processor/tailsampling/factory_test.go @@ -0,0 +1,41 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tailsampling + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" + "github.com/open-telemetry/opentelemetry-service/service/builder" +) + +func TestCreateProcessor(t *testing.T) { + factory := &Factory{} + require.NotNil(t, factory) + + cfg := builder.NewDefaultTailBasedCfg() + + tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) + assert.NotNil(t, tp) + assert.NoError(t, err, "cannot create trace processor") + + mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg) + assert.Nil(t, mp) + assert.Error(t, err, "should not be able to create metric processor") +} diff --git a/internal/collector/processor/tailsampling/processor.go b/internal/collector/processor/tailsampling/processor.go index 150706fbcbf..f08a15ecf3b 100644 --- a/internal/collector/processor/tailsampling/processor.go +++ b/internal/collector/processor/tailsampling/processor.go @@ -16,6 +16,7 @@ package tailsampling import ( "context" + "errors" "runtime" "sync" "sync/atomic" @@ -30,7 +31,8 @@ import ( "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" - "github.com/open-telemetry/opentelemetry-service/observability" + "github.com/open-telemetry/opentelemetry-service/processor" + "github.com/open-telemetry/opentelemetry-service/service/builder" ) // Policy combines a sampling policy evaluator with the destinations to be @@ -54,6 +56,7 @@ type traceKey string // policies to sample traces. type tailSamplingSpanProcessor struct { ctx context.Context + nextConsumer consumer.TraceConsumer start sync.Once maxNumTraces uint64 policies []*Policy @@ -69,40 +72,41 @@ const ( sourceFormat = "tail-sampling" ) -var _ consumer.TraceConsumer = (*tailSamplingSpanProcessor)(nil) +var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil) -// NewTailSamplingSpanProcessor creates a TailSamplingSpanProcessor with the given policies. -// It will keep maxNumTraces on memory and will attempt to wait until decisionWait before evaluating if -// a trace should be sampled or not. Providing expectedNewTracesPerSec helps with allocating data structures -// with closer to actual usage size. -func NewTailSamplingSpanProcessor( - policies []*Policy, - maxNumTraces, expectedNewTracesPerSec uint64, - decisionWait time.Duration, - logger *zap.Logger) (consumer.TraceConsumer, error) { +// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given +// configuration. +func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) { + if nextConsumer == nil { + return nil, errors.New("nextConsumer is nil") + } - numDecisionBatches := uint64(decisionWait.Seconds()) - inBatcher, err := idbatcher.New(numDecisionBatches, expectedNewTracesPerSec, uint64(2*runtime.NumCPU())) + numDecisionBatches := uint64(cfg.DecisionWait.Seconds()) + inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) if err != nil { return nil, err } + tsp := &tailSamplingSpanProcessor{ ctx: context.Background(), - maxNumTraces: maxNumTraces, - policies: policies, + nextConsumer: nextConsumer, + maxNumTraces: cfg.NumTraces, logger: logger, decisionBatcher: inBatcher, + // policies: policies, } - for _, policy := range policies { - policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat)) - if err != nil { - return nil, err - } - policy.ctx = policyCtx - } + // TODO(#146): add policies to TailBasedCfg + // for _, policy := range policies { + // policyCtx, err := tag.New(tsp.ctx, tag.Upsert(tagPolicyKey, policy.Name), tag.Upsert(observability.TagKeyReceiver, sourceFormat)) + // if err != nil { + // return nil, err + // } + // policy.ctx = policyCtx + // } tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick} - tsp.deleteChan = make(chan traceKey, maxNumTraces) + tsp.deleteChan = make(chan traceKey, cfg.NumTraces) + return tsp, nil } diff --git a/internal/collector/processor/tailsampling/processor_test.go b/internal/collector/processor/tailsampling/processor_test.go index 5c8e0a116ef..a2755f58a52 100644 --- a/internal/collector/processor/tailsampling/processor_test.go +++ b/internal/collector/processor/tailsampling/processor_test.go @@ -20,14 +20,16 @@ import ( "testing" "time" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-service/consumer" "github.com/open-telemetry/opentelemetry-service/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-service/exporter/exportertest" "github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher" "github.com/open-telemetry/opentelemetry-service/internal/collector/sampling" + "github.com/open-telemetry/opentelemetry-service/service/builder" tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace" - - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "go.uber.org/zap" ) const ( @@ -36,7 +38,12 @@ const ( func TestSequentialTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) - sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop()) + cfg := builder.TailBasedCfg{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(2 * len(traceIds)), + ExpectedNewTracesPerSec: 64, + } + sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) for _, batch := range batches { tsp.ConsumeTraceData(context.Background(), batch) @@ -57,7 +64,12 @@ func TestConcurrentTraceArrival(t *testing.T) { traceIds, batches := generateIdsAndBatches(128) var wg sync.WaitGroup - sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(2*len(traceIds)), 64, defaultTestDecisionWait, zap.NewNop()) + cfg := builder.TailBasedCfg{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(2 * len(traceIds)), + ExpectedNewTracesPerSec: 64, + } + sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) for _, batch := range batches { // Add the same traceId twice. @@ -90,7 +102,12 @@ func TestConcurrentTraceArrival(t *testing.T) { func TestSequentialTraceMapSize(t *testing.T) { traceIds, batches := generateIdsAndBatches(210) const maxSize = 100 - sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop()) + cfg := builder.TailBasedCfg{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(maxSize), + ExpectedNewTracesPerSec: 64, + } + sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) for _, batch := range batches { tsp.ConsumeTraceData(context.Background(), batch) @@ -108,7 +125,12 @@ func TestConcurrentTraceMapSize(t *testing.T) { _, batches := generateIdsAndBatches(210) const maxSize = 100 var wg sync.WaitGroup - sp, _ := NewTailSamplingSpanProcessor(newTestPolicy(), uint64(maxSize), 64, defaultTestDecisionWait, zap.NewNop()) + cfg := builder.TailBasedCfg{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(maxSize), + ExpectedNewTracesPerSec: 64, + } + sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) for _, batch := range batches { wg.Add(1) @@ -133,19 +155,17 @@ func TestConcurrentTraceMapSize(t *testing.T) { } func TestSamplingPolicyTypicalPath(t *testing.T) { + t.Skip("TODO(#146): add policies to TailBasedCfg and fix this test") const maxSize = 100 const decisionWaitSeconds = 5 - decisionWait := time.Second * decisionWaitSeconds msp := &mockSpanProcessor{} mpe := &mockPolicyEvaluator{} - testPolicy := []*Policy{ - { - Name: "test", - Evaluator: mpe, - Destination: msp, - }, + cfg := builder.TailBasedCfg{ + DecisionWait: defaultTestDecisionWait, + NumTraces: uint64(maxSize), + ExpectedNewTracesPerSec: 64, } - sp, _ := NewTailSamplingSpanProcessor(testPolicy, maxSize, 64, decisionWait, zap.NewNop()) + sp, _ := NewTraceProcessor(zap.NewNop(), &exportertest.SinkTraceExporter{}, cfg) tsp := sp.(*tailSamplingSpanProcessor) // For this test explicitly control the timer calls and batcher. diff --git a/service/builder/sampling_builder.go b/service/builder/sampling_builder.go index fe323459280..c7ceb188fe3 100644 --- a/service/builder/sampling_builder.go +++ b/service/builder/sampling_builder.go @@ -18,6 +18,8 @@ import ( "time" "github.com/spf13/viper" + + "github.com/open-telemetry/opentelemetry-service/config/configmodels" ) const ( @@ -159,12 +161,16 @@ func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg { // TailBasedCfg holds the configuration for tail-based sampling. type TailBasedCfg struct { + configmodels.ProcessorSettings `mapstructure:",squash"` // DecisionWait is the desired wait time from the arrival of the first span of // trace until the decision about sampling it or not is evaluated. DecisionWait time.Duration `mapstructure:"decision-wait"` // NumTraces is the number of traces kept on memory. Typically most of the data // of a trace is released after a sampling decision is taken. NumTraces uint64 `mapstructure:"num-traces"` + // ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor + // per second. This helps with allocating data structures with closer to actual usage size. + ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"` } // NewDefaultTailBasedCfg creates a TailBasedCfg with the default values.