From f3c7466b0a57cb582e08e5f26a15eaff911986d3 Mon Sep 17 00:00:00 2001 From: timn Date: Mon, 20 Nov 2023 16:10:16 -0500 Subject: [PATCH] add memory limiter extension, a copy of memorylimiter processor --- .chloggen/extension-memory-limiter.yaml | 25 ++ extension/memorylimiter/Makefile | 1 + extension/memorylimiter/README.md | 101 +++++++ extension/memorylimiter/config.go | 51 ++++ extension/memorylimiter/config_test.go | 58 ++++ extension/memorylimiter/factory.go | 35 +++ extension/memorylimiter/factory_test.go | 53 ++++ extension/memorylimiter/go.mod | 88 ++++++ extension/memorylimiter/go.sum | 130 +++++++++ .../memorylimiter/internal/mock_exporter.go | 67 +++++ .../memorylimiter/internal/mock_receiver.go | 61 ++++ extension/memorylimiter/memorylimiter.go | 263 ++++++++++++++++++ extension/memorylimiter/memorylimiter_test.go | 221 +++++++++++++++ extension/memorylimiter/testdata/config.yaml | 19 ++ 14 files changed, 1173 insertions(+) create mode 100644 .chloggen/extension-memory-limiter.yaml create mode 100644 extension/memorylimiter/Makefile create mode 100644 extension/memorylimiter/README.md create mode 100644 extension/memorylimiter/config.go create mode 100644 extension/memorylimiter/config_test.go create mode 100644 extension/memorylimiter/factory.go create mode 100644 extension/memorylimiter/factory_test.go create mode 100644 extension/memorylimiter/go.mod create mode 100644 extension/memorylimiter/go.sum create mode 100644 extension/memorylimiter/internal/mock_exporter.go create mode 100644 extension/memorylimiter/internal/mock_receiver.go create mode 100644 extension/memorylimiter/memorylimiter.go create mode 100644 extension/memorylimiter/memorylimiter_test.go create mode 100644 extension/memorylimiter/testdata/config.yaml diff --git a/.chloggen/extension-memory-limiter.yaml b/.chloggen/extension-memory-limiter.yaml new file mode 100644 index 000000000000..90b02b3ffb8e --- /dev/null +++ b/.chloggen/extension-memory-limiter.yaml @@ -0,0 +1,25 @@ +# Use 
this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: [new_component] + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: extension_memoryLimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: move memory limiter processor to extension to allow connection limits at the receiver + +# One or more tracking issues or pull requests related to the change +issues: [8632] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/extension/memorylimiter/Makefile b/extension/memorylimiter/Makefile new file mode 100644 index 000000000000..ded7a36092dc --- /dev/null +++ b/extension/memorylimiter/Makefile @@ -0,0 +1 @@ +include ../../Makefile.Common diff --git a/extension/memorylimiter/README.md b/extension/memorylimiter/README.md new file mode 100644 index 000000000000..cab5c7b25daa --- /dev/null +++ b/extension/memorylimiter/README.md @@ -0,0 +1,101 @@ +# Memory Limiter extension + +| Status | | +|--------------------------|-------------------| +| Stability | [alpha] | +| Distributions | [core], [contrib] | + +The memory limiter extension is used to prevent out of memory situations on +the collector. 
Given that the amount and type of data the collector processes is +environment specific and resource utilization of the collector is also dependent +on the configured extensions, it is important to put checks in place regarding +memory usage. + +The memory_limiter extension performs periodic checks of memory +usage; if usage exceeds the defined limits, it will begin refusing data and forcing GC to reduce +memory consumption. + +The memory_limiter uses soft and hard memory limits. The hard limit is always greater than or equal to +the soft limit. + +When the memory usage exceeds the soft limit the extension will enter the memory limited +mode and will start refusing the data by returning errors to the preceding component. +The preceding component should normally be a receiver. + +In memory limited mode the error returned by the CheckMemory function is a +non-permanent error. When receivers see this error they are expected to retry sending +the same data. The receivers may also apply backpressure to their data sources +in order to slow down the inflow of data into the Collector and allow the memory usage +to go below the limits. + +When the memory usage is above the hard limit, in addition to refusing the data the +extension will forcibly perform garbage collection in order to try to free memory. + +When the memory usage drops below the soft limit, the normal operation is resumed (data +will no longer be refused and no forced garbage collection will be performed). + +The difference between the soft and hard limits is defined via the `spike_limit_mib` +configuration option. The value of this option should be selected in a way that ensures +that between the memory check intervals the memory usage cannot increase by more than this +value (otherwise memory usage may exceed the hard limit - even if temporarily). +A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger +`spike_limit_mib` values may be necessary for spiky traffic or for longer check intervals. 
+ +Note that while the extension can help mitigate out-of-memory situations, +it is not a replacement for properly sizing and configuring the +collector. Keep in mind that if the soft limit is crossed, the collector will +return errors to all receive operations until enough memory is freed. This may +eventually result in dropped data since the receivers may not be able to hold back +and retry the data indefinitely. + +Please refer to [config.go](./config.go) for the config spec. + +The following configuration options **must be changed**: +- `check_interval` (default = 0s): Time between measurements of memory +usage. The recommended value is 1 second. +If the expected traffic to the Collector is very spiky then decrease the `check_interval` +or increase `spike_limit_mib` to avoid memory usage going over the hard limit. +- `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be +allocated by the process heap. Note that typically the total memory usage of the +process will be about 50MiB higher than this value. This defines the hard limit. +- `spike_limit_mib` (default = 20% of `limit_mib`): Maximum spike expected between the +measurements of memory usage. The value must be less than `limit_mib`. The soft limit +value will be equal to (`limit_mib` - `spike_limit_mib`). +The recommended value for `spike_limit_mib` is about 20% of `limit_mib`. +- `limit_percentage` (default = 0): Maximum amount of total memory targeted to be +allocated by the process heap. This configuration is supported on Linux systems with cgroups +and it's intended to be used in dynamic platforms like Docker. +This option is used to calculate the hard memory limit from the total available memory. +For instance, a setting of 75% with the total memory of 1GiB will result in the limit of 750 MiB. +The fixed memory setting (`limit_mib`) takes precedence +over the percentage configuration. +- `spike_limit_percentage` (default = 0): Maximum spike expected between the +measurements of memory usage. 
The value must be less than `limit_percentage`. +This option is used to calculate `spike_limit_mib` from the total available memory. +For instance, a setting of 25% with the total memory of 1GiB will result in the spike limit of 250MiB. +This option is intended to be used only with `limit_percentage`. + +Examples: + +```yaml +extensions: + memory_limiter: + check_interval: 1s + limit_mib: 4000 + spike_limit_mib: 800 +``` + +```yaml +extensions: + memory_limiter: + check_interval: 1s + limit_percentage: 50 + spike_limit_percentage: 30 +``` + +Refer to [config.yaml](./testdata/config.yaml) for detailed +examples on using the extension. + +[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha +[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib +[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol diff --git a/extension/memorylimiter/config.go b/extension/memorylimiter/config.go new file mode 100644 index 000000000000..4432ae1ac64e --- /dev/null +++ b/extension/memorylimiter/config.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package memorylimiter provides an extension for OpenTelemetry Service +// that refuses data according to the current state of memory usage. +package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter" + +import ( + "time" + + "go.opentelemetry.io/collector/component" +) + +// Config defines configuration for memory memoryLimiter extension. +type Config struct { + // CheckInterval is the time between measurements of memory usage for the + // purposes of avoiding going over the limits. Defaults to zero, so no + // checks will be performed. + CheckInterval time.Duration `mapstructure:"check_interval"` + + // MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be + // allocated by the extension. 
+ MemoryLimitMiB uint32 `mapstructure:"limit_mib"` + + // MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the + // measurements of memory usage. + MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"` + + // MemoryLimitPercentage is the maximum amount of memory, in %, targeted to be + // allocated by the extension. The fixed memory settings MemoryLimitMiB has a higher precedence. + MemoryLimitPercentage uint32 `mapstructure:"limit_percentage"` + + // MemorySpikePercentage is the maximum, in percents against the total memory, + // spike expected between the measurements of memory usage. + MemorySpikePercentage uint32 `mapstructure:"spike_limit_percentage"` +} + +var _ component.Config = (*Config)(nil) + +// Validate checks if the extension configuration is valid +func (cfg *Config) Validate() error { + if cfg.CheckInterval <= 0 { + return errCheckIntervalOutOfRange + } + + if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 { + return errLimitOutOfRange + } + + return nil +} diff --git a/extension/memorylimiter/config_test.go b/extension/memorylimiter/config_test.go new file mode 100644 index 000000000000..0286ede2e623 --- /dev/null +++ b/extension/memorylimiter/config_test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memorylimiter + +import ( + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestUnmarshalDefaultConfig(t *testing.T) { + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, component.UnmarshalConfig(confmap.New(), cfg)) + assert.Equal(t, factory.CreateDefaultConfig(), cfg) +} + +func TestUnmarshalConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + 
require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + assert.NoError(t, component.UnmarshalConfig(cm, cfg)) + assert.Equal(t, + &Config{ + CheckInterval: 5 * time.Second, + MemoryLimitMiB: 4000, + MemorySpikeLimitMiB: 500, + }, cfg) +} + +func TestValidateConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + factory := NewFactory() + cfg := factory.CreateDefaultConfig() + // Valid Config + assert.NoError(t, component.UnmarshalConfig(cm, cfg)) + assert.NoError(t, cfg.(*Config).Validate()) + // Invalid Interval + pCfg := cfg.(*Config) + pCfg.CheckInterval = 0 + assert.Error(t, pCfg.Validate(), errCheckIntervalOutOfRange) + // Invalid mem limit + pCfg = cfg.(*Config) + pCfg.CheckInterval = 1 + pCfg.MemoryLimitMiB = 0 + pCfg.MemoryLimitPercentage = 0 + assert.Error(t, pCfg.Validate(), errLimitOutOfRange) +} diff --git a/extension/memorylimiter/factory.go b/extension/memorylimiter/factory.go new file mode 100644 index 000000000000..05f5f6e8d10b --- /dev/null +++ b/extension/memorylimiter/factory.go @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/extension" +) + +const ( + // The value of "type" Attribute Key in configuration. + typeStr = "memory_limiter" +) + +// NewFactory returns a new factory for the Memory Limiter extension. +func NewFactory() extension.Factory { + return extension.NewFactory( + typeStr, + createDefaultConfig, + createExtension, + component.StabilityLevelAlpha) +} + +// CreateDefaultConfig creates the default configuration for extension. Notice +// that the default configuration is expected to fail for this extension. 
+func createDefaultConfig() component.Config { + return &Config{} +} + +func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + return newMemoryLimiter(cfg.(*Config), set.TelemetrySettings.Logger) +} diff --git a/extension/memorylimiter/factory_test.go b/extension/memorylimiter/factory_test.go new file mode 100644 index 000000000000..054fa3cc5aee --- /dev/null +++ b/extension/memorylimiter/factory_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memorylimiter + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/extension/extensiontest" + + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestCreateDefaultConfig(t *testing.T) { + factory := NewFactory() + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, componenttest.CheckConfigStruct(cfg)) +} + +func TestCreateExtension(t *testing.T) { + factory := NewFactory() + require.NotNil(t, factory) + + cfg := factory.CreateDefaultConfig() + // This extension can't be created with the default config. + tp, err := factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + assert.Nil(t, tp) + assert.Error(t, err, "created extension with invalid settings") + + // Create extension with a valid config. 
+ pCfg := cfg.(*Config) + pCfg.MemoryLimitMiB = 5722 + pCfg.MemorySpikeLimitMiB = 1907 + pCfg.CheckInterval = 100 * time.Millisecond + + tp, err = factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg) + assert.NoError(t, err) + assert.NotNil(t, tp) + // test if we can shutdown a monitoring routine that has not started + assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted) + assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + + assert.NoError(t, tp.Shutdown(context.Background())) + // verify that no monitoring routine is running + assert.Error(t, tp.Shutdown(context.Background())) +} diff --git a/extension/memorylimiter/go.mod b/extension/memorylimiter/go.mod new file mode 100644 index 000000000000..611d8ebf063f --- /dev/null +++ b/extension/memorylimiter/go.mod @@ -0,0 +1,88 @@ +module go.opentelemetry.io/collector/extension/memorylimiter + +go 1.20 + +require ( + github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/collector v0.89.0 + go.opentelemetry.io/collector/component v0.89.0 + go.opentelemetry.io/collector/confmap v0.89.0 + go.opentelemetry.io/collector/consumer v0.89.0 + go.opentelemetry.io/collector/extension v0.89.0 + go.opentelemetry.io/collector/pdata v1.0.0-rcv0018 + go.uber.org/zap v1.26.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.0.1 // indirect + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect + 
github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/shirou/gopsutil/v3 v3.23.10 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect + go.opentelemetry.io/collector/config/configtelemetry v0.89.0 // indirect + go.opentelemetry.io/collector/featuregate v1.0.0-rcv0018 // indirect + go.opentelemetry.io/otel v1.20.0 // indirect + go.opentelemetry.io/otel/metric v1.20.0 // indirect + go.opentelemetry.io/otel/trace v1.20.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sys v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/grpc v1.59.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace go.opentelemetry.io/collector => ../../ + +replace go.opentelemetry.io/collector/processor => ../ + +replace go.opentelemetry.io/collector/component => ../../component + +replace go.opentelemetry.io/collector/confmap => ../../confmap + +replace go.opentelemetry.io/collector/exporter => ../../exporter + +replace go.opentelemetry.io/collector/extension => ../../extension + +replace go.opentelemetry.io/collector/featuregate => ../../featuregate + +replace go.opentelemetry.io/collector/pdata => ../../pdata + +replace go.opentelemetry.io/collector/receiver => ../../receiver + +replace go.opentelemetry.io/collector/semconv => ../../semconv + +replace go.opentelemetry.io/collector/extension/zpagesextension => ../../extension/zpagesextension + +replace 
go.opentelemetry.io/collector/consumer => ../../consumer + +retract ( + v0.76.0 // Depends on retracted pdata v1.0.0-rc10 module, use v0.76.1 + v0.69.0 // Release failed, use v0.69.1 +) + +replace go.opentelemetry.io/collector/connector => ../../connector + +replace go.opentelemetry.io/collector/config/confignet => ../../config/confignet + +replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry + +replace go.opentelemetry.io/collector/service => ../../service diff --git a/extension/memorylimiter/go.sum b/extension/memorylimiter/go.sum new file mode 100644 index 000000000000..43ad22ff0e8b --- /dev/null +++ b/extension/memorylimiter/go.sum @@ -0,0 +1,130 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 
+github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.0.1 h1:1dYGITt1I23x8cfx8ZnldtezdyaZtfAuRtIFOiRzK7g= +github.com/knadh/koanf/v2 v2.0.1/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 
h1:BpfhmLKZf+SjVanKKhCgf3bg+511DmU9eDQTen7LLbY= +github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= +github.com/shirou/gopsutil/v3 v3.23.10/go.mod h1:JIE26kpucQi+innVlAUnIEOSBhBUkirr5b44yr55+WE= +github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod 
h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= +github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= +github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= +github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc= +go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs= +go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA= +go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM= +go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ= +go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap 
v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= +golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys 
v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/grpc v1.59.0 
h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/extension/memorylimiter/internal/mock_exporter.go b/extension/memorylimiter/internal/mock_exporter.go new file mode 100644 index 000000000000..d030a7b90ae8 --- /dev/null +++ b/extension/memorylimiter/internal/mock_exporter.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/extension/memorylimiter/internal" + +import ( + "context" + "sync/atomic" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" +) + +type MockExporter struct { + destAvailable int64 + acceptedLogCount int64 + deliveredLogCount int64 + Logs []plog.Logs +} + +var _ consumer.Logs = (*MockExporter)(nil) + +func (e *MockExporter) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (e *MockExporter) ConsumeLogs(_ context.Context, ld plog.Logs) error { + atomic.AddInt64(&e.acceptedLogCount, int64(ld.LogRecordCount())) + + if atomic.LoadInt64(&e.destAvailable) == 1 { 
+ // Destination is available, immediately deliver. + atomic.AddInt64(&e.deliveredLogCount, int64(ld.LogRecordCount())) + } else { + // Destination is not available. Queue the logs in the exporter. + e.Logs = append(e.Logs, ld) + } + return nil +} + +func (e *MockExporter) SetDestAvailable(available bool) { + if available { + // Pretend we delivered all queued accepted logs. + atomic.AddInt64(&e.deliveredLogCount, atomic.LoadInt64(&e.acceptedLogCount)) + + // Get rid of the delivered logs so that memory can be collected. + e.Logs = nil + + // Now mark destination available so that subsequent ConsumeLogs + // don't queue the logs anymore. + atomic.StoreInt64(&e.destAvailable, 1) + + } else { + atomic.StoreInt64(&e.destAvailable, 0) + } +} + +func (e *MockExporter) AcceptedLogCount() int { + return int(atomic.LoadInt64(&e.acceptedLogCount)) +} + +func (e *MockExporter) DeliveredLogCount() int { + return int(atomic.LoadInt64(&e.deliveredLogCount)) +} + +func NewMockExporter() *MockExporter { + return &MockExporter{} +} diff --git a/extension/memorylimiter/internal/mock_receiver.go b/extension/memorylimiter/internal/mock_receiver.go new file mode 100644 index 000000000000..bac3b436b223 --- /dev/null +++ b/extension/memorylimiter/internal/mock_receiver.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/extension/memorylimiter/internal" + +import ( + "context" + "strings" + "sync" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/pdata/plog" +) + +type MockReceiver struct { + ProduceCount int + NextConsumer consumer.Logs + lastConsumeResult error + mux sync.Mutex +} + +func (m *MockReceiver) Start() { + go m.produce() +} + +// This function demonstrates how the receivers should behave when the ConsumeLogs/Traces/Metrics +// call returns an error. 
+func (m *MockReceiver) produce() { + for i := 0; i < m.ProduceCount; i++ { + // Create a large log to consume some memory. + ld := plog.NewLogs() + lr := ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + kiloStr := strings.Repeat("x", 10*1024) + lr.SetSeverityText(kiloStr) + + retry: + // Send to the pipeline. + err := m.NextConsumer.ConsumeLogs(context.Background(), ld) + + // Remember the result to be used in the tests. + m.mux.Lock() + m.lastConsumeResult = err + m.mux.Unlock() + + if err != nil { + // Sending to the pipeline failed. + if !consumererror.IsPermanent(err) { + // Retryable error. Try the same data again. + goto retry + } + // Permanent error. Drop it. + } + } +} + +func (m *MockReceiver) LastConsumeResult() error { + m.mux.Lock() + defer m.mux.Unlock() + return m.lastConsumeResult +} diff --git a/extension/memorylimiter/memorylimiter.go b/extension/memorylimiter/memorylimiter.go new file mode 100644 index 000000000000..1bfd0e38395b --- /dev/null +++ b/extension/memorylimiter/memorylimiter.go @@ -0,0 +1,263 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter" + +import ( + "context" + "errors" + "fmt" + "runtime" + "sync" + "sync/atomic" + "time" + + "go.opentelemetry.io/collector/component" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/internal/iruntime" +) + +const ( + mibBytes = 1024 * 1024 + + // Minimum interval between forced GC when in soft limited mode. We don't want to + // do GCs too frequently since it is a CPU-heavy operation. + minGCIntervalWhenSoftLimited = 10 * time.Second +) + +var ( + // errDataRefused will be returned to callers of ConsumeTraceData to indicate + // that data is being refused due to high memory usage. 
+ errDataRefused = errors.New("data refused due to high memory usage") + + // Construction errors + + errCheckIntervalOutOfRange = errors.New( + "checkInterval must be greater than zero") + + errLimitOutOfRange = errors.New( + "memAllocLimit or memoryLimitPercentage must be greater than zero") + + errMemSpikeLimitOutOfRange = errors.New( + "memSpikeLimit must be smaller than memAllocLimit") + + errPercentageLimitOutOfRange = errors.New( + "memoryLimitPercentage and memorySpikePercentage must be greater than zero and less than or equal to hundred", + ) + + errShutdownNotStarted = errors.New("no existing monitoring routine is running") + + // make it overridable by tests + getMemoryFn = iruntime.TotalMemory +) + +type memoryLimiter struct { + usageChecker memUsageChecker + + memCheckWait time.Duration + + // mustRefuse is used to indicate when data should be refused. + mustRefuse *atomic.Bool + + ticker *time.Ticker + lastGCDone time.Time + + // The function to read the mem values is set as a reference to help with + // testing different values. + readMemStatsFn func(m *runtime.MemStats) + + // Fields used for logging. + logger *zap.Logger + + refCounterLock sync.Mutex + refCounter int +} + +// newMemoryLimiter returns a new memorylimiter extension. 
+func newMemoryLimiter(cfg *Config, logger *zap.Logger) (*memoryLimiter, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + usageChecker, err := getMemUsageChecker(cfg, logger) + if err != nil { + return nil, err + } + + logger.Info("Memory limiter configured", + zap.Uint64("limit_mib", usageChecker.memAllocLimit/mibBytes), + zap.Uint64("spike_limit_mib", usageChecker.memSpikeLimit/mibBytes), + zap.Duration("check_interval", cfg.CheckInterval)) + + ml := &memoryLimiter{ + usageChecker: *usageChecker, + memCheckWait: cfg.CheckInterval, + ticker: time.NewTicker(cfg.CheckInterval), + readMemStatsFn: runtime.ReadMemStats, + logger: logger, + mustRefuse: &atomic.Bool{}, + } + + return ml, nil +} + +func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, error) { + memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes + memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes + if cfg.MemoryLimitMiB != 0 { + return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit) + } + + totalMemory, err := getMemoryFn() + if err != nil { + return nil, fmt.Errorf("failed to get total memory, use fixed memory settings (limit_mib): %w", err) + } + logger.Info("Using percentage memory limiter", + zap.Uint64("total_memory_mib", totalMemory/mibBytes), + zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage), + zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage)) + + return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage)) +} + +func (ml *memoryLimiter) Start(_ context.Context, _ component.Host) error { + ml.startMonitoring() + return nil +} + +func (ml *memoryLimiter) Shutdown(_ context.Context) error { + ml.refCounterLock.Lock() + defer ml.refCounterLock.Unlock() + + if ml.refCounter == 0 { + return errShutdownNotStarted + } else if ml.refCounter == 1 { + ml.ticker.Stop() + } + ml.refCounter-- + return nil +} + +func (ml *memoryLimiter) CheckMemory() error { + if 
ml.mustRefuse.Load() { + // TODO: actually to be 100% sure that this is "refused" and not "dropped" + // it is necessary to check the pipeline to see if this is directly connected + // to a receiver (ie.: a receiver is on the call stack). For now it + // assumes that the pipeline is properly configured and a receiver is on the + // callstack and that the receiver will correctly retry the refused data again. + + return errDataRefused + } + + // Even if the next consumer returns error record the data as accepted by + // this extension. + return nil +} + +func (ml *memoryLimiter) readMemStats() *runtime.MemStats { + ms := &runtime.MemStats{} + ml.readMemStatsFn(ms) + return ms +} + +// startMonitoring starts a single ticker'd goroutine per instance +// that will check memory usage every checkInterval period. +func (ml *memoryLimiter) startMonitoring() { + ml.refCounterLock.Lock() + defer ml.refCounterLock.Unlock() + + ml.refCounter++ + if ml.refCounter == 1 { + go func() { + for range ml.ticker.C { + ml.checkMemLimits() + } + }() + } +} + +func memstatToZapField(ms *runtime.MemStats) zap.Field { + return zap.Uint64("cur_mem_mib", ms.Alloc/mibBytes) +} + +func (ml *memoryLimiter) doGCandReadMemStats() *runtime.MemStats { + runtime.GC() + ml.lastGCDone = time.Now() + ms := ml.readMemStats() + ml.logger.Debug("Memory usage after GC.", memstatToZapField(ms)) + return ms +} + +func (ml *memoryLimiter) checkMemLimits() { + ms := ml.readMemStats() + + ml.logger.Debug("Currently used memory.", memstatToZapField(ms)) + + if ml.usageChecker.aboveHardLimit(ms) { + ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms)) + ms = ml.doGCandReadMemStats() + } + + // Remember current state. + wasRefusing := ml.mustRefuse.Load() + + // Check if the memory usage is above the soft limit. + mustRefuse := ml.usageChecker.aboveSoftLimit(ms) + + if wasRefusing && !mustRefuse { + // Was previously refusing but enough memory is available now, no need to limit. 
+ ml.logger.Debug("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) + } + + if !wasRefusing && mustRefuse { + // We are above soft limit, do a GC if it wasn't done recently and see if + // it brings memory usage below the soft limit. + if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited { + ml.logger.Debug("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms)) + ms = ml.doGCandReadMemStats() + // Check the limit again to see if GC helped. + mustRefuse = ml.usageChecker.aboveSoftLimit(ms) + } + + if mustRefuse { + ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms)) + } + } + + ml.mustRefuse.Store(mustRefuse) +} + +type memUsageChecker struct { + memAllocLimit uint64 + memSpikeLimit uint64 +} + +func (d memUsageChecker) aboveSoftLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit-d.memSpikeLimit +} + +func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool { + return ms.Alloc >= d.memAllocLimit +} + +func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) { + if memSpikeLimit >= memAllocLimit { + return nil, errMemSpikeLimitOutOfRange + } + if memSpikeLimit == 0 { + // If spike limit is unspecified use 20% of mem limit. 
+ memSpikeLimit = memAllocLimit / 5 + } + return &memUsageChecker{ + memAllocLimit: memAllocLimit, + memSpikeLimit: memSpikeLimit, + }, nil +} + +func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) (*memUsageChecker, error) { + if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 { + return nil, errPercentageLimitOutOfRange + } + return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100) +} diff --git a/extension/memorylimiter/memorylimiter_test.go b/extension/memorylimiter/memorylimiter_test.go new file mode 100644 index 000000000000..14b02f7b31bd --- /dev/null +++ b/extension/memorylimiter/memorylimiter_test.go @@ -0,0 +1,221 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package memorylimiter + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/internal/iruntime" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +func TestNew(t *testing.T) { + type args struct { + nextConsumer consumer.Traces + checkInterval time.Duration + memoryLimitMiB uint32 + memorySpikeLimitMiB uint32 + } + sink := new(consumertest.TracesSink) + tests := []struct { + name string + args args + wantErr error + }{ + { + name: "zero_checkInterval", + args: args{ + nextConsumer: sink, + }, + wantErr: errCheckIntervalOutOfRange, + }, + { + name: "zero_memAllocLimit", + args: args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + }, + wantErr: errLimitOutOfRange, + }, + { + name: "memSpikeLimit_gt_memAllocLimit", + args: args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + memoryLimitMiB: 1, + memorySpikeLimitMiB: 2, + }, + wantErr: errMemSpikeLimitOutOfRange, + }, + { + name: "success", + args: 
args{ + nextConsumer: sink, + checkInterval: 100 * time.Millisecond, + memoryLimitMiB: 1024, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := &Config{} + cfg.CheckInterval = tt.args.checkInterval + cfg.MemoryLimitMiB = tt.args.memoryLimitMiB + cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB + got, err := newMemoryLimiter(cfg, zap.NewNop()) + ctx := context.Background() + if tt.wantErr != nil { + assert.ErrorIs(t, err, tt.wantErr) + return + } + assert.NoError(t, err) + got.Start(ctx, nil) + assert.NoError(t, got.Shutdown(ctx)) + }) + } +} + +func TestMemoryPressureResponse(t *testing.T) { + var currentMemAlloc uint64 + ml := &memoryLimiter{ + usageChecker: memUsageChecker{ + memAllocLimit: 1024, + }, + mustRefuse: &atomic.Bool{}, + readMemStatsFn: func(ms *runtime.MemStats) { + ms.Alloc = currentMemAlloc + }, + logger: zap.NewNop(), + } + + // Below memAllocLimit. + currentMemAlloc = 800 + ml.checkMemLimits() + assert.NoError(t, ml.CheckMemory()) + + // Above memAllocLimit. + currentMemAlloc = 1800 + ml.checkMemLimits() + assert.Equal(t, errDataRefused, ml.CheckMemory()) + + // Check spike limit + ml.usageChecker.memSpikeLimit = 512 + + // Below memSpikeLimit. + currentMemAlloc = 500 + ml.checkMemLimits() + assert.NoError(t, ml.CheckMemory()) + + // Above memSpikeLimit. 
+ currentMemAlloc = 550 + ml.checkMemLimits() + assert.Equal(t, errDataRefused, ml.CheckMemory()) +} + +func TestGetDecision(t *testing.T) { + t.Run("fixed_limit", func(t *testing.T) { + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 100, MemorySpikeLimitMiB: 20}, zap.NewNop()) + require.NoError(t, err) + assert.Equal(t, &memUsageChecker{ + memAllocLimit: 100 * mibBytes, + memSpikeLimit: 20 * mibBytes, + }, d) + }) + t.Run("fixed_limit_error", func(t *testing.T) { + d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) + require.Error(t, err) + assert.Nil(t, d) + }) + + t.Cleanup(func() { + getMemoryFn = iruntime.TotalMemory + }) + getMemoryFn = func() (uint64, error) { + return 100 * mibBytes, nil + } + t.Run("percentage_limit", func(t *testing.T) { + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 50, MemorySpikePercentage: 10}, zap.NewNop()) + require.NoError(t, err) + assert.Equal(t, &memUsageChecker{ + memAllocLimit: 50 * mibBytes, + memSpikeLimit: 10 * mibBytes, + }, d) + }) + t.Run("percentage_limit_error", func(t *testing.T) { + d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) + require.Error(t, err) + assert.Nil(t, d) + d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) + require.Error(t, err) + assert.Nil(t, d) + }) +} + +func TestRefuseDecision(t *testing.T) { + decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) + require.NoError(t, err) + decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) + require.NoError(t, err) + decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20) + require.NoError(t, err) + decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60) + require.Error(t, err) + assert.Nil(t, decison1000Limit40Spike60) + + tests := []struct { + name string + usageChecker memUsageChecker + 
ms *runtime.MemStats + shouldRefuse bool + }{ + { + name: "should refuse over limit", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 600}, + shouldRefuse: true, + }, + { + name: "should not refuse", + usageChecker: *decison1000Limit30Spike30, + ms: &runtime.MemStats{Alloc: 100}, + shouldRefuse: false, + }, + { + name: "should not refuse spike, fixed usageChecker", + usageChecker: memUsageChecker{ + memAllocLimit: 600, + memSpikeLimit: 500, + }, + ms: &runtime.MemStats{Alloc: 300}, + shouldRefuse: true, + }, + { + name: "should refuse, spike, percentage usageChecker", + usageChecker: *decison1000Limit60Spike50, + ms: &runtime.MemStats{Alloc: 300}, + shouldRefuse: true, + }, + { + name: "should refuse, spike, percentage usageChecker", + usageChecker: *decison1000Limit40Spike20, + ms: &runtime.MemStats{Alloc: 250}, + shouldRefuse: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + shouldRefuse := test.usageChecker.aboveSoftLimit(test.ms) + assert.Equal(t, test.shouldRefuse, shouldRefuse) + }) + } +} diff --git a/extension/memorylimiter/testdata/config.yaml b/extension/memorylimiter/testdata/config.yaml new file mode 100644 index 000000000000..4aa0385c0008 --- /dev/null +++ b/extension/memorylimiter/testdata/config.yaml @@ -0,0 +1,19 @@ +# check_interval is the time between measurements of memory usage for the +# purposes of avoiding going over the limits. Defaults to zero, so no +# checks will be performed. Values below 1 second are not recommended since +# it can result in unnecessary CPU consumption. +check_interval: 5s + +# Maximum amount of memory, in MiB, targeted to be allocated by the process heap. +# Note that typically the total memory usage of process will be about 50MiB higher +# than this value. +limit_mib: 4000 + +# The maximum, in MiB, spike expected between the measurements of memory usage. 
+spike_limit_mib: 500 + +# The maximum amount of memory, as a percentage of the total memory, targeted to be allocated by the process. +limit_percentage: 0 + +# The maximum spike, as a percentage of the total memory, expected between the measurements of memory usage. +spike_limit_percentage: 0 \ No newline at end of file