Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add cost enforcer to graphite queries #1449

Merged
merged 5 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 64 additions & 19 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3/src/x/cost"
xdocs "github.com/m3db/m3/src/x/docs"
xconfig "github.com/m3db/m3x/config"
"github.com/m3db/m3x/config/listenaddress"
Expand Down Expand Up @@ -62,12 +63,6 @@ var (
defaultLookbackDuration = 5 * time.Minute

defaultCarbonIngesterAggregationType = aggregation.Mean

// defaultLimitsConfiguration is applied if `limits` isn't specified.
defaultLimitsConfiguration = &LimitsConfiguration{
// this is sufficient for 1 day span / 1s step, or 60 days with a 1m step.
MaxComputedDatapoints: 86400,
}
)

// Configuration is the configuration for the query service.
Expand Down Expand Up @@ -121,7 +116,7 @@ type Configuration struct {
Carbon *CarbonConfiguration `yaml:"carbon"`

// Limits specifies limits on per-query resource usage.
Limits *LimitsConfiguration `yaml:"limits"`
Limits LimitsConfiguration `yaml:"limits"`

// LookbackDuration determines the lookback duration for queries
LookbackDuration *time.Duration `yaml:"lookbackDuration"`
Expand Down Expand Up @@ -191,9 +186,69 @@ func (q *QueryConversionCacheConfiguration) Validate() error {
return nil
}

// LimitsConfiguration represents limitations on per-query resource usage. Zero or negative values imply no limit.
// LimitsConfiguration represents limitations on resource usage in the query instance. Limits are split between per-query
// and global limits.
type LimitsConfiguration struct {
MaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`
// deprecated: use PerQuery.MaxComputedDatapoints instead.
DeprecatedMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`

// Global configures limits which apply across all queries running on this
// instance.
Global GlobalLimitsConfiguration `yaml:"global"`

// PerQuery configures limits which apply to each query individually.
PerQuery PerQueryLimitsConfiguration `yaml:"perQuery"`
}

// MaxComputedDatapoints is a getter providing backwards compatibility between
// LimitsConfiguration.DeprecatedMaxComputedDatapoints and
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints. See
// LimitsConfiguration.PerQuery.PrivateMaxComputedDatapoints for a comment on
// the semantics.
func (lc *LimitsConfiguration) MaxComputedDatapoints() int64 {
if lc.PerQuery.PrivateMaxComputedDatapoints != 0 {
return lc.PerQuery.PrivateMaxComputedDatapoints
}

return lc.DeprecatedMaxComputedDatapoints
}

// GlobalLimitsConfiguration represents limits on resource usage across a query instance. Zero or negative values imply no limit.
type GlobalLimitsConfiguration struct {
// MaxFetchedDatapoints limits the total number of datapoints actually fetched by all queries at any given time.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *GlobalLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

// PerQueryLimitsConfiguration represents limits on resource usage within a single query. Zero or negative values imply no limit.
type PerQueryLimitsConfiguration struct {
// PrivateMaxComputedDatapoints limits the number of datapoints that can be
// returned by a query. It's determined purely
// from the size of the time range and the step size (end - start / step).
//
// N.B.: the hacky "Private" prefix is to indicate that callers should use
// LimitsConfiguration.MaxComputedDatapoints() instead of accessing
// this field directly.
PrivateMaxComputedDatapoints int64 `yaml:"maxComputedDatapoints"`

// MaxFetchedDatapoints limits the number of datapoints actually used by a given query.
MaxFetchedDatapoints int64 `yaml:"maxFetchedDatapoints"`
}

// AsLimitManagerOptions converts this configuration to cost.LimitManagerOptions for MaxFetchedDatapoints.
func (l *PerQueryLimitsConfiguration) AsLimitManagerOptions() cost.LimitManagerOptions {
return toLimitManagerOptions(l.MaxFetchedDatapoints)
}

func toLimitManagerOptions(limit int64) cost.LimitManagerOptions {
return cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{
Threshold: cost.Cost(limit),
Enabled: limit > 0,
})
}

// IngestConfiguration is the configuration for ingestion server.
Expand Down Expand Up @@ -232,16 +287,6 @@ func (c Configuration) LookbackDurationOrDefault() (time.Duration, error) {
return v, nil
}

// LimitsOrDefault returns the specified limit configuration if provided, or the
// default value otherwise.
func (c Configuration) LimitsOrDefault() *LimitsConfiguration {
if c.Limits != nil {
return c.Limits
}

return defaultLimitsConfiguration
}

// ListenAddressOrDefault returns the specified carbon ingester listen address if provided, or the
// default value if not.
func (c *CarbonIngesterConfiguration) ListenAddressOrDefault() string {
Expand Down
110 changes: 103 additions & 7 deletions src/cmd/services/m3query/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"

"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/x/cost"
xdocs "github.com/m3db/m3/src/x/docs"
xconfig "github.com/m3db/m3x/config"

Expand All @@ -34,6 +35,8 @@ import (
yaml "gopkg.in/yaml.v2"
)

const testConfigFile = "./testdata/config.yml"

func TestTagOptionsFromEmptyConfigErrors(t *testing.T) {
cfg := TagOptionsConfiguration{}
opts, err := TagOptionsFromConfig(cfg)
Expand Down Expand Up @@ -69,20 +72,112 @@ func TestTagOptionsFromConfig(t *testing.T) {
assert.Equal(t, []byte(name), opts.MetricName())
}

func TestLimitsConfiguration_AsLimitManagerOptions(t *testing.T) {
cases := []struct {
Input interface {
AsLimitManagerOptions() cost.LimitManagerOptions
}
ExpectedDefault int64
}{{
Input: &PerQueryLimitsConfiguration{
MaxFetchedDatapoints: 5,
},
ExpectedDefault: 5,
}, {
Input: &GlobalLimitsConfiguration{
MaxFetchedDatapoints: 6,
},
ExpectedDefault: 6,
}}

for _, tc := range cases {
t.Run(fmt.Sprintf("type_%T", tc.Input), func(t *testing.T) {
res := tc.Input.AsLimitManagerOptions()
assert.Equal(t, cost.Limit{
Threshold: cost.Cost(tc.ExpectedDefault),
Enabled: true,
}, res.DefaultLimit())
})
}
}

func TestLimitsConfiguration_MaxComputedDatapoints(t *testing.T) {
t.Run("uses PerQuery value if provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
PerQuery: PerQueryLimitsConfiguration{
PrivateMaxComputedDatapoints: 5,
},
}

assert.Equal(t, int64(5), lc.MaxComputedDatapoints())
})

t.Run("uses deprecated value if PerQuery not provided", func(t *testing.T) {
lc := &LimitsConfiguration{
DeprecatedMaxComputedDatapoints: 6,
}

assert.Equal(t, int64(6), lc.MaxComputedDatapoints())
})
}

func TestToLimitManagerOptions(t *testing.T) {
cases := []struct {
Name string
Input int64
ExpectedLimit cost.Limit
}{{
Name: "negative is disabled",
Input: -5,
ExpectedLimit: cost.Limit{
Threshold: cost.Cost(-5),
Enabled: false,
},
}, {
Name: "zero is disabled",
Input: 0,
ExpectedLimit: cost.Limit{
Threshold: cost.Cost(0),
Enabled: false,
},
}, {
Name: "positive is enabled",
Input: 5,
ExpectedLimit: cost.Limit{
Threshold: cost.Cost(5),
Enabled: true,
},
}}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Equal(t, tc.ExpectedLimit, toLimitManagerOptions(tc.Input).DefaultLimit())
})
}
}

func TestConfigLoading(t *testing.T) {
var cfg Configuration
require.NoError(t, xconfig.LoadFile(&cfg, "./testdata/sample_config.yml", xconfig.Options{}))
require.NoError(t, xconfig.LoadFile(&cfg, testConfigFile, xconfig.Options{}))

assert.Equal(t, &LimitsConfiguration{
MaxComputedDatapoints: 12000,
}, cfg.Limits)
DeprecatedMaxComputedDatapoints: 10555,
PerQuery: PerQueryLimitsConfiguration{
PrivateMaxComputedDatapoints: 12000,
MaxFetchedDatapoints: 11000,
},
Global: GlobalLimitsConfiguration{
MaxFetchedDatapoints: 13000,
},
}, &cfg.Limits)
// TODO: assert on more fields here.
}

func TestConfigValidation(t *testing.T) {
baseCfg := func(t *testing.T) *Configuration {
var cfg Configuration
require.NoError(t, xconfig.LoadFile(&cfg, "./testdata/sample_config.yml", xconfig.Options{}),
require.NoError(t, xconfig.LoadFile(&cfg, testConfigFile, xconfig.Options{}),
"sample configuration is no longer valid or loadable. Fix it up to provide a base config here")

return &cfg
Expand All @@ -106,9 +201,10 @@ func TestConfigValidation(t *testing.T) {
for _, tc := range limitsCfgCases {
t.Run(tc.Name, func(t *testing.T) {
cfg := baseCfg(t)
cfg.Limits = &LimitsConfiguration{
MaxComputedDatapoints: 5,
}
cfg.Limits = LimitsConfiguration{
PerQuery: PerQueryLimitsConfiguration{
PrivateMaxComputedDatapoints: tc.Limit,
}}

assert.NoError(t, validator.Validate(cfg))
})
Expand Down
59 changes: 59 additions & 0 deletions src/cmd/services/m3query/config/testdata/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
listenAddress:
type: "config"
value: "0.0.0.0:7201"

metrics:
scope:
prefix: "coordinator"
prometheus:
handlerPath: /metrics
listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved
sanitization: prometheus
samplingRate: 1.0
extended: none

clusters:
- namespaces:
- namespace: default
type: unaggregated
retention: 48h
client:
config:
service:
env: default_env
zone: embedded
service: m3db
cacheDir: /var/lib/m3kv
etcdClusters:
- zone: embedded
endpoints:
- 127.0.0.1:2379
seedNodes:
initialCluster:
- hostID: m3db_local
endpoint: http://127.0.0.1:2380
writeConsistencyLevel: majority
readConsistencyLevel: unstrict_majority
writeTimeout: 10s
fetchTimeout: 15s
connectTimeout: 20s
writeRetry:
initialBackoff: 500ms
backoffFactor: 3
maxRetries: 2
jitter: true
fetchRetry:
initialBackoff: 500ms
backoffFactor: 2
maxRetries: 3
jitter: true
backgroundHealthCheckFailLimit: 4
backgroundHealthCheckFailThrottleFactor: 0.5

limits:
maxComputedDatapoints: 10555
perQuery:
maxComputedDatapoints: 12000
maxFetchedDatapoints: 11000
global:
maxFetchedDatapoints: 13000
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ clusters:
backgroundHealthCheckFailThrottleFactor: 0.5

limits:
maxComputedDatapoints: 12000
perQuery:
maxComputedDatapoints: 12000
maxFetchedDatapoints: 11000
global:
maxFetchedDatapoints: 13000
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/native"
Expand Down Expand Up @@ -62,8 +63,9 @@ type respError struct {
// NewRenderHandler returns a new render handler around the given storage.
func NewRenderHandler(
storage storage.Storage,
enforcer cost.ChainedEnforcer,
) http.Handler {
wrappedStore := graphite.NewM3WrappedStorage(storage)
wrappedStore := graphite.NewM3WrappedStorage(storage, enforcer)
return &renderHandler{
engine: native.NewEngine(wrappedStore),
}
Expand Down
Loading