Skip to content

Commit

Permalink
[query] Add ability to enable optimized Graphite fanout if all agg na…
Browse files Browse the repository at this point in the history
…mespaces have all the data (#2665)
  • Loading branch information
robskillington authored Sep 25, 2020
1 parent aefca00 commit 41b4bc4
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 7 deletions.
10 changes: 10 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,17 @@ type IngestConfiguration struct {

// CarbonConfiguration is the configuration for the carbon server.
type CarbonConfiguration struct {
// Ingester if set defines an ingester to run for carbon.
Ingester *CarbonIngesterConfiguration `yaml:"ingester"`
// AggregateNamespacesAllData configures whether all aggregate
// namespaces contain entire copies of the data set.
// This affects whether queries can be optimized or not, if false
// they cannot be since it's unclear if data matching an expression
// sits in one or many or none of the aggregate namespaces so all
// must be queried, but if true then it can be determined based
// on the query range whether a single namespace can fulfill the
// entire query and if so to only fetch from that one aggregated namespace.
AggregateNamespacesAllData bool `yaml:"aggregateNamespacesAllData"`
}

// CarbonIngesterConfiguration is the configuration struct for carbon ingestion.
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type respError struct {
// NewRenderHandler returns a new render handler around the given storage.
func NewRenderHandler(opts options.HandlerOptions) http.Handler {
wrappedStore := graphite.NewM3WrappedStorage(opts.Storage(),
opts.Enforcer(), opts.InstrumentOpts())
opts.Enforcer(), opts.InstrumentOpts(), opts.GraphiteStorageOptions())
return &renderHandler{
engine: native.NewEngine(wrappedStore),
queryContextOpts: opts.QueryContextOptions(),
Expand Down
5 changes: 4 additions & 1 deletion src/query/api/v1/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/options"
qcost "github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/executor"
graphite "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/test/m3"
Expand Down Expand Up @@ -109,6 +110,7 @@ func setupHandler(
svcDefaultOptions,
NewQueryRouter(),
NewQueryRouter(),
graphite.M3WrappedStorageOptions{},
)

if err != nil {
Expand Down Expand Up @@ -145,6 +147,7 @@ func TestHandlerFetchTimeout(t *testing.T) {
svcDefaultOptions,
nil,
nil,
graphite.M3WrappedStorageOptions{},
)
require.NoError(t, err)

Expand Down Expand Up @@ -406,7 +409,7 @@ func TestCustomRoutes(t *testing.T) {
handleroptions.NewFetchOptionsBuilder(handleroptions.FetchOptionsBuilderOptions{}),
models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration,
defaultPlacementServices, svcDefaultOptions, NewQueryRouter(), NewQueryRouter(),
)
graphite.M3WrappedStorageOptions{})

require.NoError(t, err)
custom := &customHandler{t: t}
Expand Down
23 changes: 21 additions & 2 deletions src/query/api/v1/options/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/executor"
graphite "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
Expand Down Expand Up @@ -200,6 +201,11 @@ type HandlerOptions interface {
InstantQueryRouter() QueryRouter
// SetInstantQueryRouter sets query router for instant queries.
SetInstantQueryRouter(value QueryRouter) HandlerOptions

// GraphiteStorageOptions returns the Graphite storage options.
GraphiteStorageOptions() graphite.M3WrappedStorageOptions
// SetGraphiteStorageOptions sets the Graphite storage options.
SetGraphiteStorageOptions(value graphite.M3WrappedStorageOptions) HandlerOptions
}

// HandlerOptions represents handler options.
Expand All @@ -226,6 +232,7 @@ type handlerOptions struct {
nowFn clock.NowFn
queryRouter QueryRouter
instantQueryRouter QueryRouter
graphiteStorageOpts graphite.M3WrappedStorageOptions
}

// EmptyHandlerOptions returns default handler options.
Expand Down Expand Up @@ -255,6 +262,7 @@ func NewHandlerOptions(
serviceOptionDefaults []handleroptions.ServiceOptionsDefault,
queryRouter QueryRouter,
instantQueryRouter QueryRouter,
graphiteStorageOpts graphite.M3WrappedStorageOptions,
) (HandlerOptions, error) {
timeout := cfg.Query.TimeoutOrDefault()
if embeddedDbCfg != nil &&
Expand Down Expand Up @@ -286,8 +294,9 @@ func NewHandlerOptions(
timeoutOpts: &prometheus.TimeoutOpts{
FetchTimeout: timeout,
},
queryRouter: queryRouter,
instantQueryRouter: instantQueryRouter,
queryRouter: queryRouter,
instantQueryRouter: instantQueryRouter,
graphiteStorageOpts: graphiteStorageOpts,
}, nil
}

Expand Down Expand Up @@ -529,3 +538,13 @@ func (o *handlerOptions) SetInstantQueryRouter(value QueryRouter) HandlerOptions
opts.instantQueryRouter = value
return &opts
}

func (o *handlerOptions) GraphiteStorageOptions() graphite.M3WrappedStorageOptions {
return o.graphiteStorageOpts
}

func (o *handlerOptions) SetGraphiteStorageOptions(value graphite.M3WrappedStorageOptions) HandlerOptions {
opts := *o
opts.graphiteStorageOpts = value
return &opts
}
14 changes: 14 additions & 0 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ type m3WrappedStore struct {
m3 storage.Storage
enforcer cost.ChainedEnforcer
instrumentOpts instrument.Options
opts M3WrappedStorageOptions
}

// M3WrappedStorageOptions is the graphite storage options.
type M3WrappedStorageOptions struct {
AggregateNamespacesAllData bool
}

// NewM3WrappedStorage creates a graphite storage wrapper around an m3query
Expand All @@ -56,6 +62,7 @@ func NewM3WrappedStorage(
m3storage storage.Storage,
enforcer cost.ChainedEnforcer,
instrumentOpts instrument.Options,
opts M3WrappedStorageOptions,
) Storage {
if enforcer == nil {
enforcer = cost.NoopChainedEnforcer()
Expand All @@ -65,6 +72,7 @@ func NewM3WrappedStorage(
m3: m3storage,
enforcer: enforcer,
instrumentOpts: instrumentOpts,
opts: opts,
}
}

Expand Down Expand Up @@ -235,6 +243,12 @@ func (s *m3WrappedStore) FetchByQuery(
FanoutAggregated: storage.FanoutDefault,
FanoutAggregatedOptimized: storage.FanoutForceDisable,
}
if s.opts.AggregateNamespacesAllData {
// NB(r): If aggregate namespaces house all the data, we can do a
// default optimized fanout where we only query the namespaces
// that contain the data for the ranges we are querying for.
fetchOptions.FanoutOptions.FanoutAggregatedOptimized = storage.FanoutDefault
}

res, err := s.m3.FetchBlocks(m3ctx, m3query, fetchOptions)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions src/query/graphite/storage/m3_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func TestFetchByQuery(t *testing.T) {
enforcer := cost.NewMockChainedEnforcer(ctrl)
enforcer.EXPECT().Child(cost.QueryLevel).Return(childEnforcer).MinTimes(1)

wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions())
wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions(),
M3WrappedStorageOptions{})
ctx := xctx.New()
ctx.SetRequestContext(context.TODO())
end := start.Add(time.Duration(steps) * resolution)
Expand Down Expand Up @@ -241,7 +242,8 @@ func TestFetchByInvalidQuery(t *testing.T) {

query := "a."
ctx := xctx.New()
wrapper := NewM3WrappedStorage(store, nil, instrument.NewOptions())
wrapper := NewM3WrappedStorage(store, nil, instrument.NewOptions(),
M3WrappedStorageOptions{})
result, err := wrapper.FetchByQuery(ctx, query, opts)
assert.NoError(t, err)
require.Equal(t, 0, len(result.SeriesList))
Expand Down
10 changes: 9 additions & 1 deletion src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/options"
m3dbcluster "github.com/m3db/m3/src/query/cluster/m3db"
"github.com/m3db/m3/src/query/executor"
graphite "github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/parser/promql"
"github.com/m3db/m3/src/query/policy/filter"
Expand Down Expand Up @@ -463,13 +464,20 @@ func Run(runOpts RunOptions) {
}
}

var graphiteStorageOpts graphite.M3WrappedStorageOptions
if cfg.Carbon != nil {
graphiteStorageOpts.AggregateNamespacesAllData =
cfg.Carbon.AggregateNamespacesAllData
}

prometheusEngine := newPromQLEngine(cfg.Query, prometheusEngineRegistry,
instrumentOptions)
handlerOptions, err := options.NewHandlerOptions(downsamplerAndWriter,
tagOptions, engine, prometheusEngine, m3dbClusters, clusterClient, cfg,
runOpts.DBConfig, chainedEnforcer, fetchOptsBuilder, queryCtxOpts,
instrumentOptions, cpuProfileDuration, []string{handleroptions.M3DBServiceName},
serviceOptionDefaults, httpd.NewQueryRouter(), httpd.NewQueryRouter())
serviceOptionDefaults, httpd.NewQueryRouter(), httpd.NewQueryRouter(),
graphiteStorageOpts)
if err != nil {
logger.Fatal("unable to set up handler options", zap.Error(err))
}
Expand Down

0 comments on commit 41b4bc4

Please sign in to comment.