From 02d18779b1ff808ebf6858e1a9167dd53871d19c Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Thu, 26 Mar 2020 10:43:11 -0400
Subject: [PATCH] [query] Fix query limit metrics reporting not starting at
 startup (#2228)

---
 .../development/m3_stack/docker-compose.yml |   5 +-
 scripts/development/m3_stack/start_m3.sh    |   7 +-
 src/query/server/cost_reporters.go          | 108 +++++++++++++-----
 src/query/server/cost_reporters_test.go     |  37 ++++--
 src/query/server/query.go                   |  11 +-
 src/query/server/query_test.go              |  85 +++++++++++++-
 src/x/close/close.go                        |  16 +++
 7 files changed, 213 insertions(+), 56 deletions(-)

diff --git a/scripts/development/m3_stack/docker-compose.yml b/scripts/development/m3_stack/docker-compose.yml
index 2b8d46800e..83d7390dd9 100644
--- a/scripts/development/m3_stack/docker-compose.yml
+++ b/scripts/development/m3_stack/docker-compose.yml
@@ -90,8 +90,9 @@ services:
       dockerfile: ./docker/m3coordinator/development.Dockerfile
     image: m3coordinator:dev
     volumes:
-      # Use a path in the bin directory (gitignored) to easily change configs
-      - "../../../bin/m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
+      # Use a git-ignored path to easily change pre-set configs.
+      # Note: the ".tmp" suffix is what is git-ignored.
+      - "./m3coordinator.yml.tmp:/etc/m3coordinator/m3coordinator.yml"
       - "./schema.proto:/etc/m3coordinator/schema.proto"
   m3collector01:
     expose:
diff --git a/scripts/development/m3_stack/start_m3.sh b/scripts/development/m3_stack/start_m3.sh
index ba35261738..a86a5588b7 100755
--- a/scripts/development/m3_stack/start_m3.sh
+++ b/scripts/development/m3_stack/start_m3.sh
@@ -52,7 +52,8 @@ else
 fi
 
 # Use standard coordinator config when bringing up coordinator first time
-cp ./m3coordinator-standard.yml ${RELATIVE}/bin/m3coordinator.yml
+# Note: use the ".tmp" suffix so the copy is git-ignored.
+cp ./m3coordinator-standard.yml ./m3coordinator.yml.tmp
 
 if [[ "$M3COORDINATOR_DEV_IMG" == "0" ]] || [[ "$FORCE_BUILD" == true ]] || [[ "$BUILD_M3COORDINATOR" == true ]]; then
     prepare_build_cmd "make m3coordinator-linux-amd64"
@@ -298,7 +299,9 @@ if [[ "$USE_AGGREGATOR" = true ]]; then
 
     # Restart with aggregator coordinator config
     docker-compose -f docker-compose.yml stop m3coordinator01
-    cp ./m3coordinator-aggregator.yml ${RELATIVE}/bin/m3coordinator.yml
+
+    # Note: use the ".tmp" suffix so the copy is git-ignored.
+    cp ./m3coordinator-aggregator.yml ./m3coordinator.yml.tmp
     docker-compose -f docker-compose.yml up $DOCKER_ARGS m3coordinator01
 
     # May not necessarily flush
diff --git a/src/query/server/cost_reporters.go b/src/query/server/cost_reporters.go
index 6a165c6ab9..1f6e2d0b5a 100644
--- a/src/query/server/cost_reporters.go
+++ b/src/query/server/cost_reporters.go
@@ -23,10 +23,12 @@ package server
 // This file contains reporters and setup for our query/cost.ChainedEnforcer
 // instances.
 import (
+	"fmt"
 	"sync"
 
 	"github.com/m3db/m3/src/cmd/services/m3query/config"
 	qcost "github.com/m3db/m3/src/query/cost"
+	"github.com/m3db/m3/src/x/close"
 	"github.com/m3db/m3/src/x/cost"
 	"github.com/m3db/m3/src/x/instrument"
 
@@ -34,6 +36,9 @@ import (
 )
 
 const (
+	costScopeName           = "cost"
+	limitManagerScopeName   = "limits"
+	reporterScopeName       = "reporter"
 	queriesOverLimitMetric  = "over_datapoints_limit"
 	datapointsMetric        = "datapoints"
 	datapointsCounterMetric = "datapoints_counter"
@@ -45,49 +50,90 @@ const (
 // on them (as configured by cfg.Limits); per-block is purely for accounting
 // purposes.
 // Our enforcers report at least these stats:
-// cost.global.datapoints: gauge; the number of datapoints currently in use
-// by this instance.
+// cost_reporter_datapoints{limiter="global"}: gauge;
+// > the number of datapoints currently in use by this instance.
 //
-// cost.global.datapoints_counter: counter; counter representation of the
-// number of datapoints in use by this instance
+// cost_reporter_datapoints_counter{limiter="global"}: counter;
+// > counter representation of the number of datapoints in use by this instance.
 //
-// cost.{per_query,global}.over_datapoints_limit: counter; how many queries are over the
-// datapoint limit
+// cost_reporter_over_datapoints_limit{limiter=~"(global|query)"}: counter;
+// > how many queries are over the datapoint limit.
 //
-// cost.per_query.max_datapoints_hist: histogram; represents the
-// distribution of the maximum datapoints used at any point in each query.
-func newConfiguredChainedEnforcer(cfg *config.Configuration, instrumentOptions instrument.Options) (qcost.ChainedEnforcer, error) {
-	costScope := instrumentOptions.MetricsScope().SubScope("cost")
-	costIops := instrumentOptions.SetMetricsScope(costScope)
-	limitMgr := cost.NewStaticLimitManager(cfg.Limits.Global.AsLimitManagerOptions().SetInstrumentOptions(costIops))
-	tracker := cost.NewTracker()
-
-	globalEnforcer := cost.NewEnforcer(limitMgr, tracker,
-		cost.NewEnforcerOptions().SetReporter(
-			newGlobalReporter(costScope.SubScope("global")),
-		).SetCostExceededMessage("limits.global.maxFetchedDatapoints exceeded"),
-	)
-
-	queryEnforcerOpts := cost.NewEnforcerOptions().SetCostExceededMessage("limits.perQuery.maxFetchedDatapoints exceeded").
-		SetReporter(newPerQueryReporter(costScope.
-			SubScope("per_query")))
-
-	queryEnforcer := cost.NewEnforcer(
-		cost.NewStaticLimitManager(cfg.Limits.PerQuery.AsLimitManagerOptions()),
-		cost.NewTracker(),
-		queryEnforcerOpts)
+// cost_reporter_max_datapoints_hist{limiter="query"}: histogram;
+// > represents the distribution of the maximum datapoints used at any point in each query.
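+//
+// Note: the names above are written in their Prometheus-rendered form; under
+// the tally scopes configured below they surface as "cost.reporter.*" metrics
+// (and the limit managers report "cost.limits.*"), each tagged with a
+// "limiter" value, which is the form asserted on in cost_reporters_test.go
+// and query_test.go.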
+// cost_reporter_datapoints{limit="global"}: gauge; +// > the number of datapoints currently in use by this instance. // -// cost.global.datapoints_counter: counter; counter representation of the -// number of datapoints in use by this instance +// cost_reporter_datapoints_counter{limiter="global"}: counter; +// > counter representation of the number of datapoints in use by this instance. // -// cost.{per_query,global}.over_datapoints_limit: counter; how many queries are over the -// datapoint limit +// cost_reporter_over_datapoints_limit{limiter=~"(global|per_query)"}: counter; +// > how many queries are over the datapoint limit. // -// cost.per_query.max_datapoints_hist: histogram; represents the -// distribution of the maximum datapoints used at any point in each query. -func newConfiguredChainedEnforcer(cfg *config.Configuration, instrumentOptions instrument.Options) (qcost.ChainedEnforcer, error) { - costScope := instrumentOptions.MetricsScope().SubScope("cost") - costIops := instrumentOptions.SetMetricsScope(costScope) - limitMgr := cost.NewStaticLimitManager(cfg.Limits.Global.AsLimitManagerOptions().SetInstrumentOptions(costIops)) - tracker := cost.NewTracker() - - globalEnforcer := cost.NewEnforcer(limitMgr, tracker, - cost.NewEnforcerOptions().SetReporter( - newGlobalReporter(costScope.SubScope("global")), - ).SetCostExceededMessage("limits.global.maxFetchedDatapoints exceeded"), - ) - - queryEnforcerOpts := cost.NewEnforcerOptions().SetCostExceededMessage("limits.perQuery.maxFetchedDatapoints exceeded"). - SetReporter(newPerQueryReporter(costScope. - SubScope("per_query"))) - - queryEnforcer := cost.NewEnforcer( - cost.NewStaticLimitManager(cfg.Limits.PerQuery.AsLimitManagerOptions()), - cost.NewTracker(), - queryEnforcerOpts) +// cost_reporter_max_datapoints_hist{limiter=~"(global|per_query)"}: histogram; +// > represents the distribution of the maximum datapoints used at any point in each query. +func newConfiguredChainedEnforcer( + cfg *config.Configuration, + instrumentOptions instrument.Options, +) (qcost.ChainedEnforcer, close.SimpleCloser, error) { + scope := instrumentOptions.MetricsScope().SubScope(costScopeName) + + exceededMessage := func(exceedType, exceedLimit string) string { + return fmt.Sprintf("exceeded limits.%s.%s", exceedType, exceedLimit) + } + + // Create global limit manager and enforcer. + globalScope := scope.Tagged(map[string]string{ + "limiter": "global", + }) + globalLimitManagerScope := globalScope.SubScope(limitManagerScopeName) + globalReporterScope := globalScope.SubScope(reporterScopeName) + + globalLimitMgr := cost.NewStaticLimitManager( + cfg.Limits.Global.AsLimitManagerOptions(). + SetInstrumentOptions(instrumentOptions.SetMetricsScope(globalLimitManagerScope))) + + globalTracker := cost.NewTracker() + + globalEnforcer := cost.NewEnforcer(globalLimitMgr, globalTracker, + cost.NewEnforcerOptions(). + SetReporter(newGlobalReporter(globalReporterScope)). + SetCostExceededMessage(exceededMessage("global", "maxFetchedDatapoints"))) + + // Create per query limit manager and enforcer. + queryScope := scope.Tagged(map[string]string{ + "limiter": "query", + }) + queryLimitManagerScope := queryScope.SubScope(limitManagerScopeName) + queryReporterScope := queryScope.SubScope(reporterScopeName) + queryLimitMgr := cost.NewStaticLimitManager( + cfg.Limits.PerQuery.AsLimitManagerOptions(). 
+			SetInstrumentOptions(instrumentOptions.SetMetricsScope(queryLimitManagerScope)))
+
+	queryTracker := cost.NewTracker()
+
+	queryEnforcer := cost.NewEnforcer(queryLimitMgr, queryTracker,
+		cost.NewEnforcerOptions().
+			SetReporter(newPerQueryReporter(queryReporterScope)).
+			SetCostExceededMessage(exceededMessage("perQuery", "maxFetchedDatapoints")))
+
+	// Create block enforcer.
 	blockEnforcer := cost.NewEnforcer(
 		cost.NewStaticLimitManager(cost.NewLimitManagerOptions().SetDefaultLimit(cost.Limit{Enabled: false})),
 		cost.NewTracker(),
-		nil,
-	)
+		nil)
 
-	return qcost.NewChainedEnforcer(qcost.GlobalLevel, []cost.Enforcer{
+	// Create chained enforcer.
+	enforcer, err := qcost.NewChainedEnforcer(qcost.GlobalLevel, []cost.Enforcer{
 		globalEnforcer,
 		queryEnforcer,
 		blockEnforcer,
 	})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Start reporting stats for all limit managers.
+	go globalLimitMgr.Report()
+	go queryLimitMgr.Report()
+
+	// Stop the limit managers, and hence their stats reporting, on close.
+	closer := close.SimpleCloserFn(func() {
+		globalLimitMgr.Close()
+		queryLimitMgr.Close()
+	})
+
+	return enforcer, closer, nil
 }
 
 // globalReporter records ChainedEnforcer statistics for the global enforcer.
diff --git a/src/query/server/cost_reporters_test.go b/src/query/server/cost_reporters_test.go
index 26fefc6252..9ac86cebc7 100644
--- a/src/query/server/cost_reporters_test.go
+++ b/src/query/server/cost_reporters_test.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/m3db/m3/src/cmd/services/m3query/config"
 	"github.com/m3db/m3/src/query/cost"
+	"github.com/m3db/m3/src/x/close"
 	"github.com/m3db/m3/src/x/cost/test"
 	"github.com/m3db/m3/src/x/instrument"
 
@@ -35,17 +36,22 @@ import (
 	"github.com/uber-go/tally"
 )
 
-func TestNewConfiguredChainedEnforcer(t *testing.T) {
-	type testCtx struct {
-		Scope          tally.TestScope
-		GlobalEnforcer cost.ChainedEnforcer
-	}
+type enforcerTestCtx struct {
+	Scope          tally.TestScope
+	GlobalEnforcer cost.ChainedEnforcer
+	Closer         close.SimpleCloser
+}
+
+func (c enforcerTestCtx) Close() {
+	c.Closer.Close()
+}
 
-	setup := func(t *testing.T, perQueryLimit, globalLimit int64) testCtx {
+func TestNewConfiguredChainedEnforcer(t *testing.T) {
+	setup := func(t *testing.T, perQueryLimit, globalLimit int64) enforcerTestCtx {
 		s := tally.NewTestScope("", nil)
 		iopts := instrument.NewOptions().SetMetricsScope(s)
 
-		globalEnforcer, err := newConfiguredChainedEnforcer(&config.Configuration{
+		globalEnforcer, closer, err := newConfiguredChainedEnforcer(&config.Configuration{
 			Limits: config.LimitsConfiguration{
 				PerQuery: config.PerQueryLimitsConfiguration{
 					MaxFetchedDatapoints: perQueryLimit,
@@ -58,14 +64,16 @@ func TestNewConfiguredChainedEnforcer(t *testing.T) {
 
 		require.NoError(t, err)
 
-		return testCtx{
+		return enforcerTestCtx{
 			Scope:          s,
 			GlobalEnforcer: globalEnforcer,
+			Closer:         closer,
 		}
 	}
 
 	t.Run("has 3 valid levels", func(t *testing.T) {
 		tctx := setup(t, 6, 10)
+		defer tctx.Close()
 
 		assertValid := func(ce cost.ChainedEnforcer) {
 			assert.NotEqual(t, ce, cost.NoopChainedEnforcer())
@@ -86,6 +94,8 @@ func TestNewConfiguredChainedEnforcer(t *testing.T) {
 
 	t.Run("configures reporters", func(t *testing.T) {
 		tctx := setup(t, 6, 10)
+		defer tctx.Close()
+
 		queryEf := tctx.GlobalEnforcer.Child(cost.QueryLevel)
 		blockEf := queryEf.Child(cost.BlockLevel)
 		blockEf.Add(7)
 
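+		// Reporter metrics are tagged, so they are looked up by metric name
+		// plus the limiter tag (tally test scope keys take the form
+		// "name+tag1=value1,tag2=value2").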
map[string]string{"limiter": "global"}), 7, ) @@ -103,26 +113,29 @@ func TestNewConfiguredChainedEnforcer(t *testing.T) { assertHasHistogram(t, tctx.Scope.Snapshot(), tally.KeyForPrefixedStringMap( - fmt.Sprintf("cost.per_query.%s", maxDatapointsHistMetric), nil), + fmt.Sprintf("cost.reporter.%s", maxDatapointsHistMetric), map[string]string{"limiter": "query"}), map[float64]int64{10: 1}, ) }) t.Run("block level doesn't have a limit", func(t *testing.T) { tctx := setup(t, -1, -1) + defer tctx.Close() + block := tctx.GlobalEnforcer.Child(cost.QueryLevel).Child(cost.BlockLevel) assert.NoError(t, block.Add(math.MaxFloat64-1).Error) }) t.Run("works e2e", func(t *testing.T) { tctx := setup(t, 6, 10) + defer tctx.Close() qe1, qe2 := tctx.GlobalEnforcer.Child(cost.QueryLevel), tctx.GlobalEnforcer.Child(cost.QueryLevel) r := qe1.Add(6) test.AssertLimitErrorWithMsg( t, r.Error, - "exceeded query limit: limits.perQuery.maxFetchedDatapoints exceeded", + "exceeded query limit: exceeded limits.perQuery.maxFetchedDatapoints", 6, 6) @@ -133,7 +146,7 @@ func TestNewConfiguredChainedEnforcer(t *testing.T) { test.AssertLimitErrorWithMsg( t, r.Error, - "exceeded global limit: limits.global.maxFetchedDatapoints exceeded", + "exceeded global limit: exceeded limits.global.maxFetchedDatapoints", 11, 10) diff --git a/src/query/server/query.go b/src/query/server/query.go index faeb6c2bca..74ce47a992 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -302,15 +302,18 @@ func Run(runOpts RunOptions) { defer cleanup() } - perQueryEnforcer, err := newConfiguredChainedEnforcer(&cfg, instrumentOptions) + chainedEnforcer, chainedEnforceCloser, err := newConfiguredChainedEnforcer(&cfg, + instrumentOptions) if err != nil { - logger.Fatal("unable to setup perQueryEnforcer", zap.Error(err)) + logger.Fatal("unable to setup chained enforcer", zap.Error(err)) } + defer chainedEnforceCloser.Close() + engineOpts := executor.NewEngineOptions(). SetStore(backendStorage). SetLookbackDuration(*cfg.LookbackDuration). - SetGlobalEnforcer(perQueryEnforcer). + SetGlobalEnforcer(chainedEnforcer). SetInstrumentOptions(instrumentOptions. 
 	engineOpts := executor.NewEngineOptions().
 		SetStore(backendStorage).
 		SetLookbackDuration(*cfg.LookbackDuration).
-		SetGlobalEnforcer(perQueryEnforcer).
+		SetGlobalEnforcer(chainedEnforcer).
 		SetInstrumentOptions(instrumentOptions.
 			SetMetricsScope(instrumentOptions.MetricsScope().SubScope("engine"))))
 
 	if fn := runOpts.CustomPromQLParseFunction; fn != nil {
@@ -341,7 +344,7 @@ func Run(runOpts RunOptions) {
 
 	handlerOptions, err := options.NewHandlerOptions(downsamplerAndWriter,
 		tagOptions, engine, m3dbClusters, clusterClient, cfg, runOpts.DBConfig,
-		perQueryEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions,
+		chainedEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions,
 		cpuProfileDuration, []string{handleroptions.M3DBServiceName},
 		serviceOptionDefaults)
 	if err != nil {
diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go
index ab2eb12e23..9563f98f58 100644
--- a/src/query/server/query_test.go
+++ b/src/query/server/query_test.go
@@ -31,7 +31,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
 	clusterclient "github.com/m3db/m3/src/cluster/client"
 	"github.com/m3db/m3/src/cluster/kv"
 	"github.com/m3db/m3/src/cmd/services/m3query/config"
@@ -47,16 +46,19 @@ import (
 	rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb"
 	"github.com/m3db/m3/src/query/storage/m3"
 	xclock "github.com/m3db/m3/src/x/clock"
+	"github.com/m3db/m3/src/x/close"
 	xconfig "github.com/m3db/m3/src/x/config"
 	"github.com/m3db/m3/src/x/ident"
 	"github.com/m3db/m3/src/x/instrument"
 	"github.com/m3db/m3/src/x/serialize"
 	xtest "github.com/m3db/m3/src/x/test"
 
-	"go.uber.org/atomic"
+	"github.com/gogo/protobuf/proto"
 	"github.com/golang/mock/gomock"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/uber-go/tally"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 )
 
@@ -510,8 +512,13 @@ func TestNewPerQueryEnforcer(t *testing.T) {
 		Global cost.ChainedEnforcer
 		Query  cost.ChainedEnforcer
 		Block  cost.ChainedEnforcer
+		Closer close.SimpleCloser
 	}
 
+	scope := tally.NewTestScope("", nil)
+	instrumentOpts := instrument.NewTestOptions(t).
+		SetMetricsScope(scope)
+
 	setup := func(t *testing.T, globalLimit, queryLimit int) testContext {
 		cfg := &config.Configuration{
 			Limits: config.LimitsConfiguration{
@@ -524,7 +531,7 @@ func TestNewPerQueryEnforcer(t *testing.T) {
 			},
 		}
 
-		global, err := newConfiguredChainedEnforcer(cfg, instrument.NewOptions())
+		global, closer, err := newConfiguredChainedEnforcer(cfg, instrumentOpts)
 		require.NoError(t, err)
 
 		queryLvl := global.Child(cost.QueryLevel)
@@ -534,13 +541,15 @@ func TestNewPerQueryEnforcer(t *testing.T) {
 			Global: global,
 			Query:  queryLvl,
 			Block:  blockLvl,
+			Closer: closer,
 		}
 	}
 
 	tctx := setup(t, 100, 10)
+	defer tctx.Closer.Close()
 
-	// spot check that limits are setup properly for each level
-	r := tctx.Block.Add(11)
+	// Spot check that limits are set up properly for each level.
+	r := tctx.Query.Add(11)
 	require.Error(t, r.Error)
 
 	floatsEqual := func(f1, f2 float64) {
@@ -555,6 +564,72 @@ func TestNewPerQueryEnforcer(t *testing.T) {
 	r, _ = tctx.Global.State()
 	floatsEqual(float64(r.Cost), 11)
 	require.NoError(t, r.Error)
+
+	// Wait for stats reporting to start.
+	start := time.Now()
+	for time.Since(start) < 15*time.Second {
+		gauges := scope.Snapshot().Gauges()
+		globalEnabled, globalOk := gauges["cost.limits.enabled+limiter=global"]
+		queryEnabled, queryOk := gauges["cost.limits.enabled+limiter=query"]
+		if globalOk && queryOk && globalEnabled.Value() == 1 && queryEnabled.Value() == 1 {
+			break
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Check stats.
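+	// Note: over_datapoints_limit is also reported with an enabled=true/false
+	// tag, which appears to distinguish exceeding an enforced limit from
+	// exceeding a disabled one.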
+	expectCounterValues := map[string]int64{
+		"cost.reporter.over_datapoints_limit+enabled=false,limiter=global": 0,
+		"cost.reporter.over_datapoints_limit+enabled=true,limiter=global":  0,
+		"cost.reporter.datapoints_counter+limiter=global":                  11,
+		"cost.reporter.over_datapoints_limit+enabled=false,limiter=query":  0,
+		"cost.reporter.over_datapoints_limit+enabled=true,limiter=query":   1,
+	}
+	expectGaugeValues := map[string]float64{
+		"cost.limits.threshold+limiter=global":    100,
+		"cost.limits.enabled+limiter=global":      1,
+		"cost.reporter.datapoints+limiter=global": 11,
+		"cost.limits.threshold+limiter=query":     10,
+		"cost.limits.enabled+limiter=query":       1,
+	}
+
+	snapshot := scope.Snapshot()
+	actualCounterValues := make(map[string]int64)
+	for k, v := range snapshot.Counters() {
+		actualCounterValues[k] = v.Value()
+
+		expected, ok := expectCounterValues[k]
+		if !ok {
+			continue
+		}
+
+		// Check match.
+		assert.Equal(t, expected, v.Value(),
+			fmt.Sprintf("stat mismatch: stat=%s", k))
+
+		delete(expectCounterValues, k)
+	}
+	assert.Equal(t, 0, len(expectCounterValues),
+		fmt.Sprintf("missing counters: %+v, actual: %+v",
+			expectCounterValues, actualCounterValues))
+
+	actualGaugeValues := make(map[string]float64)
+	for k, v := range snapshot.Gauges() {
+		actualGaugeValues[k] = v.Value()
+
+		expected, ok := expectGaugeValues[k]
+		if !ok {
+			continue
+		}
+
+		// Check match.
+		assert.Equal(t, expected, v.Value(),
+			fmt.Sprintf("stat mismatch: stat=%s", k))
+
+		delete(expectGaugeValues, k)
+	}
+	assert.Equal(t, 0, len(expectGaugeValues),
+		fmt.Sprintf("missing gauges: %+v, actual: %+v",
+			expectGaugeValues, actualGaugeValues))
 }
 
 var _ rpc.QueryServer = &queryServer{}
 
diff --git a/src/x/close/close.go b/src/x/close/close.go
index 5e71e3dbf7..1b652005fa 100644
--- a/src/x/close/close.go
+++ b/src/x/close/close.go
@@ -37,11 +37,27 @@ type Closer interface {
 	io.Closer
 }
 
+// CloserFn implements the Closer interface.
+type CloserFn func() error
+
+// Close implements the Closer interface.
+func (fn CloserFn) Close() error {
+	return fn()
+}
+
 // SimpleCloser is a resource that can be closed without returning a result.
 type SimpleCloser interface {
 	Close()
 }
 
+// SimpleCloserFn implements the SimpleCloser interface.
+type SimpleCloserFn func()
+
+// Close implements the SimpleCloser interface.
+func (fn SimpleCloserFn) Close() {
+	fn()
+}
+
 // TryClose attempts to close a resource, the resource is expected to
 // implement either Closeable or CloseableResult.
 func TryClose(r interface{}) error {