Skip to content

Commit

Permalink
[query] Add cost enforcer to graphite queries (#1449)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Mar 13, 2019
1 parent b1213c2 commit 4610505
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 47 deletions.
4 changes: 3 additions & 1 deletion src/query/api/v1/handler/graphite/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sync"

"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/native"
Expand Down Expand Up @@ -62,8 +63,9 @@ type respError struct {
// NewRenderHandler returns a new render handler around the given storage.
func NewRenderHandler(
storage storage.Storage,
enforcer cost.ChainedEnforcer,
) http.Handler {
wrappedStore := graphite.NewM3WrappedStorage(storage)
wrappedStore := graphite.NewM3WrappedStorage(storage, enforcer)
return &renderHandler{
engine: native.NewEngine(wrappedStore),
}
Expand Down
10 changes: 5 additions & 5 deletions src/query/api/v1/handler/graphite/render_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

func TestParseNoQuery(t *testing.T) {
mockStorage := mock.NewMockStorage()
handler := NewRenderHandler(mockStorage)
handler := NewRenderHandler(mockStorage, nil)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, newGraphiteReadHTTPRequest(t))
Expand All @@ -51,7 +51,7 @@ func TestParseNoQuery(t *testing.T) {
func TestParseQueryNoResults(t *testing.T) {
mockStorage := mock.NewMockStorage()
mockStorage.SetFetchResult(&storage.FetchResult{}, nil)
handler := NewRenderHandler(mockStorage)
handler := NewRenderHandler(mockStorage, nil)

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = "target=foo.bar&from=-2h&until=now"
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestParseQueryResults(t *testing.T) {
}

mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil)
handler := NewRenderHandler(mockStorage)
handler := NewRenderHandler(mockStorage, nil)

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = fmt.Sprintf("target=foo.bar&from=%d&until=%d",
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) {
}

mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil)
handler := NewRenderHandler(mockStorage)
handler := NewRenderHandler(mockStorage, nil)

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1"
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestParseQueryResultsMultiTarget(t *testing.T) {
}

mockStorage.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil)
handler := NewRenderHandler(mockStorage)
handler := NewRenderHandler(mockStorage, nil)

req := newGraphiteReadHTTPRequest(t)
req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d",
Expand Down
6 changes: 5 additions & 1 deletion src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/validator"
"github.com/m3db/m3/src/query/api/v1/handler/topic"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
Expand Down Expand Up @@ -85,6 +86,7 @@ type Handler struct {
createdAt time.Time
tagOptions models.TagOptions
timeoutOpts *prometheus.TimeoutOpts
enforcer cost.ChainedEnforcer
}

// Router returns the http handler registered with all relevant routes for query.
Expand All @@ -101,6 +103,7 @@ func NewHandler(
clusterClient clusterclient.Client,
cfg config.Configuration,
embeddedDbCfg *dbconfig.DBConfiguration,
enforcer cost.ChainedEnforcer,
scope tally.Scope,
) (*Handler, error) {
r := mux.NewRouter()
Expand Down Expand Up @@ -132,6 +135,7 @@ func NewHandler(
createdAt: time.Now(),
tagOptions: tagOptions,
timeoutOpts: timeoutOpts,
enforcer: enforcer,
}
return h, nil
}
Expand Down Expand Up @@ -224,7 +228,7 @@ func (h *Handler) RegisterRoutes() error {

// Graphite endpoints
h.router.HandleFunc(graphite.ReadURL,
wrapped(graphite.NewRenderHandler(h.storage)).ServeHTTP,
wrapped(graphite.NewRenderHandler(h.storage, h.enforcer)).ServeHTTP,
).Methods(graphite.ReadHTTPMethods...)

h.router.HandleFunc(graphite.FindURL,
Expand Down
14 changes: 10 additions & 4 deletions src/query/api/v1/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func setupHandler(store storage.Storage) (*Handler, error) {
nil,
config.Configuration{LookbackDuration: &defaultLookbackDuration},
nil,
nil,
tally.NewTestScope("", nil))
}

Expand All @@ -83,8 +84,11 @@ func TestHandlerFetchTimeoutError(t *testing.T) {

negValue := -1 * time.Second
dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &negValue}}
_, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil,
config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil))
engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil)
cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration}
_, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine, nil, nil,
cfg, dbconfig, nil, tally.NewTestScope("", nil))

require.Error(t, err)
}

Expand All @@ -97,8 +101,10 @@ func TestHandlerFetchTimeout(t *testing.T) {

fourMin := 4 * time.Minute
dbconfig := &dbconfig.DBConfiguration{Client: client.Configuration{FetchTimeout: &fourMin}}
h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil), nil, nil,
config.Configuration{LookbackDuration: &defaultLookbackDuration}, dbconfig, tally.NewTestScope("", nil))
engine := executor.NewEngine(storage, tally.NewTestScope("test", nil), time.Minute, nil)
cfg := config.Configuration{LookbackDuration: &defaultLookbackDuration}
h, err := NewHandler(downsamplerAndWriter, makeTagOptions(), engine,
nil, nil, cfg, dbconfig, nil, tally.NewTestScope("", nil))
require.NoError(t, err)
assert.Equal(t, 4*time.Minute, h.timeoutOpts.FetchTimeout)
}
Expand Down
16 changes: 13 additions & 3 deletions src/query/graphite/storage/m3_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"time"

"github.com/m3db/m3/src/query/cost"
xctx "github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/graphite/ts"
Expand All @@ -38,13 +39,21 @@ var (
)

type m3WrappedStore struct {
m3 storage.Storage
m3 storage.Storage
enforcer cost.ChainedEnforcer
}

// NewM3WrappedStorage creates a graphite storage wrapper around an m3query
// storage instance.
func NewM3WrappedStorage(m3storage storage.Storage) Storage {
return &m3WrappedStore{m3: m3storage}
func NewM3WrappedStorage(
m3storage storage.Storage,
enforcer cost.ChainedEnforcer,
) Storage {
if enforcer == nil {
enforcer = cost.NoopChainedEnforcer()
}

return &m3WrappedStore{m3: m3storage, enforcer: enforcer}
}

// TranslateQueryToMatchers converts a graphite query to tag matcher pairs.
Expand Down Expand Up @@ -130,6 +139,7 @@ func (s *m3WrappedStore) FetchByQuery(
m3ctx, cancel := context.WithTimeout(ctx.RequestContext(), opts.Timeout)
defer cancel()
fetchOptions := storage.NewFetchOptions()
fetchOptions.Enforcer = s.enforcer
fetchOptions.FanoutOptions = &storage.FanoutOptions{
FanoutUnaggregated: storage.FanoutForceDisable,
FanoutAggregated: storage.FanoutDefault,
Expand Down
10 changes: 9 additions & 1 deletion src/query/graphite/storage/m3_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/query/cost"
xctx "github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/graphite"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/mock"
m3ts "github.com/m3db/m3/src/query/ts"
xcost "github.com/m3db/m3/src/x/cost"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -137,7 +139,10 @@ func TestFetchByQuery(t *testing.T) {
}

store.SetFetchResult(&storage.FetchResult{SeriesList: seriesList}, nil)
wrapper := NewM3WrappedStorage(store)
enforcers := []xcost.Enforcer{xcost.NewEnforcer(nil, nil, nil)}
enforcer, err := cost.NewChainedEnforcer("name", enforcers)
require.NoError(t, err)
wrapper := NewM3WrappedStorage(store, enforcer)
ctx := xctx.New()
ctx.SetRequestContext(context.TODO())
end := time.Now()
Expand All @@ -156,4 +161,7 @@ func TestFetchByQuery(t *testing.T) {
series := result.SeriesList[0]
assert.Equal(t, "a", series.Name())
assert.Equal(t, []float64{3, 3, 3}, series.SafeValues())

// NB: ensure the fetch was called with enforcer propagated correctly
assert.Equal(t, enforcer, store.LastFetchOptions().Enforcer)
}
2 changes: 1 addition & 1 deletion src/query/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func Run(runOpts RunOptions) {
}

handler, err := httpd.NewHandler(downsamplerAndWriter, tagOptions, engine,
m3dbClusters, clusterClient, cfg, runOpts.DBConfig, scope)
m3dbClusters, clusterClient, cfg, runOpts.DBConfig, perQueryEnforcer, scope)
if err != nil {
logger.Fatal("unable to set up handlers", zap.Error(err))
}
Expand Down
20 changes: 16 additions & 4 deletions src/query/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/query/cost"
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
xcost "github.com/m3db/m3/src/x/cost"
xsync "github.com/m3db/m3x/sync"
xtime "github.com/m3db/m3x/time"
)
Expand Down Expand Up @@ -266,6 +268,7 @@ func SeriesToPromSamples(series *ts.Series) []*prompb.Sample {

func iteratorToTsSeries(
iter encoding.SeriesIterator,
enforcer cost.ChainedEnforcer,
tagOptions models.TagOptions,
) (*ts.Series, error) {
metric, err := FromM3IdentToMetric(iter.ID(), iter.Tags(), tagOptions)
Expand All @@ -283,18 +286,24 @@ func iteratorToTsSeries(
return nil, err
}

r := enforcer.Add(xcost.Cost(len(datapoints)))
if r.Error != nil {
return nil, r.Error
}

return ts.NewSeries(metric.ID, datapoints, metric.Tags), nil
}

// Fall back to sequential decompression if unable to decompress concurrently
func decompressSequentially(
iterLength int,
iters []encoding.SeriesIterator,
enforcer cost.ChainedEnforcer,
tagOptions models.TagOptions,
) (*FetchResult, error) {
seriesList := make([]*ts.Series, 0, len(iters))
for _, iter := range iters {
series, err := iteratorToTsSeries(iter, tagOptions)
series, err := iteratorToTsSeries(iter, enforcer, tagOptions)
if err != nil {
return nil, err
}
Expand All @@ -310,6 +319,7 @@ func decompressConcurrently(
iterLength int,
iters []encoding.SeriesIterator,
readWorkerPool xsync.PooledWorkerPool,
enforcer cost.ChainedEnforcer,
tagOptions models.TagOptions,
) (*FetchResult, error) {
seriesList := make([]*ts.Series, iterLength)
Expand All @@ -336,7 +346,7 @@ func decompressConcurrently(
return
}

series, err := iteratorToTsSeries(iter, tagOptions)
series, err := iteratorToTsSeries(iter, enforcer, tagOptions)
if err != nil {
// Return the first error that is encountered.
select {
Expand Down Expand Up @@ -366,6 +376,7 @@ func SeriesIteratorsToFetchResult(
seriesIterators encoding.SeriesIterators,
readWorkerPool xsync.PooledWorkerPool,
cleanupSeriesIters bool,
enforcer cost.ChainedEnforcer,
tagOptions models.TagOptions,
) (*FetchResult, error) {
if cleanupSeriesIters {
Expand All @@ -375,8 +386,9 @@ func SeriesIteratorsToFetchResult(
iters := seriesIterators.Iters()
iterLength := seriesIterators.Len()
if readWorkerPool == nil {
return decompressSequentially(iterLength, iters, tagOptions)
return decompressSequentially(iterLength, iters, enforcer, tagOptions)
}

return decompressConcurrently(iterLength, iters, readWorkerPool, tagOptions)
return decompressConcurrently(iterLength, iters, readWorkerPool,
enforcer, tagOptions)
}
Loading

0 comments on commit 4610505

Please sign in to comment.