[query] Fix query limit metrics reporting not starting at startup (#2228)
robskillington authored Mar 26, 2020
1 parent 6c3bf71 commit 02d1877
Showing 7 changed files with 213 additions and 56 deletions.
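The substance of the fix is in cost_reporters.go below: each static limit manager has a Report() loop that only emits its limit metrics while it is running, and nothing started that loop at startup. The commit starts one goroutine per limit manager and returns a closer that stops them on shutdown. Here is a minimal, self-contained sketch of that start/stop lifecycle — the type and names are illustrative stand-ins, not the M3 API:

    package main

    import (
    	"fmt"
    	"time"
    )

    // limitReporter stands in for a static limit manager: Report() blocks,
    // emitting a gauge on an interval, until Close() is called.
    type limitReporter struct {
    	done chan struct{}
    }

    func newLimitReporter() *limitReporter {
    	return &limitReporter{done: make(chan struct{})}
    }

    // Report loops until closed; constructing the reporter alone emits nothing.
    func (r *limitReporter) Report() {
    	ticker := time.NewTicker(100 * time.Millisecond)
    	defer ticker.Stop()
    	for {
    		select {
    		case <-ticker.C:
    			fmt.Println("limit gauge updated") // stand-in for a tally gauge
    		case <-r.done:
    			return
    		}
    	}
    }

    // Close stops the Report loop.
    func (r *limitReporter) Close() { close(r.done) }

    func main() {
    	r := newLimitReporter()
    	go r.Report()   // the missing piece: start reporting at startup
    	defer r.Close() // the closer the constructor now hands back to the caller
    	time.Sleep(350 * time.Millisecond)
    }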
5 changes: 3 additions & 2 deletions scripts/development/m3_stack/docker-compose.yml
@@ -90,8 +90,9 @@ services:
       dockerfile: ./docker/m3coordinator/development.Dockerfile
     image: m3coordinator:dev
     volumes:
-      # Use a path in the bin directory (gitignored) to easily change configs
-      - "../../../bin/m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
+      # Use a git ignored path to easily change pre-set configs.
+      # Note: the ".tmp" suffix is git ignored.
+      - "./m3coordinator.yml.tmp:/etc/m3coordinator/m3coordinator.yml"
       - "./schema.proto:/etc/m3coordinator/schema.proto"
   m3collector01:
     expose:
7 changes: 5 additions & 2 deletions scripts/development/m3_stack/start_m3.sh
@@ -52,7 +52,8 @@ else
 fi
 
 # Use standard coordinator config when bringing up coordinator first time
-cp ./m3coordinator-standard.yml ${RELATIVE}/bin/m3coordinator.yml
+# Note: use the ".tmp" suffix so the file is git ignored.
+cp ./m3coordinator-standard.yml ./m3coordinator.yml.tmp
 
 if [[ "$M3COORDINATOR_DEV_IMG" == "0" ]] || [[ "$FORCE_BUILD" == true ]] || [[ "$BUILD_M3COORDINATOR" == true ]]; then
 	prepare_build_cmd "make m3coordinator-linux-amd64"
@@ -298,7 +299,9 @@ if [[ "$USE_AGGREGATOR" = true ]]; then

 # Restart with aggregator coordinator config
 docker-compose -f docker-compose.yml stop m3coordinator01
-cp ./m3coordinator-aggregator.yml ${RELATIVE}/bin/m3coordinator.yml
+
+# Note: use the ".tmp" suffix so the file is git ignored.
+cp ./m3coordinator-aggregator.yml ./m3coordinator.yml.tmp
 docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01
 
 # May not necessarily flush
108 changes: 77 additions & 31 deletions src/query/server/cost_reporters.go
@@ -23,17 +23,22 @@ package server
 // This file contains reporters and setup for our query/cost.ChainedEnforcer
 // instances.
 import (
+	"fmt"
 	"sync"
 
 	"github.com/m3db/m3/src/cmd/services/m3query/config"
 	qcost "github.com/m3db/m3/src/query/cost"
+	"github.com/m3db/m3/src/x/close"
 	"github.com/m3db/m3/src/x/cost"
 	"github.com/m3db/m3/src/x/instrument"
 
 	"github.com/uber-go/tally"
 )
 
 const (
+	costScopeName           = "cost"
+	limitManagerScopeName   = "limits"
+	reporterScopeName       = "reporter"
 	queriesOverLimitMetric  = "over_datapoints_limit"
 	datapointsMetric        = "datapoints"
 	datapointsCounterMetric = "datapoints_counter"
@@ -45,49 +50,90 @@ const (
 // on them (as configured by cfg.Limits); per-block is purely for accounting
 // purposes.
 // Our enforcers report at least these stats:
-// cost.global.datapoints: gauge; the number of datapoints currently in use
-// by this instance.
+// cost_reporter_datapoints{limiter="global"}: gauge;
+// > the number of datapoints currently in use by this instance.
 //
-// cost.global.datapoints_counter: counter; counter representation of the
-// number of datapoints in use by this instance
+// cost_reporter_datapoints_counter{limiter="global"}: counter;
+// > counter representation of the number of datapoints in use by this instance.
 //
-// cost.{per_query,global}.over_datapoints_limit: counter; how many queries are over the
-// datapoint limit
+// cost_reporter_over_datapoints_limit{limiter=~"(global|query)"}: counter;
+// > how many queries are over the datapoint limit.
 //
-// cost.per_query.max_datapoints_hist: histogram; represents the
-// distribution of the maximum datapoints used at any point in each query.
-func newConfiguredChainedEnforcer(cfg *config.Configuration, instrumentOptions instrument.Options) (qcost.ChainedEnforcer, error) {
-	costScope := instrumentOptions.MetricsScope().SubScope("cost")
-	costIops := instrumentOptions.SetMetricsScope(costScope)
-	limitMgr := cost.NewStaticLimitManager(cfg.Limits.Global.AsLimitManagerOptions().SetInstrumentOptions(costIops))
-	tracker := cost.NewTracker()
-
-	globalEnforcer := cost.NewEnforcer(limitMgr, tracker,
-		cost.NewEnforcerOptions().SetReporter(
-			newGlobalReporter(costScope.SubScope("global")),
-		).SetCostExceededMessage("limits.global.maxFetchedDatapoints exceeded"),
-	)
-
-	queryEnforcerOpts := cost.NewEnforcerOptions().SetCostExceededMessage("limits.perQuery.maxFetchedDatapoints exceeded").
-		SetReporter(newPerQueryReporter(costScope.
-			SubScope("per_query")))
-
-	queryEnforcer := cost.NewEnforcer(
-		cost.NewStaticLimitManager(cfg.Limits.PerQuery.AsLimitManagerOptions()),
-		cost.NewTracker(),
-		queryEnforcerOpts)
+// cost_reporter_max_datapoints_hist{limiter="query"}: histogram;
+// > represents the distribution of the maximum datapoints used at any point in each query.
+func newConfiguredChainedEnforcer(
+	cfg *config.Configuration,
+	instrumentOptions instrument.Options,
+) (qcost.ChainedEnforcer, close.SimpleCloser, error) {
+	scope := instrumentOptions.MetricsScope().SubScope(costScopeName)
+
+	exceededMessage := func(exceedType, exceedLimit string) string {
+		return fmt.Sprintf("exceeded limits.%s.%s", exceedType, exceedLimit)
+	}
+
+	// Create global limit manager and enforcer.
+	globalScope := scope.Tagged(map[string]string{
+		"limiter": "global",
+	})
+	globalLimitManagerScope := globalScope.SubScope(limitManagerScopeName)
+	globalReporterScope := globalScope.SubScope(reporterScopeName)
+
+	globalLimitMgr := cost.NewStaticLimitManager(
+		cfg.Limits.Global.AsLimitManagerOptions().
+			SetInstrumentOptions(instrumentOptions.SetMetricsScope(globalLimitManagerScope)))
+
+	globalTracker := cost.NewTracker()
+
+	globalEnforcer := cost.NewEnforcer(globalLimitMgr, globalTracker,
+		cost.NewEnforcerOptions().
+			SetReporter(newGlobalReporter(globalReporterScope)).
+			SetCostExceededMessage(exceededMessage("global", "maxFetchedDatapoints")))
+
+	// Create per query limit manager and enforcer.
+	queryScope := scope.Tagged(map[string]string{
+		"limiter": "query",
+	})
+	queryLimitManagerScope := queryScope.SubScope(limitManagerScopeName)
+	queryReporterScope := queryScope.SubScope(reporterScopeName)
+
+	queryLimitMgr := cost.NewStaticLimitManager(
+		cfg.Limits.PerQuery.AsLimitManagerOptions().
+			SetInstrumentOptions(instrumentOptions.SetMetricsScope(queryLimitManagerScope)))
+
+	queryTracker := cost.NewTracker()
+
+	queryEnforcer := cost.NewEnforcer(queryLimitMgr, queryTracker,
+		cost.NewEnforcerOptions().
+			SetReporter(newPerQueryReporter(queryReporterScope)).
+			SetCostExceededMessage(exceededMessage("perQuery", "maxFetchedDatapoints")))
 
+	// Create block enforcer.
 	blockEnforcer := cost.NewEnforcer(
 		cost.NewStaticLimitManager(cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{Enabled: false})),
 		cost.NewTracker(),
-		nil,
-	)
+		nil)
 
-	return qcost.NewChainedEnforcer(qcost.GlobalLevel, []cost.Enforcer{
+	// Create chained enforcer.
+	enforcer, err := qcost.NewChainedEnforcer(qcost.GlobalLevel, []cost.Enforcer{
 		globalEnforcer,
 		queryEnforcer,
 		blockEnforcer,
 	})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Start reporting stats for all limit managers.
+	go globalLimitMgr.Report()
+	go queryLimitMgr.Report()
+
+	// Close the stats at the end.
+	closer := close.SimpleCloserFn(func() {
+		globalLimitMgr.Close()
+		queryLimitMgr.Close()
+	})
+
+	return enforcer, closer, nil
 }
 
 // globalReporter records ChainedEnforcer statistics for the global enforcer.
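A note on the metric shapes documented above: they fall out of tally's scope composition, where Tagged attaches the limiter label and SubScope extends the metric name. A short sketch of that composition against a tally test scope (mirroring, not copying, the code above; the printed key format is what tally.KeyForPrefixedStringMap computes in the tests below):

    package main

    import (
    	"fmt"

    	"github.com/uber-go/tally"
    )

    func main() {
    	// Root test scope, as the tests use; Snapshot() lets us inspect output.
    	testScope := tally.NewTestScope("", nil)

    	// Same composition as cost_reporters.go: cost scope -> limiter tag ->
    	// reporter sub-scope, so the label rides on the tags rather than being
    	// baked into the metric name.
    	reporterScope := testScope.SubScope("cost").
    		Tagged(map[string]string{"limiter": "global"}).
    		SubScope("reporter")
    	reporterScope.Gauge("datapoints").Update(7)

    	// Snapshot keys combine name and tags.
    	for key, g := range testScope.Snapshot().Gauges() {
    		fmt.Println(key, g.Value()) // cost.reporter.datapoints+limiter=global 7
    	}
    }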
37 changes: 25 additions & 12 deletions src/query/server/cost_reporters_test.go
@@ -27,6 +27,7 @@ import (

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/x/close"
"github.com/m3db/m3/src/x/cost/test"
"github.com/m3db/m3/src/x/instrument"

@@ -35,17 +36,22 @@
"github.com/uber-go/tally"
)

func TestNewConfiguredChainedEnforcer(t *testing.T) {
type testCtx struct {
Scope tally.TestScope
GlobalEnforcer cost.ChainedEnforcer
}
type enforcerTestCtx struct {
Scope tally.TestScope
GlobalEnforcer cost.ChainedEnforcer
Closer close.SimpleCloser
}

func (c enforcerTestCtx) Close() {
c.Closer.Close()
}

setup := func(t *testing.T, perQueryLimit, globalLimit int64) testCtx {
func TestNewConfiguredChainedEnforcer(t *testing.T) {
setup := func(t *testing.T, perQueryLimit, globalLimit int64) enforcerTestCtx {
s := tally.NewTestScope("", nil)
iopts := instrument.NewOptions().SetMetricsScope(s)

globalEnforcer, err := newConfiguredChainedEnforcer(&config.Configuration{
globalEnforcer, closer, err := newConfiguredChainedEnforcer(&config.Configuration{
Limits: config.LimitsConfiguration{
PerQuery: config.PerQueryLimitsConfiguration{
MaxFetchedDatapoints: perQueryLimit,
@@ -58,14 +64,16 @@ func TestNewConfiguredChainedEnforcer(t *testing.T) {

 		require.NoError(t, err)
 
-		return testCtx{
+		return enforcerTestCtx{
 			Scope:          s,
 			GlobalEnforcer: globalEnforcer,
+			Closer:         closer,
 		}
 	}
 
 	t.Run("has 3 valid levels", func(t *testing.T) {
 		tctx := setup(t, 6, 10)
+		defer tctx.Close()
 
 		assertValid := func(ce cost.ChainedEnforcer) {
 			assert.NotEqual(t, ce, cost.NoopChainedEnforcer())
@@ -86,14 +94,16 @@

 	t.Run("configures reporters", func(t *testing.T) {
 		tctx := setup(t, 6, 10)
+		defer tctx.Close()
 
 		queryEf := tctx.GlobalEnforcer.Child(cost.QueryLevel)
 		blockEf := queryEf.Child(cost.BlockLevel)
 		blockEf.Add(7)
 
 		assertHasGauge(t,
 			tctx.Scope.Snapshot(),
 			tally.KeyForPrefixedStringMap(
-				fmt.Sprintf("cost.global.%s", datapointsMetric), nil),
+				fmt.Sprintf("cost.reporter.%s", datapointsMetric), map[string]string{"limiter": "global"}),
 			7,
 		)
 
@@ -103,26 +113,29 @@
 		assertHasHistogram(t,
 			tctx.Scope.Snapshot(),
 			tally.KeyForPrefixedStringMap(
-				fmt.Sprintf("cost.per_query.%s", maxDatapointsHistMetric), nil),
+				fmt.Sprintf("cost.reporter.%s", maxDatapointsHistMetric), map[string]string{"limiter": "query"}),
 			map[float64]int64{10: 1},
 		)
 	})
 
 	t.Run("block level doesn't have a limit", func(t *testing.T) {
 		tctx := setup(t, -1, -1)
+		defer tctx.Close()
 
 		block := tctx.GlobalEnforcer.Child(cost.QueryLevel).Child(cost.BlockLevel)
 		assert.NoError(t, block.Add(math.MaxFloat64-1).Error)
 	})
 
 	t.Run("works e2e", func(t *testing.T) {
 		tctx := setup(t, 6, 10)
+		defer tctx.Close()
 
 		qe1, qe2 := tctx.GlobalEnforcer.Child(cost.QueryLevel), tctx.GlobalEnforcer.Child(cost.QueryLevel)
 		r := qe1.Add(6)
 		test.AssertLimitErrorWithMsg(
 			t,
 			r.Error,
-			"exceeded query limit: limits.perQuery.maxFetchedDatapoints exceeded",
+			"exceeded query limit: exceeded limits.perQuery.maxFetchedDatapoints",
 			6,
 			6)
 
@@ -133,7 +146,7 @@
 		test.AssertLimitErrorWithMsg(
 			t,
 			r.Error,
-			"exceeded global limit: limits.global.maxFetchedDatapoints exceeded",
+			"exceeded global limit: exceeded limits.global.maxFetchedDatapoints",
 			11,
 			10)
 
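The defer tctx.Close() calls above work because x/close's SimpleCloserFn adapts a plain func() into a SimpleCloser, letting newConfiguredChainedEnforcer bundle both limit managers' Close calls into the single closer it returns. A sketch of that adapter pattern, with the two types re-declared locally for illustration:

    package main

    import "fmt"

    // SimpleCloser mirrors the shape of m3's close.SimpleCloser:
    // a Close with no error return.
    type SimpleCloser interface {
    	Close()
    }

    // SimpleCloserFn adapts a func() into a SimpleCloser.
    type SimpleCloserFn func()

    // Close calls the wrapped function.
    func (f SimpleCloserFn) Close() { f() }

    func main() {
    	// Bundle several cleanup steps into one closer, as
    	// newConfiguredChainedEnforcer does for its two limit managers.
    	closer := SimpleCloserFn(func() {
    		fmt.Println("global limit manager closed")
    		fmt.Println("query limit manager closed")
    	})
    	defer closer.Close()
    	fmt.Println("test body runs")
    }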
11 changes: 7 additions & 4 deletions src/query/server/query.go
@@ -302,15 +302,18 @@ func Run(runOpts RunOptions) {
 		defer cleanup()
 	}
 
-	perQueryEnforcer, err := newConfiguredChainedEnforcer(&cfg, instrumentOptions)
+	chainedEnforcer, chainedEnforceCloser, err := newConfiguredChainedEnforcer(&cfg,
+		instrumentOptions)
 	if err != nil {
-		logger.Fatal("unable to setup perQueryEnforcer", zap.Error(err))
+		logger.Fatal("unable to setup chained enforcer", zap.Error(err))
 	}
 
+	defer chainedEnforceCloser.Close()
+
 	engineOpts := executor.NewEngineOptions().
 		SetStore(backendStorage).
 		SetLookbackDuration(*cfg.LookbackDuration).
-		SetGlobalEnforcer(perQueryEnforcer).
+		SetGlobalEnforcer(chainedEnforcer).
 		SetInstrumentOptions(instrumentOptions.
 			SetMetricsScope(instrumentOptions.MetricsScope().SubScope("engine")))
 	if fn := runOpts.CustomPromQLParseFunction; fn != nil {
@@ -341,7 +344,7 @@

 	handlerOptions, err := options.NewHandlerOptions(downsamplerAndWriter,
 		tagOptions, engine, m3dbClusters, clusterClient, cfg, runOpts.DBConfig,
-		perQueryEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions,
+		chainedEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions,
 		cpuProfileDuration, []string{handleroptions.M3DBServiceName},
 		serviceOptionDefaults)
 	if err != nil {
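One caller-side detail in query.go: newConfiguredChainedEnforcer returns an (enforcer, closer, error) triple, and the diff above shows the error path returns nil for both values, so the defer on the closer must come after the error check. A minimal sketch of consuming such a triple safely (all names here are hypothetical):

    package main

    import (
    	"errors"
    	"fmt"
    	"log"
    )

    type closer interface{ Close() }

    type closerFn func()

    func (f closerFn) Close() { f() }

    // newThing mimics newConfiguredChainedEnforcer's signature: on error both
    // the value and the closer are nil, so callers must check err first.
    func newThing(fail bool) (string, closer, error) {
    	if fail {
    		return "", nil, errors.New("bad config")
    	}
    	return "enforcer", closerFn(func() { fmt.Println("closed") }), nil
    }

    func main() {
    	thing, c, err := newThing(false)
    	if err != nil {
    		log.Fatal(err) // as query.go does via logger.Fatal
    	}
    	defer c.Close() // safe: only reached when c is non-nil
    	fmt.Println("using", thing)
    }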