Skip to content

Commit

Permalink
Add a memory limiter processor (#498)
Browse files Browse the repository at this point in the history
This adds a processor that drops data according to configured memory limits.
The processor is important for high load situations when receiving rate exceeds exporting
rate (and an extreme case of this is when the target of exporting is unavailable).

Typical production run will need to have this processor included in every pipeline
immediately after the batch processor.
  • Loading branch information
tigrannajaryan authored Jan 14, 2020
1 parent 9778b16 commit 21a70d6
Show file tree
Hide file tree
Showing 13 changed files with 922 additions and 13 deletions.
2 changes: 2 additions & 0 deletions defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
Expand Down Expand Up @@ -87,6 +88,7 @@ func Components() (
&attributesprocessor.Factory{},
&queuedprocessor.Factory{},
&batchprocessor.Factory{},
&memorylimiter.Factory{},
&tailsamplingprocessor.Factory{},
&probabilisticsamplerprocessor.Factory{},
)
Expand Down
2 changes: 2 additions & 0 deletions defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/processor"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
Expand Down Expand Up @@ -64,6 +65,7 @@ func TestDefaultComponents(t *testing.T) {
"attributes": &attributesprocessor.Factory{},
"queued_retry": &queuedprocessor.Factory{},
"batch": &batchprocessor.Factory{},
"memory_limiter": &memorylimiter.Factory{},
"tail_sampling": &tailsamplingprocessor.Factory{},
"probabilistic_sampler": &probabilisticsamplerprocessor.Factory{},
}
Expand Down
49 changes: 49 additions & 0 deletions processor/memorylimiter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package memorylimiter provides a processor for OpenTelemetry Service pipeline
// that drops data on the pipeline according to the current state of memory
// usage.
package memorylimiter

import (
"time"

"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// Config defines configuration for memory memoryLimiter processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// CheckInterval is the time between measurements of memory usage for the
// purposes of avoiding going over the limits. Defaults to zero, so no
// checks will be performed.
CheckInterval time.Duration `mapstructure:"check_interval"`

// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
// allocated by the process.
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`

// MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the
// measurements of memory usage.
MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"`

// BallastSizeMiB is the size, in MiB, of the ballast size being used by the
// process.
BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"`
}

// Name of BallastSizeMiB config option.
const ballastSizeMibKey = "ballast_size_mib"
65 changes: 65 additions & 0 deletions processor/memorylimiter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

func TestLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
require.NoError(t, err)
factory := &Factory{}
factories.Processors[typeStr] = factory
require.NoError(t, err)

config, err := config.LoadConfigFile(
t,
path.Join(".", "testdata", "config.yaml"),
factories)

require.Nil(t, err)
require.NotNil(t, config)

p0 := config.Processors["memory_limiter"]
assert.Equal(t, p0,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "memory_limiter",
NameVal: "memory_limiter",
},
})

p1 := config.Processors["memory_limiter/with-settings"]
assert.Equal(t, p1,
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "memory_limiter",
NameVal: "memory_limiter/with-settings",
},
CheckInterval: 5 * time.Second,
MemoryLimitMiB: 4000,
MemorySpikeLimitMiB: 500,
BallastSizeMiB: 2000,
})
}
86 changes: 86 additions & 0 deletions processor/memorylimiter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/processor"
)

const (
// The value of "type" Attribute Key in configuration.
typeStr = "memory_limiter"
)

// Factory is the factory for Attribute Key processor.
type Factory struct {
}

// Type gets the type of the config created by this factory.
func (f *Factory) Type() string {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor. Notice
// that the default configuration is expected to fail for this processor.
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (f *Factory) CreateTraceProcessor(
logger *zap.Logger,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
return f.createProcessor(logger, nextConsumer, nil, cfg)
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (f *Factory) CreateMetricsProcessor(
logger *zap.Logger,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.MetricsProcessor, error) {
return f.createProcessor(logger, nil, nextConsumer, cfg)
}

func (f *Factory) createProcessor(
logger *zap.Logger,
traceConsumer consumer.TraceConsumer,
metricConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (processor.DualTypeProcessor, error) {
const mibBytes = 1024 * 1024
pCfg := cfg.(*Config)
return New(
cfg.Name(),
traceConsumer,
metricConsumer,
pCfg.CheckInterval,
uint64(pCfg.MemoryLimitMiB)*mibBytes,
uint64(pCfg.MemorySpikeLimitMiB)*mibBytes,
uint64(pCfg.BallastSizeMiB)*mibBytes,
logger,
)
}
69 changes: 69 additions & 0 deletions processor/memorylimiter/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memorylimiter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := &Factory{}
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateProcessor(t *testing.T) {
factory := &Factory{}
require.NotNil(t, factory)

cfg := factory.CreateDefaultConfig()

// This processor can't be created with the default config.
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.Nil(t, tp)
assert.Error(t, err, "created processor with invalid settings")

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
assert.Nil(t, mp)
assert.Error(t, err, "created processor with invalid settings")

// Create processor with a valid config.
pCfg := cfg.(*Config)
pCfg.MemoryLimitMiB = 5722
pCfg.MemorySpikeLimitMiB = 1907
pCfg.BallastSizeMiB = 2048
pCfg.CheckInterval = 100 * time.Millisecond

tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)
assert.NoError(t, tp.Shutdown())

mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
assert.NoError(t, mp.Shutdown())
}
Loading

0 comments on commit 21a70d6

Please sign in to comment.