Skip to content

Commit

Permalink
fix(checker): Add cap on metric fetching to prevent checker OOM
Browse files Browse the repository at this point in the history
Closes #190
  • Loading branch information
beevee committed Mar 26, 2020
1 parent ea96709 commit fda16c1
Show file tree
Hide file tree
Showing 24 changed files with 110 additions and 43 deletions.
67 changes: 33 additions & 34 deletions checker/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,12 @@ func TestCheckErrors(t *testing.T) {
database: dataBase,
source: source,
logger: logger,
config: &Config{
MetricsTTLSeconds: 10,
},
metrics: checkerMetrics.LocalMetrics,
from: 17,
until: 67,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
config: &Config{},
metrics: checkerMetrics.LocalMetrics,
from: 17,
until: 67,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
trigger: &moira.Trigger{
Name: "Super trigger",
ErrorValue: &errValue,
Expand Down Expand Up @@ -530,6 +528,7 @@ func TestCheckErrors(t *testing.T) {
}

source.EXPECT().Fetch(pattern, triggerChecker.from, triggerChecker.until, true).Return(nil, metricErr)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(10))
dataBase.EXPECT().SetTriggerLastCheck(triggerChecker.triggerID, &lastCheck, triggerChecker.trigger.IsRemote).Return(nil)
err := triggerChecker.Check()
So(err, ShouldBeNil)
Expand Down Expand Up @@ -634,13 +633,11 @@ func TestIgnoreNodataToOk(t *testing.T) {
database: dataBase,
source: source,
logger: logger,
config: &Config{
MetricsTTLSeconds: 3600,
},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
config: &Config{},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
trigger: &moira.Trigger{
ErrorValue: &errValue,
WarnValue: &warnValue,
Expand All @@ -656,7 +653,8 @@ func TestIgnoreNodataToOk(t *testing.T) {
source.EXPECT().Fetch(pattern, triggerChecker.from, triggerChecker.until, true).Return(fetchResult, nil)
fetchResult.EXPECT().GetMetricsData().Return([]*metricSource.MetricData{metricSource.MakeMetricData(metric, []float64{0, 1, 2, 3, 4}, retention, triggerChecker.from)})
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric}, nil)
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-triggerChecker.config.MetricsTTLSeconds)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-3600)
checkData, err := triggerChecker.checkTrigger()
So(err, ShouldBeNil)
So(checkData, ShouldResemble, moira.CheckData{
Expand Down Expand Up @@ -700,13 +698,11 @@ func TestHandleTrigger(t *testing.T) {
database: dataBase,
source: source,
logger: logger,
config: &Config{
MetricsTTLSeconds: 3600,
},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
config: &Config{},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
trigger: &moira.Trigger{
ErrorValue: &errValue,
WarnValue: &warnValue,
Expand All @@ -723,7 +719,8 @@ func TestHandleTrigger(t *testing.T) {
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric}, nil)
var val float64
var val1 float64 = 4
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-triggerChecker.config.MetricsTTLSeconds)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-3600)
dataBase.EXPECT().PushNotificationEvent(&moira.NotificationEvent{
TriggerID: triggerChecker.triggerID,
Timestamp: 3617,
Expand Down Expand Up @@ -767,7 +764,8 @@ func TestHandleTrigger(t *testing.T) {
source.EXPECT().Fetch(pattern, triggerChecker.from, triggerChecker.until, true).Return(fetchResult, nil)
fetchResult.EXPECT().GetMetricsData().Return([]*metricSource.MetricData{metricSource.MakeMetricData(metric, []float64{0, 1, 2, 3, 4}, retention, triggerChecker.from)})
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric}, nil)
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-triggerChecker.config.MetricsTTLSeconds)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-3600)
checkData, err := triggerChecker.checkTrigger()
So(err, ShouldBeNil)
var val1 float64 = 4
Expand All @@ -793,7 +791,8 @@ func TestHandleTrigger(t *testing.T) {
source.EXPECT().Fetch(pattern, triggerChecker.from, triggerChecker.until, true).Return(fetchResult, nil)
fetchResult.EXPECT().GetMetricsData().Return([]*metricSource.MetricData{metricSource.MakeMetricData(metric, []float64{}, retention, triggerChecker.from)})
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric}, nil)
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-triggerChecker.config.MetricsTTLSeconds)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-3600)
dataBase.EXPECT().PushNotificationEvent(&moira.NotificationEvent{
TriggerID: triggerChecker.triggerID,
Timestamp: lastCheck.Timestamp,
Expand Down Expand Up @@ -830,13 +829,11 @@ func TestHandleTrigger(t *testing.T) {
database: dataBase,
source: source,
logger: logger,
config: &Config{
MetricsTTLSeconds: 3600,
},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
config: &Config{},
from: 3617,
until: 3667,
ttl: ttl,
ttlState: moira.TTLStateNODATA,
trigger: &moira.Trigger{
ErrorValue: &errValue,
WarnValue: &warnValue,
Expand All @@ -857,6 +854,7 @@ func TestHandleTrigger(t *testing.T) {
metricSource.MakeMetricData("super", []float64{0, 1, 2, 3}, retention, triggerChecker1.from),
})
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric1, metric2}, nil)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric1, metric2}, gomock.Any())
dataBase.EXPECT().PushNotificationEvent(gomock.Any(), true).Return(nil)
checkData, err := triggerChecker1.checkTrigger()
Expand Down Expand Up @@ -890,7 +888,8 @@ func TestHandleTrigger(t *testing.T) {
source.EXPECT().Fetch(pattern, triggerChecker.from, triggerChecker.until, true).Return(fetchResult, nil)
fetchResult.EXPECT().GetMetricsData().Return([]*metricSource.MetricData{metricSource.MakeMetricData(metric, []float64{}, retention, triggerChecker.from)})
fetchResult.EXPECT().GetPatternMetrics().Return([]string{metric}, nil)
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-triggerChecker.config.MetricsTTLSeconds)
dataBase.EXPECT().GetMetricsTTLSeconds().Return(int64(3600))
dataBase.EXPECT().RemoveMetricsValues([]string{metric}, triggerChecker.until-3600)
dataBase.EXPECT().RemovePatternsMetrics(triggerChecker.trigger.Patterns).Return(nil)

checkData, err := triggerChecker.checkTrigger()
Expand Down
1 change: 0 additions & 1 deletion checker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ type Config struct {
NoDataCheckInterval time.Duration
CheckInterval time.Duration
LazyTriggersCheckInterval time.Duration
MetricsTTLSeconds int64
StopCheckingIntervalSeconds int64
MaxParallelChecks int
MaxParallelRemoteChecks int
Expand Down
2 changes: 1 addition & 1 deletion checker/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (triggerChecker *TriggerChecker) fetch() (*metricSource.TriggerMetricsData,

func (triggerChecker *TriggerChecker) cleanupMetricsValues(metrics []string, until int64) {
if len(metrics) > 0 {
if err := triggerChecker.database.RemoveMetricsValues(metrics, until-triggerChecker.config.MetricsTTLSeconds); err != nil {
if err := triggerChecker.database.RemoveMetricsValues(metrics, until-triggerChecker.database.GetMetricsTTLSeconds()); err != nil {
triggerChecker.logger.Error(err.Error())
}
}
Expand Down
4 changes: 3 additions & 1 deletion cmd/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func getDefault() config {
Host: "localhost",
Port: "6379",
ConnectionLimit: 512,
MetricsTTL: "1h",
},
Logger: cmd.LoggerConfig{
LogFile: "stdout",
Expand All @@ -107,7 +108,8 @@ func getDefault() config {
Pprof: cmd.ProfilerConfig{Enabled: false},
},
Remote: cmd.RemoteConfig{
Timeout: "60s",
Timeout: "60s",
MetricsTTL: "7d",
},
}
}
6 changes: 2 additions & 4 deletions cmd/checker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ type checkerConfig struct {
// Max period to perform lazy triggers re-check. Note: lazy triggers are triggers which has no subscription for it. Moira will check its state less frequently.
// Delay for check lazy trigger is random between LazyTriggersCheckInterval/2 and LazyTriggersCheckInterval.
LazyTriggersCheckInterval string `yaml:"lazy_triggers_check_interval"`
// Time interval to store metrics. Note: Increasing of this value leads to increasing of Redis memory consumption value
MetricsTTL string `yaml:"metrics_ttl"`
// Max concurrent checkers to run. Equals to the number of processor cores found on Moira host by default or when variable is defined as 0.
MaxParallelChecks int `yaml:"max_parallel_checks"`
// Max concurrent remote checkers to run. Equals to the number of processor cores found on Moira host by default or when variable is defined as 0.
Expand All @@ -34,7 +32,6 @@ type checkerConfig struct {

func (config *checkerConfig) getSettings() *checker.Config {
return &checker.Config{
MetricsTTLSeconds: int64(to.Duration(config.MetricsTTL).Seconds()),
CheckInterval: to.Duration(config.CheckInterval),
LazyTriggersCheckInterval: to.Duration(config.LazyTriggersCheckInterval),
NoDataCheckInterval: to.Duration(config.NoDataCheckInterval),
Expand All @@ -50,6 +47,7 @@ func getDefault() config {
Host: "localhost",
Port: "6379",
ConnectionLimit: 512,
MetricsTTL: "1h",
},
Logger: cmd.LoggerConfig{
LogFile: "stdout",
Expand All @@ -59,7 +57,6 @@ func getDefault() config {
NoDataCheckInterval: "60s",
CheckInterval: "5s",
LazyTriggersCheckInterval: "10m",
MetricsTTL: "1h",
StopCheckingInterval: "30s",
MaxParallelChecks: 0,
MaxParallelRemoteChecks: 0,
Expand All @@ -78,6 +75,7 @@ func getDefault() config {
Remote: cmd.RemoteConfig{
CheckInterval: "60s",
Timeout: "60s",
MetricsTTL: "7d",
},
}
}
1 change: 1 addition & 0 deletions cmd/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func getDefault() config {
Host: "localhost",
Port: "6379",
ConnectionLimit: 512,
MetricsTTL: "1h",
},
Cleanup: cleanupConfig{
Whitelist: []string{},
Expand Down
9 changes: 9 additions & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type RedisConfig struct {
// Redis database
DB int `yaml:"dbid"`
ConnectionLimit int `yaml:"connection_limit"`
// Moira will delete metrics older than this value from Redis. Large values will lead to various problems everywhere.
// See https://github.com/moira-alert/moira/pull/519
MetricsTTL string `yaml:"metrics_ttl"`
}

// GetSettings returns redis config parsed from moira config files
Expand All @@ -41,6 +44,7 @@ func (config *RedisConfig) GetSettings() redis.Config {
Port: config.Port,
DB: config.DB,
ConnectionLimit: config.ConnectionLimit,
MetricsTTL: to.Duration(config.MetricsTTL),
}
}

Expand Down Expand Up @@ -93,6 +97,10 @@ type RemoteConfig struct {
URL string `yaml:"url"`
// Min period to perform triggers re-check. Note: Reducing of this value leads to increasing of CPU and memory usage values
CheckInterval string `yaml:"check_interval"`
// Moira won't fetch metrics older than this value from remote storage. Note that Moira doesn't delete old data from
// remote storage. Large values will lead to OOM problems in checker.
// See https://github.com/moira-alert/moira/pull/519
MetricsTTL string `yaml:"metrics_ttl"`
// Timeout for remote requests
Timeout string `yaml:"timeout"`
// Username for basic auth
Expand All @@ -113,6 +121,7 @@ func (config *RemoteConfig) GetRemoteSourceSettings() *remoteSource.Config {
return &remoteSource.Config{
URL: config.URL,
CheckInterval: to.Duration(config.CheckInterval),
MetricsTTL: to.Duration(config.MetricsTTL),
Timeout: to.Duration(config.Timeout),
User: config.User,
Password: config.Password,
Expand Down
1 change: 1 addition & 0 deletions cmd/filter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func getDefault() config {
Host: "localhost",
Port: "6379",
ConnectionLimit: 512,
MetricsTTL: "1h",
},
Logger: cmd.LoggerConfig{
LogFile: "stdout",
Expand Down
4 changes: 3 additions & 1 deletion cmd/notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func getDefault() config {
Host: "localhost",
Port: "6379",
ConnectionLimit: 512,
MetricsTTL: "1h",
},

Logger: cmd.LoggerConfig{
Expand Down Expand Up @@ -94,7 +95,8 @@ func getDefault() config {
Pprof: cmd.ProfilerConfig{Enabled: false},
},
Remote: cmd.RemoteConfig{
Timeout: "60s",
Timeout: "60s",
MetricsTTL: "24h",
},
ImageStores: cmd.ImageStoreConfig{},
}
Expand Down
3 changes: 3 additions & 0 deletions database/redis/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package redis

import "time"

// Config - Redis database connection config
type Config struct {
MasterName string
Expand All @@ -8,4 +10,5 @@ type Config struct {
Port string
DB int
ConnectionLimit int
MetricsTTL time.Duration
}
2 changes: 2 additions & 0 deletions database/redis/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type DbConnector struct {
retentionSavingCache *cache.Cache
metricsCache *cache.Cache
sync *redsync.Redsync
metricsTTLSeconds int64
source DBSource
}

Expand Down Expand Up @@ -76,6 +77,7 @@ func NewDatabase(logger moira.Logger, config Config, source DBSource) *DbConnect
retentionSavingCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
metricsCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
sync: redsync.New([]redsync.Pool{syncPool}),
metricsTTLSeconds: int64(config.MetricsTTL.Seconds()),
source: source,
}
}
Expand Down
5 changes: 5 additions & 0 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) er
return nil
}

// GetMetricsTTLSeconds returns maximum time in seconds to store metrics in Redis
func (connector *DbConnector) GetMetricsTTLSeconds() int64 {
return connector.metricsTTLSeconds
}

// RemoveMetricsValues remove metrics timestamps values from 0 to given time
func (connector *DbConnector) RemoveMetricsValues(metrics []string, toTime int64) error {
c := connector.pool.Get()
Expand Down
7 changes: 7 additions & 0 deletions helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,10 @@ func ChunkSlice(original []string, chunkSize int) (divided [][]string) {
func RoundToNearestRetention(ts, retention int64) int64 {
return (ts + retention/2) / retention * retention
}

func MaxInt64(a, b int64) int64 {
if a > b {
return a
}
return b
}
1 change: 1 addition & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Database interface {
GetMetricsValues(metrics []string, from int64, until int64) (map[string][]*MetricValue, error)
RemoveMetricValues(metric string, toTime int64) error
RemoveMetricsValues(metrics []string, toTime int64) error
GetMetricsTTLSeconds() int64

AddLocalTriggersToCheck(triggerIDs []string) error
GetLocalTriggersToCheck(count int) ([]string, error)
Expand Down
2 changes: 2 additions & 0 deletions local/api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ redis:
host: redis
port: "6379"
dbid: 0
metrics_ttl: 3h
telemetry:
graphite:
enabled: true
Expand All @@ -18,6 +19,7 @@ remote:
url: "http://graphite:80/render"
check_interval: 60s
timeout: 60s
metrics_ttl: 7d
api:
listen: ":8081"
enable_cors: false
Expand Down
3 changes: 2 additions & 1 deletion local/checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ redis:
host: redis
port: "6379"
dbid: 0
metrics_ttl: 3h
telemetry:
graphite:
enabled: true
Expand All @@ -18,10 +19,10 @@ remote:
url: "http://graphite:80/render"
check_interval: 60s
timeout: 60s
metrics_ttl: 7d
checker:
nodata_check_interval: 60s
check_interval: 10s
metrics_ttl: 3h
stop_checking_interval: 30s
log:
log_file: stdout
Expand Down
1 change: 1 addition & 0 deletions local/cli.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ redis:
host: redis
port: "6379"
dbid: 0
metrics_ttl: 3h
log_file: stdout
log_level: debug

Loading

0 comments on commit fda16c1

Please sign in to comment.