Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add memory limiter extension, a copy of memorylimiter processor
Browse files Browse the repository at this point in the history
timannguyen committed Dec 8, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent dc28ec1 commit f3c7466
Showing 14 changed files with 1,173 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .chloggen/extension-memory-limiter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: [new_component]

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: extension_memoryLimiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: move memory limiter processor to extension to allow connection limits at the receiver

# One or more tracking issues or pull requests related to the change
issues: [8632]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions extension/memorylimiter/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
101 changes: 101 additions & 0 deletions extension/memorylimiter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Memory Limiter extension

| Status | |
|--------------------------|-------------------|
| Stability | [alpha] |
| Distributions | [core], [contrib] |

The memory limiter extension is used to prevent out of memory situations on
the collector. Given that the amount and type of data the collector processes is
environment specific and resource utilization of the collector is also dependent
on the configured extensions, it is important to put checks in place regarding
memory usage.

The memory_limiter extension allows to perform periodic checks of memory
usage if it exceeds defined limits will begin refusing data and forcing GC to reduce
memory consumption.

The memory_limiter uses soft and hard memory limits. Hard limit is always above or equal
the soft limit.

When the memory usage exceeds the soft limit the extension will enter the memory limited
mode and will start refusing the data by returning errors to the preceding component.
The preceding component should be normally a receiver.

In memory limited mode the error returned by CheckMemory function is a
non-permanent error. When receivers see this error they are expected to retry sending
the same data. The receivers may also apply a backpressure to their data sources
in order to slow down the inflow of data into the Collector and allow the memory usage
to go below the limits.

When the memory usage is above the hard limit in addition to refusing the data the
extension will forcedly perform garbage collection in order to try to free memory.

When the memory usage drop below the soft limit, the normal operation is resumed (data
will no longer be refused and no forced garbage collection will be performed).

The difference between the soft limit and hard limits is defined via `spike_limit_mib`
configuration option. The value of this option should be selected in a way that ensures
that between the memory check intervals the memory usage cannot increase by more than this
value (otherwise memory usage may exceed the hard limit - even if temporarily).
A good starting point for `spike_limit_mib` is 20% of the hard limit. Bigger
`spike_limit_mib` values may be necessary for spiky traffic or for longer check intervals.

Note that while the extension can help mitigate out of memory situations,
it is not a replacement for properly sizing and configuring the
collector. Keep in mind that if the soft limit is crossed, the collector will
return errors to all receive operations until enough memory is freed. This may
eventually result in dropped data since the receivers may not be able to hold back
and retry the data indefinitely.

Please refer to [config.go](./config.go) for the config spec.

The following configuration options **must be changed**:
- `check_interval` (default = 0s): Time between measurements of memory
usage. The recommended value is 1 second.
If the expected traffic to the Collector is very spiky then decrease the `check_interval`
or increase `spike_limit_mib` to avoid memory usage going over the hard limit.
- `limit_mib` (default = 0): Maximum amount of memory, in MiB, targeted to be
allocated by the process heap. Note that typically the total memory usage of
process will be about 50MiB higher than this value. This defines the hard limit.
- `spike_limit_mib` (default = 20% of `limit_mib`): Maximum spike expected between the
measurements of memory usage. The value must be less than `limit_mib`. The soft limit
value will be equal to (limit_mib - spike_limit_mib).
The recommended value for `spike_limit_mib` is about 20% `limit_mib`.
- `limit_percentage` (default = 0): Maximum amount of total memory targeted to be
allocated by the process heap. This configuration is supported on Linux systems with cgroups
and it's intended to be used in dynamic platforms like docker.
This option is used to calculate `memory_limit` from the total available memory.
For instance setting of 75% with the total memory of 1GiB will result in the limit of 750 MiB.
The fixed memory setting (`limit_mib`) takes precedence
over the percentage configuration.
- `spike_limit_percentage` (default = 0): Maximum spike expected between the
measurements of memory usage. The value must be less than `limit_percentage`.
This option is used to calculate `spike_limit_mib` from the total available memory.
For instance setting of 25% with the total memory of 1GiB will result in the spike limit of 250MiB.
This option is intended to be used only with `limit_percentage`.

Examples:

```yaml
extensions:
memory_limiter:
check_interval: 1s
limit_mib: 4000
spike_limit_mib: 800
```
```yaml
extensions:
memory_limiter:
check_interval: 1s
limit_percentage: 50
spike_limit_percentage: 30
```
Refer to [config.yaml](./testdata/config.yaml) for detailed
examples on using the extension.
[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
51 changes: 51 additions & 0 deletions extension/memorylimiter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package memorylimiter provides an extension for OpenTelemetry Service
// that refuses data according to the current state of memory usage.
package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter"

import (
"time"

"go.opentelemetry.io/collector/component"
)

// Config defines configuration for memory memoryLimiter extension.
type Config struct {
// CheckInterval is the time between measurements of memory usage for the
// purposes of avoiding going over the limits. Defaults to zero, so no
// checks will be performed.
CheckInterval time.Duration `mapstructure:"check_interval"`

// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
// allocated by the extension.
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`

// MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the
// measurements of memory usage.
MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"`

// MemoryLimitPercentage is the maximum amount of memory, in %, targeted to be
// allocated by the extension. The fixed memory settings MemoryLimitMiB has a higher precedence.
MemoryLimitPercentage uint32 `mapstructure:"limit_percentage"`

// MemorySpikePercentage is the maximum, in percents against the total memory,
// spike expected between the measurements of memory usage.
MemorySpikePercentage uint32 `mapstructure:"spike_limit_percentage"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the extension configuration is valid
func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}

if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}

return nil
}
58 changes: 58 additions & 0 deletions extension/memorylimiter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package memorylimiter

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(confmap.New(), cfg))
assert.Equal(t, factory.CreateDefaultConfig(), cfg)
}

func TestUnmarshalConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.Equal(t,
&Config{
CheckInterval: 5 * time.Second,
MemoryLimitMiB: 4000,
MemorySpikeLimitMiB: 500,
}, cfg)
}

func TestValidateConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
// Valid Config
assert.NoError(t, component.UnmarshalConfig(cm, cfg))
assert.NoError(t, cfg.(*Config).Validate())
// Invalid Interval
pCfg := cfg.(*Config)
pCfg.CheckInterval = 0
assert.Error(t, pCfg.Validate(), errCheckIntervalOutOfRange)
// Invalid mem limit
pCfg = cfg.(*Config)
pCfg.CheckInterval = 1
pCfg.MemoryLimitMiB = 0
pCfg.MemoryLimitPercentage = 0
assert.Error(t, pCfg.Validate(), errLimitOutOfRange)
}
35 changes: 35 additions & 0 deletions extension/memorylimiter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
)

const (
// The value of "type" Attribute Key in configuration.
typeStr = "memory_limiter"
)

// NewFactory returns a new factory for the Memory Limiter extension.
func NewFactory() extension.Factory {
return extension.NewFactory(
typeStr,
createDefaultConfig,
createExtension,
component.StabilityLevelAlpha)
}

// CreateDefaultConfig creates the default configuration for extension. Notice
// that the default configuration is expected to fail for this extension.
func createDefaultConfig() component.Config {
return &Config{}
}

func createExtension(_ context.Context, set extension.CreateSettings, cfg component.Config) (extension.Extension, error) {
return newMemoryLimiter(cfg.(*Config), set.TelemetrySettings.Logger)
}
53 changes: 53 additions & 0 deletions extension/memorylimiter/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package memorylimiter

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/extension/extensiontest"

"go.opentelemetry.io/collector/component/componenttest"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
}

func TestCreateExtension(t *testing.T) {
factory := NewFactory()
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
// This extension can't be created with the default config.
tp, err := factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
assert.Nil(t, tp)
assert.Error(t, err, "created extension with invalid settings")

// Create extension with a valid config.
pCfg := cfg.(*Config)
pCfg.MemoryLimitMiB = 5722
pCfg.MemorySpikeLimitMiB = 1907
pCfg.CheckInterval = 100 * time.Millisecond

tp, err = factory.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)
// test if we can shutdown a monitoring routine that has not started
assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted)
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, tp.Shutdown(context.Background()))
// verify that no monitoring routine is running
assert.Error(t, tp.Shutdown(context.Background()))
}
Loading

0 comments on commit f3c7466

Please sign in to comment.