From 21a70d61d66459fbe766165c23f6f58c46b268a0 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Tue, 14 Jan 2020 13:20:07 -0500 Subject: [PATCH] Add a memory limiter processor (#498) This adds a processor that drops data according to configured memory limits. The processor is important for high load situations when receiving rate exceeds exporting rate (and an extreme case of this is when the target of exporting is unavailable). Typical production run will need to have this processor included in every pipeline immediately after the batch processor. --- defaults/defaults.go | 2 + defaults/defaults_test.go | 2 + processor/memorylimiter/config.go | 49 ++++ processor/memorylimiter/config_test.go | 65 +++++ processor/memorylimiter/factory.go | 86 +++++++ processor/memorylimiter/factory_test.go | 69 ++++++ processor/memorylimiter/memorylimiter.go | 225 ++++++++++++++++++ processor/memorylimiter/memorylimiter_test.go | 217 +++++++++++++++++ processor/memorylimiter/metrics.go | 94 ++++++++ processor/memorylimiter/testdata/config.yaml | 36 +++ processor/processor.go | 6 + testbed/tests/testdata/memory-limiter.yaml | 29 +++ testbed/tests/trace_test.go | 55 ++++- 13 files changed, 922 insertions(+), 13 deletions(-) create mode 100644 processor/memorylimiter/config.go create mode 100644 processor/memorylimiter/config_test.go create mode 100644 processor/memorylimiter/factory.go create mode 100644 processor/memorylimiter/factory_test.go create mode 100644 processor/memorylimiter/memorylimiter.go create mode 100644 processor/memorylimiter/memorylimiter_test.go create mode 100644 processor/memorylimiter/metrics.go create mode 100644 processor/memorylimiter/testdata/config.yaml create mode 100644 testbed/tests/testdata/memory-limiter.yaml diff --git a/defaults/defaults.go b/defaults/defaults.go index 3c17059bc0eb..c1a35eba917b 100644 --- a/defaults/defaults.go +++ b/defaults/defaults.go @@ -32,6 +32,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/processor" "github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor" + "github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter" "github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor" @@ -87,6 +88,7 @@ func Components() ( &attributesprocessor.Factory{}, &queuedprocessor.Factory{}, &batchprocessor.Factory{}, + &memorylimiter.Factory{}, &tailsamplingprocessor.Factory{}, &probabilisticsamplerprocessor.Factory{}, ) diff --git a/defaults/defaults_test.go b/defaults/defaults_test.go index cf0ce1b7b71e..3271f216d330 100644 --- a/defaults/defaults_test.go +++ b/defaults/defaults_test.go @@ -36,6 +36,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/processor" "github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor" + "github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter" "github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor" @@ -64,6 +65,7 @@ func TestDefaultComponents(t *testing.T) { "attributes": &attributesprocessor.Factory{}, "queued_retry": &queuedprocessor.Factory{}, "batch": &batchprocessor.Factory{}, + "memory_limiter": &memorylimiter.Factory{}, "tail_sampling": &tailsamplingprocessor.Factory{}, "probabilistic_sampler": &probabilisticsamplerprocessor.Factory{}, } diff --git a/processor/memorylimiter/config.go b/processor/memorylimiter/config.go new file mode 100644 index 000000000000..6c7c3840ca76 --- /dev/null +++ b/processor/memorylimiter/config.go @@ -0,0 +1,49 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package memorylimiter provides a processor for OpenTelemetry Service pipeline +// that drops data on the pipeline according to the current state of memory +// usage. +package memorylimiter + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" +) + +// Config defines configuration for memory memoryLimiter processor. +type Config struct { + configmodels.ProcessorSettings `mapstructure:",squash"` + + // CheckInterval is the time between measurements of memory usage for the + // purposes of avoiding going over the limits. Defaults to zero, so no + // checks will be performed. + CheckInterval time.Duration `mapstructure:"check_interval"` + + // MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be + // allocated by the process. + MemoryLimitMiB uint32 `mapstructure:"limit_mib"` + + // MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the + // measurements of memory usage. + MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"` + + // BallastSizeMiB is the size, in MiB, of the ballast size being used by the + // process. + BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"` +} + +// Name of BallastSizeMiB config option. +const ballastSizeMibKey = "ballast_size_mib" diff --git a/processor/memorylimiter/config_test.go b/processor/memorylimiter/config_test.go new file mode 100644 index 000000000000..b5367f7669b0 --- /dev/null +++ b/processor/memorylimiter/config_test.go @@ -0,0 +1,65 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorylimiter + +import ( + "path" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" +) + +func TestLoadConfig(t *testing.T) { + factories, err := config.ExampleComponents() + require.NoError(t, err) + factory := &Factory{} + factories.Processors[typeStr] = factory + require.NoError(t, err) + + config, err := config.LoadConfigFile( + t, + path.Join(".", "testdata", "config.yaml"), + factories) + + require.Nil(t, err) + require.NotNil(t, config) + + p0 := config.Processors["memory_limiter"] + assert.Equal(t, p0, + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "memory_limiter", + NameVal: "memory_limiter", + }, + }) + + p1 := config.Processors["memory_limiter/with-settings"] + assert.Equal(t, p1, + &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: "memory_limiter", + NameVal: "memory_limiter/with-settings", + }, + CheckInterval: 5 * time.Second, + MemoryLimitMiB: 4000, + MemorySpikeLimitMiB: 500, + BallastSizeMiB: 2000, + }) +} diff --git a/processor/memorylimiter/factory.go b/processor/memorylimiter/factory.go new file mode 100644 index 000000000000..392f259b45d0 --- /dev/null +++ b/processor/memorylimiter/factory.go @@ -0,0 +1,86 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorylimiter + +import ( + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/processor" +) + +const ( + // The value of "type" Attribute Key in configuration. + typeStr = "memory_limiter" +) + +// Factory is the factory for Attribute Key processor. +type Factory struct { +} + +// Type gets the type of the config created by this factory. +func (f *Factory) Type() string { + return typeStr +} + +// CreateDefaultConfig creates the default configuration for processor. Notice +// that the default configuration is expected to fail for this processor. +func (f *Factory) CreateDefaultConfig() configmodels.Processor { + return &Config{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + } +} + +// CreateTraceProcessor creates a trace processor based on this config. +func (f *Factory) CreateTraceProcessor( + logger *zap.Logger, + nextConsumer consumer.TraceConsumer, + cfg configmodels.Processor, +) (processor.TraceProcessor, error) { + return f.createProcessor(logger, nextConsumer, nil, cfg) +} + +// CreateMetricsProcessor creates a metrics processor based on this config. +func (f *Factory) CreateMetricsProcessor( + logger *zap.Logger, + nextConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (processor.MetricsProcessor, error) { + return f.createProcessor(logger, nil, nextConsumer, cfg) +} + +func (f *Factory) createProcessor( + logger *zap.Logger, + traceConsumer consumer.TraceConsumer, + metricConsumer consumer.MetricsConsumer, + cfg configmodels.Processor, +) (processor.DualTypeProcessor, error) { + const mibBytes = 1024 * 1024 + pCfg := cfg.(*Config) + return New( + cfg.Name(), + traceConsumer, + metricConsumer, + pCfg.CheckInterval, + uint64(pCfg.MemoryLimitMiB)*mibBytes, + uint64(pCfg.MemorySpikeLimitMiB)*mibBytes, + uint64(pCfg.BallastSizeMiB)*mibBytes, + logger, + ) +} diff --git a/processor/memorylimiter/factory_test.go b/processor/memorylimiter/factory_test.go new file mode 100644 index 000000000000..895f5ba68ebf --- /dev/null +++ b/processor/memorylimiter/factory_test.go @@ -0,0 +1,69 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorylimiter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := &Factory{} + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestCreateProcessor(t *testing.T) { + factory := &Factory{} + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + + // This processor can't be created with the default config. + tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) + assert.Nil(t, tp) + assert.Error(t, err, "created processor with invalid settings") + + mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg) + assert.Nil(t, mp) + assert.Error(t, err, "created processor with invalid settings") + + // Create processor with a valid config. + pCfg := cfg.(*Config) + pCfg.MemoryLimitMiB = 5722 + pCfg.MemorySpikeLimitMiB = 1907 + pCfg.BallastSizeMiB = 2048 + pCfg.CheckInterval = 100 * time.Millisecond + + tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg) + assert.NoError(t, err) + assert.NotNil(t, tp) + assert.NoError(t, tp.Shutdown()) + + mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg) + assert.NoError(t, err) + assert.NotNil(t, mp) + assert.NoError(t, mp.Shutdown()) +} diff --git a/processor/memorylimiter/memorylimiter.go b/processor/memorylimiter/memorylimiter.go new file mode 100644 index 000000000000..dd01bb07917c --- /dev/null +++ b/processor/memorylimiter/memorylimiter.go @@ -0,0 +1,225 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorylimiter + +import ( + "context" + "errors" + "runtime" + "sync/atomic" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/processor" +) + +var ( + // errForcedDrop will be returned to callers of ConsumeTraceData to indicate + // that data is being dropped due to high memory usage. + errForcedDrop = errors.New("data dropped due to high memory usage") + + // Construction errors + + errNilNextConsumer = errors.New("nil nextConsumer") + + errCheckIntervalOutOfRange = errors.New( + "checkInterval must be greater than zero") + + errMemAllocLimitOutOfRange = errors.New( + "memAllocLimit must be greater than zero") + + errMemSpikeLimitOutOfRange = errors.New( + "memSpikeLimit must be smaller than memAllocLimit") +) + +type memoryLimiter struct { + traceConsumer consumer.TraceConsumer + metricsConsumer consumer.MetricsConsumer + + memAllocLimit uint64 + memSpikeLimit uint64 + memCheckWait time.Duration + ballastSize uint64 + + // forceDrop is used atomically to indicate when data should be dropped. + forceDrop int64 + + ticker *time.Ticker + + // The function to read the mem values is set as a reference to help with + // testing different values. + readMemStatsFn func(m *runtime.MemStats) + + statsTags []tag.Mutator + + // Fields used for logging. + procName string + logger *zap.Logger + configMismatchedLogged bool +} + +var _ processor.DualTypeProcessor = (*memoryLimiter)(nil) + +// New returns a new memorylimiter processor. +func New( + name string, + traceConsumer consumer.TraceConsumer, + metricsConsumer consumer.MetricsConsumer, + checkInterval time.Duration, + memAllocLimit uint64, + memSpikeLimit uint64, + ballastSize uint64, + logger *zap.Logger, +) (processor.DualTypeProcessor, error) { + + if traceConsumer == nil && metricsConsumer == nil { + return nil, errNilNextConsumer + } + if checkInterval <= 0 { + return nil, errCheckIntervalOutOfRange + } + if memAllocLimit == 0 { + return nil, errMemAllocLimitOutOfRange + } + if memSpikeLimit >= memAllocLimit { + return nil, errMemSpikeLimitOutOfRange + } + + ml := &memoryLimiter{ + traceConsumer: traceConsumer, + metricsConsumer: metricsConsumer, + memAllocLimit: memAllocLimit, + memSpikeLimit: memSpikeLimit, + memCheckWait: checkInterval, + ballastSize: ballastSize, + ticker: time.NewTicker(checkInterval), + readMemStatsFn: runtime.ReadMemStats, + statsTags: statsTagsForBatch(name), + procName: name, + logger: logger, + } + + initMetrics() + + ml.startMonitoring() + + return ml, nil +} + +func (ml *memoryLimiter) ConsumeTraceData( + ctx context.Context, + td consumerdata.TraceData, +) error { + + if ml.forcingDrop() { + numSpans := len(td.Spans) + stats.RecordWithTags( + context.Background(), + ml.statsTags, + StatDroppedSpanCount.M(int64(numSpans))) + + return errForcedDrop + } + return ml.traceConsumer.ConsumeTraceData(ctx, td) +} + +func (ml *memoryLimiter) ConsumeMetricsData( + ctx context.Context, + md consumerdata.MetricsData, +) error { + + if ml.forcingDrop() { + numMetrics := len(md.Metrics) + stats.RecordWithTags( + context.Background(), + ml.statsTags, + StatDroppedMetricCount.M(int64(numMetrics))) + + return errForcedDrop + } + return ml.metricsConsumer.ConsumeMetricsData(ctx, md) +} + +func (ml *memoryLimiter) GetCapabilities() processor.Capabilities { + return processor.Capabilities{MutatesConsumedData: false} +} + +func (ml *memoryLimiter) Start(host component.Host) error { + return nil +} + +func (ml *memoryLimiter) Shutdown() error { + ml.ticker.Stop() + return nil +} + +func (ml *memoryLimiter) readMemStats(ms *runtime.MemStats) { + ml.readMemStatsFn(ms) + // If proper configured ms.Alloc should be at least ml.ballastSize but since + // a misconfiguration is possible check for that here. + if ms.Alloc >= ml.ballastSize { + ms.Alloc -= ml.ballastSize + } else { + // This indicates misconfiguration. Log it once. + if !ml.configMismatchedLogged { + ml.configMismatchedLogged = true + ml.logger.Warn(typeStr+" is likely incorrectly configured. "+ballastSizeMibKey+ + " must be set equal to --mem-ballast-size-mib command line option.", + zap.String("processor", ml.procName)) + } + } +} + +// startMonitoring starts a ticker'd goroutine that will check memory usage +// every checkInterval period. +func (ml *memoryLimiter) startMonitoring() { + go func() { + for range ml.ticker.C { + ml.memCheck() + } + }() +} + +// forcingDrop indicates when memory resources need to be released. +func (ml *memoryLimiter) forcingDrop() bool { + return atomic.LoadInt64(&ml.forceDrop) != 0 +} + +func (ml *memoryLimiter) memCheck() { + ms := &runtime.MemStats{} + ml.readMemStats(ms) + ml.memLimiting(ms) +} + +func (ml *memoryLimiter) shouldForceDrop(ms *runtime.MemStats) bool { + return ml.memAllocLimit <= ms.Alloc || ml.memAllocLimit-ms.Alloc <= ml.memSpikeLimit +} + +func (ml *memoryLimiter) memLimiting(ms *runtime.MemStats) { + if !ml.shouldForceDrop(ms) { + atomic.StoreInt64(&ml.forceDrop, 0) + } else { + atomic.StoreInt64(&ml.forceDrop, 1) + // Force a GC at this point and see if this is enough to get to + // the desired level. + runtime.GC() + } +} diff --git a/processor/memorylimiter/memorylimiter_test.go b/processor/memorylimiter/memorylimiter_test.go new file mode 100644 index 000000000000..8ae7217e10db --- /dev/null +++ b/processor/memorylimiter/memorylimiter_test.go @@ -0,0 +1,217 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memorylimiter + +import ( + "context" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/exporter/exportertest" +) + +func TestNew(t *testing.T) { + type args struct { + nextConsumer consumer.TraceConsumer + checkInterval time.Duration + memAllocLimit uint64 + memSpikeLimit uint64 + ballastSize uint64 + } + sink := new(exportertest.SinkTraceExporter) + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "nil_nextConsumer", + wantErr: errNilNextConsumer, + }, + { + name: "zero_checkInterval", + args: args{ + nextConsumer: sink, + }, + wantErr: errCheckIntervalOutOfRange, + }, + { + name: "zero_memAllocLimit", + args: args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + }, + wantErr: errMemAllocLimitOutOfRange, + }, + { + name: "memSpikeLimit_gt_memAllocLimit", + args: args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + memAllocLimit: 1024, + memSpikeLimit: 2048, + }, + wantErr: errMemSpikeLimitOutOfRange, + }, + { + name: "success", + args: args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + memAllocLimit: 1e10, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := New( + "test", + tt.args.nextConsumer, + nil, + tt.args.checkInterval, + tt.args.memAllocLimit, + tt.args.memSpikeLimit, + tt.args.ballastSize, + zap.NewNop()) + if err != tt.wantErr { + t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != nil { + assert.NoError(t, got.Shutdown()) + } + }) + } +} + +// TestMetricsMemoryPressureResponse manipulates results from querying memory and +// check expected side effects. +func TestMetricsMemoryPressureResponse(t *testing.T) { + var currentMemAlloc uint64 + sink := new(exportertest.SinkMetricsExporter) + ml := &memoryLimiter{ + metricsConsumer: sink, + memAllocLimit: 1024, + readMemStatsFn: func(ms *runtime.MemStats) { + ms.Alloc = currentMemAlloc + }, + } + + ctx := context.Background() + td := consumerdata.MetricsData{} + + // Below memAllocLimit. + currentMemAlloc = 800 + ml.memCheck() + assert.NoError(t, ml.ConsumeMetricsData(ctx, td)) + + // Above memAllocLimit. + currentMemAlloc = 1800 + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td)) + + // Check ballast effect + ml.ballastSize = 1000 + + // Below memAllocLimit accounting for ballast. + currentMemAlloc = 800 + ml.ballastSize + ml.memCheck() + assert.NoError(t, ml.ConsumeMetricsData(ctx, td)) + + // Above memAllocLimit even accountiing for ballast. + currentMemAlloc = 1800 + ml.ballastSize + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td)) + + // Restore ballast to default. + ml.ballastSize = 0 + + // Check spike limit + ml.memSpikeLimit = 512 + + // Below memSpikeLimit. + currentMemAlloc = 500 + ml.memCheck() + assert.NoError(t, ml.ConsumeMetricsData(ctx, td)) + + // Above memSpikeLimit. + currentMemAlloc = 550 + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeMetricsData(ctx, td)) + +} + +// TestTraceMemoryPressureResponse manipulates results from querying memory and +// check expected side effects. +func TestTraceMemoryPressureResponse(t *testing.T) { + var currentMemAlloc uint64 + sink := new(exportertest.SinkTraceExporter) + ml := &memoryLimiter{ + traceConsumer: sink, + memAllocLimit: 1024, + readMemStatsFn: func(ms *runtime.MemStats) { + ms.Alloc = currentMemAlloc + }, + } + + ctx := context.Background() + td := consumerdata.TraceData{} + + // Below memAllocLimit. + currentMemAlloc = 800 + ml.memCheck() + assert.NoError(t, ml.ConsumeTraceData(ctx, td)) + + // Above memAllocLimit. + currentMemAlloc = 1800 + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td)) + + // Check ballast effect + ml.ballastSize = 1000 + + // Below memAllocLimit accounting for ballast. + currentMemAlloc = 800 + ml.ballastSize + ml.memCheck() + assert.NoError(t, ml.ConsumeTraceData(ctx, td)) + + // Above memAllocLimit even accountiing for ballast. + currentMemAlloc = 1800 + ml.ballastSize + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td)) + + // Restore ballast to default. + ml.ballastSize = 0 + + // Check spike limit + ml.memSpikeLimit = 512 + + // Below memSpikeLimit. + currentMemAlloc = 500 + ml.memCheck() + assert.NoError(t, ml.ConsumeTraceData(ctx, td)) + + // Above memSpikeLimit. + currentMemAlloc = 550 + ml.memCheck() + assert.Equal(t, errForcedDrop, ml.ConsumeTraceData(ctx, td)) + +} diff --git a/processor/memorylimiter/metrics.go b/processor/memorylimiter/metrics.go new file mode 100644 index 000000000000..f60b1dea8f57 --- /dev/null +++ b/processor/memorylimiter/metrics.go @@ -0,0 +1,94 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file contains metrics to record dropped data via memory limiter, +// the package and its int wouldn't be necessary when proper dependencies are +// exposed via packages. + +package memorylimiter + +import ( + "sync" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" +) + +// Keys and stats for telemetry. +var ( + TagExporterNameKey, _ = tag.NewKey("exporter") + + StatDroppedSpanCount = stats.Int64( + "spans_dropped", + "counts the number of spans dropped", + stats.UnitDimensionless) + + StatDroppedMetricCount = stats.Int64( + "metrics_dropped", + "counts the number of metrics dropped", + stats.UnitDimensionless) +) + +var initOnce sync.Once + +func initMetrics() { + initOnce.Do(func() { + tagKeys := []tag.Key{ + TagExporterNameKey, + } + droppedSpanBatchesView := &view.View{ + Name: "batches_dropped", + Measure: StatDroppedSpanCount, + Description: "The number of span batches dropped.", + TagKeys: tagKeys, + Aggregation: view.Count(), + } + droppedSpansView := &view.View{ + Name: StatDroppedSpanCount.Name(), + Measure: StatDroppedSpanCount, + Description: "The number of spans dropped.", + TagKeys: tagKeys, + Aggregation: view.Sum(), + } + + droppedMetricBatchesView := &view.View{ + Name: "batches_dropped", + Measure: StatDroppedMetricCount, + Description: "The number of metric batches dropped.", + TagKeys: tagKeys, + Aggregation: view.Count(), + } + droppedMetricsView := &view.View{ + Name: StatDroppedMetricCount.Name(), + Measure: StatDroppedMetricCount, + Description: "The number of metrics dropped.", + TagKeys: tagKeys, + Aggregation: view.Sum(), + } + + view.Register(droppedSpanBatchesView, droppedSpansView, droppedMetricBatchesView, droppedMetricsView) + }) +} + +// statsTagsForBatch creates a tag.Mutator that can be used to add a metric +// label with the processorName to context.Context via stats.RecordWithTags +// function. This ensures uniformity of labels for the metrics. +func statsTagsForBatch(processorName string) []tag.Mutator { + statsTags := []tag.Mutator{ + tag.Upsert(TagExporterNameKey, processorName), + } + + return statsTags +} diff --git a/processor/memorylimiter/testdata/config.yaml b/processor/memorylimiter/testdata/config.yaml new file mode 100644 index 000000000000..5f6504ba93c2 --- /dev/null +++ b/processor/memorylimiter/testdata/config.yaml @@ -0,0 +1,36 @@ +receivers: + examplereceiver: + +processors: + memory_limiter: + # empty config + + memory_limiter/with-settings: + # check_interval is the time between measurements of memory usage for the + # purposes of avoiding going over the limits. Defaults to zero, so no + # checks will be performed. Values below 1 second are not recommended since + # it can result in unnecessary CPU consumption. + check_interval: 5s + + # Maximum amount of memory, in MiB, targeted to be allocated by the process heap. + # Note that typically the total memory usage of process will be about 50MiB higher + # than this value. + limit_mib: 4000 + + # The maximum, in MiB, spike expected between the measurements of memory usage. + spike_limit_mib: 500 + + # BallastSizeMiB is the size, in MiB, of the ballast size being used by the process. + # This must match the value of mem-ballast-size-mib command line option (if used) + # otherwise the memory limiter will not work correctly. + ballast_size_mib: 2000 + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [memory_limiter/with-settings] + exporters: [exampleexporter] diff --git a/processor/processor.go b/processor/processor.go index 4e2115877af7..88497a986e07 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -41,6 +41,12 @@ type MetricsProcessor interface { Processor } +type DualTypeProcessor interface { + consumer.TraceConsumer + consumer.MetricsConsumer + Processor +} + // Capabilities describes the capabilities of TraceProcessor or MetricsProcessor. type Capabilities struct { // MutatesConsumedData is set to true if ConsumeTraceData or ConsumeMetricsData diff --git a/testbed/tests/testdata/memory-limiter.yaml b/testbed/tests/testdata/memory-limiter.yaml new file mode 100644 index 000000000000..d2d52a25418b --- /dev/null +++ b/testbed/tests/testdata/memory-limiter.yaml @@ -0,0 +1,29 @@ +receivers: + jaeger: + protocols: + thrift-http: + endpoint: "localhost:14268" + opencensus: + endpoint: "localhost:55678" + +exporters: + opencensus: + endpoint: "localhost:56565" + logging: + loglevel: debug + +processors: + queued_retry: + memory_limiter: + check_interval: 1s + limit_mib: 10 + +service: + pipelines: + traces: + receivers: [jaeger] + processors: [memory_limiter,queued_retry] + exporters: [opencensus,logging] + metrics: + receivers: [opencensus] + exporters: [opencensus,logging] diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index c6d3cc1e907c..1654d880ac61 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -24,6 +24,8 @@ import ( "path" "testing" + "github.com/stretchr/testify/assert" + "github.com/open-telemetry/opentelemetry-collector/testbed/testbed" ) @@ -73,22 +75,49 @@ func TestTrace10kSPS(t *testing.T) { } func TestTraceNoBackend10kSPSJaeger(t *testing.T) { - tc := testbed.NewTestCase( - t, - testbed.NewJaegerThriftDataSender(testbed.DefaultJaegerPort), - testbed.NewOCDataReceiver(testbed.DefaultOCPort), - ) - defer tc.Stop() + tests := []struct { + name string + configFileName string + expectedMaxRAM uint32 + expectedMinFinalRAM uint32 + }{ + {name: "NoMemoryLimiter", configFileName: "agent-config.yaml", expectedMaxRAM: 200, expectedMinFinalRAM: 100}, + + // Memory limiter in memory-limiter.yaml is configured to allow max 10MiB of heap size. + // However, heap is not the only memory user, so the total limit we set for this + // test is 60MiB. Note: to ensure this test verifies memorylimiter correctly + // expectedMaxRAM of this test case must be lower than expectedMinFinalRAM of the + // previous test case (which runs without memorylimiter). + {name: "MemoryLimiter", configFileName: "memory-limiter.yaml", expectedMaxRAM: 60, expectedMinFinalRAM: 10}, + } - tc.SetResourceLimits(testbed.ResourceSpec{ - ExpectedMaxCPU: 60, - ExpectedMaxRAM: 198, - }) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { - tc.StartAgent() - tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000}) + configFilePath := path.Join("testdata", test.configFileName) - tc.Sleep(tc.Duration) + tc := testbed.NewTestCase( + t, + testbed.NewJaegerThriftDataSender(testbed.DefaultJaegerPort), + testbed.NewOCDataReceiver(testbed.DefaultOCPort), + testbed.WithConfigFile(configFilePath), + ) + defer tc.Stop() + + tc.SetResourceLimits(testbed.ResourceSpec{ + ExpectedMaxCPU: 60, + ExpectedMaxRAM: 198, + }) + + tc.StartAgent() + tc.StartLoad(testbed.LoadOptions{DataItemsPerSecond: 10000}) + + tc.Sleep(tc.Duration) + + rss, _, _ := tc.AgentMemoryInfo() + assert.True(t, rss > test.expectedMinFinalRAM) + }) + } } func TestTrace1kSPSWithAttrs(t *testing.T) {