diff --git a/.chloggen/extension-memory-limiter.yaml b/.chloggen/extension-memory-limiter.yaml index 90b02b3ffb8e..36d72dee6529 100644 --- a/.chloggen/extension-memory-limiter.yaml +++ b/.chloggen/extension-memory-limiter.yaml @@ -1,13 +1,13 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: [new_component] +change_type: new_component # The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) -component: extension_memoryLimiter +component: extension/memoryLimiter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: move memory limiter processor to extension to allow connection limits at the receiver +note: replicate processor/memorylimiter -> extension/memorylimiter to limits at the receiver # One or more tracking issues or pull requests related to the change issues: [8632] diff --git a/extension/memorylimiter/config.go b/extension/memorylimiter/config.go index 4432ae1ac64e..a16aaaf1ff21 100644 --- a/extension/memorylimiter/config.go +++ b/extension/memorylimiter/config.go @@ -6,6 +6,7 @@ package memorylimiter // import "go.opentelemetry.io/collector/extension/memorylimiter" import ( + "fmt" "time" "go.opentelemetry.io/collector/component" @@ -35,6 +36,23 @@ type Config struct { MemorySpikePercentage uint32 `mapstructure:"spike_limit_percentage"` } +// MemoryLimitation defines memory limiter path for the receiver +type MemoryLimitation struct { + // MemoryLimiterID specifies the name of the memory limiter extension + MemoryLimiterID component.ID `mapstructure:"memory_limiter"` +} + +// GetMemoryLimiter attempts to find a memory limiter extension in the extension list. +// If a memory limiter extension is not found, an error is returned. +func (m *MemoryLimitation) GetMemoryLimiter(extensions map[component.ID]component.Component) (MemoryLimiter, error) { + if ext, found := extensions[m.MemoryLimiterID]; found { + if ml, ok := ext.(MemoryLimiter); ok { + return ml, nil + } + } + return nil, fmt.Errorf("failed to resolve Memory Limiter %q", m.MemoryLimiterID) +} + var _ component.Config = (*Config)(nil) // Validate checks if the extension configuration is valid diff --git a/extension/memorylimiter/config_test.go b/extension/memorylimiter/config_test.go index 0286ede2e623..1c8a78e545f0 100644 --- a/extension/memorylimiter/config_test.go +++ b/extension/memorylimiter/config_test.go @@ -5,11 +5,14 @@ package memorylimiter import ( "path/filepath" + "runtime" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" @@ -56,3 +59,31 @@ func TestValidateConfig(t *testing.T) { pCfg.MemoryLimitPercentage = 0 assert.Error(t, pCfg.Validate(), errLimitOutOfRange) } + +func TestMemoryLimitation(t *testing.T) { + var mlc = &MemoryLimitation{ + MemoryLimiterID: component.NewID("ml"), + } + exts := make(map[component.ID]component.Component) + t.Run("extension not found", func(t *testing.T) { + e, err := mlc.GetMemoryLimiter(exts) + require.Error(t, err) + assert.Nil(t, e) + }) + + exts[component.NewID("ml")] = &memoryLimiter{ + usageChecker: memUsageChecker{ + memAllocLimit: 1024, + }, + mustRefuse: &atomic.Bool{}, + readMemStatsFn: func(ms *runtime.MemStats) { + ms.Alloc = 100 + }, + logger: zap.NewNop(), + } + t.Run("extension found", func(t *testing.T) { + ml, err := mlc.GetMemoryLimiter(exts) + assert.NoError(t, err) + assert.NoError(t, ml.CheckMemory()) + }) +} diff --git a/extension/memorylimiter/go.mod b/extension/memorylimiter/go.mod index 611d8ebf063f..efc6551b89f8 100644 --- a/extension/memorylimiter/go.mod +++ b/extension/memorylimiter/go.mod @@ -9,7 +9,6 @@ require ( go.opentelemetry.io/collector/confmap v0.89.0 go.opentelemetry.io/collector/consumer v0.89.0 go.opentelemetry.io/collector/extension v0.89.0 - go.opentelemetry.io/collector/pdata v1.0.0-rcv0018 go.uber.org/zap v1.26.0 ) @@ -37,9 +36,10 @@ require ( github.com/yusufpapurcu/wmi v1.2.3 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.89.0 // indirect go.opentelemetry.io/collector/featuregate v1.0.0-rcv0018 // indirect - go.opentelemetry.io/otel v1.20.0 // indirect - go.opentelemetry.io/otel/metric v1.20.0 // indirect - go.opentelemetry.io/otel/trace v1.20.0 // indirect + go.opentelemetry.io/collector/pdata v1.0.0-rcv0018 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect diff --git a/extension/memorylimiter/go.sum b/extension/memorylimiter/go.sum index 43ad22ff0e8b..90d55f6b7a6a 100644 --- a/extension/memorylimiter/go.sum +++ b/extension/memorylimiter/go.sum @@ -68,12 +68,12 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.opentelemetry.io/otel v1.20.0 h1:vsb/ggIY+hUjD/zCAQHpzTmndPqv/ml2ArbsbfBYTAc= -go.opentelemetry.io/otel v1.20.0/go.mod h1:oUIGj3D77RwJdM6PPZImDpSZGDvkD9fhesHny69JFrs= -go.opentelemetry.io/otel/metric v1.20.0 h1:ZlrO8Hu9+GAhnepmRGhSU7/VkpjrNowxRN9GyKR4wzA= -go.opentelemetry.io/otel/metric v1.20.0/go.mod h1:90DRw3nfK4D7Sm/75yQ00gTJxtkBxX+wu6YaNymbpVM= -go.opentelemetry.io/otel/trace v1.20.0 h1:+yxVAPZPbQhbC3OfAkeIVTky6iTFpcr4SiY9om7mXSQ= -go.opentelemetry.io/otel/trace v1.20.0/go.mod h1:HJSK7F/hA5RlzpZ0zKDCHCDHm556LCDtKaAo6JmBFUU= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/extension/memorylimiter/memorylimiter.go b/extension/memorylimiter/memorylimiter.go index b10c52ec8d55..3c40f2b77d41 100644 --- a/extension/memorylimiter/memorylimiter.go +++ b/extension/memorylimiter/memorylimiter.go @@ -129,17 +129,6 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage)) } -// GetMemoryLimiterExtension attempts to find a memory limiter extension in the extension list. -// If a memory limiter extension is not found, an error is returned. -func GetMemoryLimiterExtension(extensions map[component.ID]component.Component) (MemoryLimiter, error) { - for _, extension := range extensions { - if ext, ok := extension.(interface{ CheckMemory() error }); ok { - return ext.(MemoryLimiter), nil - } - } - return nil, fmt.Errorf("failed to resolve Memory Limiter") -} - func (ml *memoryLimiter) Start(_ context.Context, _ component.Host) error { ml.startMonitoring() return nil diff --git a/extension/memorylimiter/memorylimiter_test.go b/extension/memorylimiter/memorylimiter_test.go index feb7f9c62163..14b02f7b31bd 100644 --- a/extension/memorylimiter/memorylimiter_test.go +++ b/extension/memorylimiter/memorylimiter_test.go @@ -12,7 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/internal/iruntime" "go.uber.org/zap" @@ -220,28 +219,3 @@ func TestRefuseDecision(t *testing.T) { }) } } - -func testGetGetMemoryLimiterExtension(t *testing.T) { - exts := make(map[component.ID]component.Component) - t.Run("extension not found", func(t *testing.T) { - e, err := GetMemoryLimiterExtension(exts) - require.Error(t, err) - assert.Nil(t, e) - }) - - exts[component.NewID("ml")] = &memoryLimiter{ - usageChecker: memUsageChecker{ - memAllocLimit: 1024, - }, - mustRefuse: &atomic.Bool{}, - readMemStatsFn: func(ms *runtime.MemStats) { - ms.Alloc = 100 - }, - logger: zap.NewNop(), - } - t.Run("extension found", func(t *testing.T) { - ml, err := GetMemoryLimiterExtension(exts) - assert.NoError(t, err) - assert.NoError(t, ml.CheckMemory()) - }) -}