diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 2860a2cb69..3090af5227 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -409,7 +409,17 @@ type IngestConfiguration struct { // CarbonConfiguration is the configuration for the carbon server. type CarbonConfiguration struct { + // Ingester if set defines an ingester to run for carbon. Ingester *CarbonIngesterConfiguration `yaml:"ingester"` + // AggregateNamespacesAllData configures whether all aggregate + // namespaces contain entire copies of the data set. + // This affects whether queries can be optimized or not, if false + // they cannot be since it's unclear if data matching an expression + // sits in one or many or none of the aggregate namespaces so all + // must be queried, but if true then it can be determined based + // on the query range whether a single namespace can fulfill the + // entire query and if so to only fetch from that one aggregated namespace. + AggregateNamespacesAllData bool `yaml:"aggregateNamespacesAllData"` } // CarbonIngesterConfiguration is the configuration struct for carbon ingestion. diff --git a/src/query/api/v1/handler/graphite/render.go b/src/query/api/v1/handler/graphite/render.go index 218051e28c..681431d486 100644 --- a/src/query/api/v1/handler/graphite/render.go +++ b/src/query/api/v1/handler/graphite/render.go @@ -67,7 +67,7 @@ type respError struct { // NewRenderHandler returns a new render handler around the given storage. func NewRenderHandler(opts options.HandlerOptions) http.Handler { wrappedStore := graphite.NewM3WrappedStorage(opts.Storage(), - opts.Enforcer(), opts.InstrumentOpts()) + opts.Enforcer(), opts.InstrumentOpts(), opts.GraphiteStorageOptions()) return &renderHandler{ engine: native.NewEngine(wrappedStore), queryContextOpts: opts.QueryContextOptions(), diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 078ee2e68c..762fcb565b 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/options" qcost "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/executor" + graphite "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/test/m3" @@ -109,6 +110,7 @@ func setupHandler( svcDefaultOptions, NewQueryRouter(), NewQueryRouter(), + graphite.M3WrappedStorageOptions{}, ) if err != nil { @@ -145,6 +147,7 @@ func TestHandlerFetchTimeout(t *testing.T) { svcDefaultOptions, nil, nil, + graphite.M3WrappedStorageOptions{}, ) require.NoError(t, err) @@ -406,7 +409,7 @@ func TestCustomRoutes(t *testing.T) { handleroptions.NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}), models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, defaultPlacementServices, svcDefaultOptions, NewQueryRouter(), NewQueryRouter(), - ) + graphite.M3WrappedStorageOptions{}) require.NoError(t, err) custom := &customHandler{t: t} diff --git a/src/query/api/v1/options/handler.go b/src/query/api/v1/options/handler.go index 63b16db547..f8f61946aa 100644 --- a/src/query/api/v1/options/handler.go +++ b/src/query/api/v1/options/handler.go @@ -34,6 +34,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/cost" "github.com/m3db/m3/src/query/executor" + graphite "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3" @@ -200,6 +201,11 @@ type HandlerOptions interface { InstantQueryRouter() QueryRouter // SetInstantQueryRouter sets query router for instant queries. SetInstantQueryRouter(value QueryRouter) HandlerOptions + + // GraphiteStorageOptions returns the Graphite storage options. + GraphiteStorageOptions() graphite.M3WrappedStorageOptions + // SetGraphiteStorageOptions sets the Graphite storage options. + SetGraphiteStorageOptions(value graphite.M3WrappedStorageOptions) HandlerOptions } // HandlerOptions represents handler options. @@ -226,6 +232,7 @@ type handlerOptions struct { nowFn clock.NowFn queryRouter QueryRouter instantQueryRouter QueryRouter + graphiteStorageOpts graphite.M3WrappedStorageOptions } // EmptyHandlerOptions returns default handler options. @@ -255,6 +262,7 @@ func NewHandlerOptions( serviceOptionDefaults []handleroptions.ServiceOptionsDefault, queryRouter QueryRouter, instantQueryRouter QueryRouter, + graphiteStorageOpts graphite.M3WrappedStorageOptions, ) (HandlerOptions, error) { timeout := cfg.Query.TimeoutOrDefault() if embeddedDbCfg != nil && @@ -286,8 +294,9 @@ func NewHandlerOptions( timeoutOpts: &prometheus.TimeoutOpts{ FetchTimeout: timeout, }, - queryRouter: queryRouter, - instantQueryRouter: instantQueryRouter, + queryRouter: queryRouter, + instantQueryRouter: instantQueryRouter, + graphiteStorageOpts: graphiteStorageOpts, }, nil } @@ -529,3 +538,13 @@ func (o *handlerOptions) SetInstantQueryRouter(value QueryRouter) HandlerOptions opts.instantQueryRouter = value return &opts } + +func (o *handlerOptions) GraphiteStorageOptions() graphite.M3WrappedStorageOptions { + return o.graphiteStorageOpts +} + +func (o *handlerOptions) SetGraphiteStorageOptions(value graphite.M3WrappedStorageOptions) HandlerOptions { + opts := *o + opts.graphiteStorageOpts = value + return &opts +} diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 297440775d..a8d1409099 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -48,6 +48,12 @@ type m3WrappedStore struct { m3 storage.Storage enforcer cost.ChainedEnforcer instrumentOpts instrument.Options + opts M3WrappedStorageOptions +} + +// M3WrappedStorageOptions is the graphite storage options. +type M3WrappedStorageOptions struct { + AggregateNamespacesAllData bool } // NewM3WrappedStorage creates a graphite storage wrapper around an m3query @@ -56,6 +62,7 @@ func NewM3WrappedStorage( m3storage storage.Storage, enforcer cost.ChainedEnforcer, instrumentOpts instrument.Options, + opts M3WrappedStorageOptions, ) Storage { if enforcer == nil { enforcer = cost.NoopChainedEnforcer() @@ -65,6 +72,7 @@ func NewM3WrappedStorage( m3: m3storage, enforcer: enforcer, instrumentOpts: instrumentOpts, + opts: opts, } } @@ -235,6 +243,12 @@ func (s *m3WrappedStore) FetchByQuery( FanoutAggregated: storage.FanoutDefault, FanoutAggregatedOptimized: storage.FanoutForceDisable, } + if s.opts.AggregateNamespacesAllData { + // NB(r): If aggregate namespaces house all the data, we can do a + // default optimized fanout where we only query the namespaces + // that contain the data for the ranges we are querying for. + fetchOptions.FanoutOptions.FanoutAggregatedOptimized = storage.FanoutDefault + } res, err := s.m3.FetchBlocks(m3ctx, m3query, fetchOptions) if err != nil { diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index ac7325e3a3..486cb06a48 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -200,7 +200,8 @@ func TestFetchByQuery(t *testing.T) { enforcer := cost.NewMockChainedEnforcer(ctrl) enforcer.EXPECT().Child(cost.QueryLevel).Return(childEnforcer).MinTimes(1) - wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions()) + wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions(), + M3WrappedStorageOptions{}) ctx := xctx.New() ctx.SetRequestContext(context.TODO()) end := start.Add(time.Duration(steps) * resolution) @@ -241,7 +242,8 @@ func TestFetchByInvalidQuery(t *testing.T) { query := "a." ctx := xctx.New() - wrapper := NewM3WrappedStorage(store, nil, instrument.NewOptions()) + wrapper := NewM3WrappedStorage(store, nil, instrument.NewOptions(), + M3WrappedStorageOptions{}) result, err := wrapper.FetchByQuery(ctx, query, opts) assert.NoError(t, err) require.Equal(t, 0, len(result.SeriesList)) diff --git a/src/query/server/query.go b/src/query/server/query.go index f316fca5c8..7d3069502f 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -48,6 +48,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/options" m3dbcluster "github.com/m3db/m3/src/query/cluster/m3db" "github.com/m3db/m3/src/query/executor" + graphite "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/policy/filter" @@ -463,13 +464,20 @@ func Run(runOpts RunOptions) { } } + var graphiteStorageOpts graphite.M3WrappedStorageOptions + if cfg.Carbon != nil { + graphiteStorageOpts.AggregateNamespacesAllData = + cfg.Carbon.AggregateNamespacesAllData + } + prometheusEngine := newPromQLEngine(cfg.Query, prometheusEngineRegistry, instrumentOptions) handlerOptions, err := options.NewHandlerOptions(downsamplerAndWriter, tagOptions, engine, prometheusEngine, m3dbClusters, clusterClient, cfg, runOpts.DBConfig, chainedEnforcer, fetchOptsBuilder, queryCtxOpts, instrumentOptions, cpuProfileDuration, []string{handleroptions.M3DBServiceName}, - serviceOptionDefaults, httpd.NewQueryRouter(), httpd.NewQueryRouter()) + serviceOptionDefaults, httpd.NewQueryRouter(), httpd.NewQueryRouter(), + graphiteStorageOpts) if err != nil { logger.Fatal("unable to set up handler options", zap.Error(err)) }