diff --git a/src/query/api/v1/handler/prom/common.go b/src/query/api/v1/handler/prom/common.go index cfabf76290..7c4a66a0d6 100644 --- a/src/query/api/v1/handler/prom/common.go +++ b/src/query/api/v1/handler/prom/common.go @@ -53,7 +53,8 @@ const ( type errorType string -type queryData struct { +// QueryData struct to be used when responding from HTTP handler. +type QueryData struct { ResultType promql.ValueType `json:"resultType"` Result promql.Value `json:"result"` } @@ -66,7 +67,8 @@ type response struct { Warnings []string `json:"warnings,omitempty"` } -func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) { +// Responds with HTTP OK status code and writes response JSON to response body. +func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) { statusMessage := statusSuccess var warningStrings []string for _, warning := range warnings { @@ -88,7 +90,8 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni w.Write(b) } -func respondError(w http.ResponseWriter, err error) { +// Responds with error status code and writes error JSON to response body. +func RespondError(w http.ResponseWriter, err error) { json := jsoniter.ConfigCompatibleWithStandardLibrary b, marshalErr := json.Marshal(&response{ Status: statusError, diff --git a/src/query/api/v1/handler/prom/prom.go b/src/query/api/v1/handler/prom/prom.go index 9ea523f18e..d731ac92c1 100644 --- a/src/query/api/v1/handler/prom/prom.go +++ b/src/query/api/v1/handler/prom/prom.go @@ -24,8 +24,11 @@ import ( "net/http" "time" + promstorage "github.com/prometheus/prometheus/storage" + "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/storage/prometheus" "github.com/prometheus/prometheus/promql" @@ -38,29 +41,65 @@ func init() { promql.SetDefaultEvaluationInterval(time.Minute) } -// Options defines options for PromQL handler. -type Options struct { - PromQLEngine *promql.Engine +// opts defines options for PromQL handler. +type opts struct { + promQLEngine *promql.Engine + instant bool + queryable promstorage.Queryable + newQueryFn NewQueryFn } -// NewReadHandler creates a handler to handle PromQL requests. -func NewReadHandler(opts Options, hOpts options.HandlerOptions) http.Handler { - queryable := prometheus.NewPrometheusQueryable( - prometheus.PrometheusOptions{ - Storage: hOpts.Storage(), - InstrumentOptions: hOpts.InstrumentOpts(), - }) - return newReadHandler(opts, hOpts, queryable) +// Option is a Prometheus handler option. +type Option func(*opts) error + +// WithEngine sets the PromQL engine. +func WithEngine(promQLEngine *promql.Engine) Option { + return withEngine(promQLEngine, false) +} + +// WithInstantEngine sets the PromQL instant engine. +func WithInstantEngine(promQLEngine *promql.Engine) Option { + return withEngine(promQLEngine, true) +} + +func withEngine(promQLEngine *promql.Engine, instant bool) Option { + return func(o *opts) error { + if promQLEngine == nil { + return errors.New("invalid engine") + } + o.instant = instant + o.promQLEngine = promQLEngine + o.newQueryFn = newRangeQueryFn(promQLEngine, o.queryable) + if instant { + o.newQueryFn = newInstantQueryFn(promQLEngine, o.queryable) + } + return nil + } } -// NewReadInstantHandler creates a handler to handle PromQL requests. -func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler { +func newDefaultOptions(hOpts options.HandlerOptions) opts { queryable := prometheus.NewPrometheusQueryable( prometheus.PrometheusOptions{ Storage: hOpts.Storage(), InstrumentOptions: hOpts.InstrumentOpts(), }) - return newReadInstantHandler(opts, hOpts, queryable) + return opts{ + promQLEngine: hOpts.PrometheusEngine(), + queryable: queryable, + instant: false, + newQueryFn: newRangeQueryFn(hOpts.PrometheusEngine(), queryable), + } +} + +// NewReadHandler creates a handler to handle PromQL requests. +func NewReadHandler(hOpts options.HandlerOptions, options ...Option) (http.Handler, error) { + opts := newDefaultOptions(hOpts) + for _, optionFn := range options { + if err := optionFn(&opts); err != nil { + return nil, err + } + } + return newReadHandler(hOpts, opts) } // ApplyRangeWarnings applies warnings encountered during execution. diff --git a/src/query/api/v1/handler/prom/read.go b/src/query/api/v1/handler/prom/read.go index 3d379e22fb..b69cdecc39 100644 --- a/src/query/api/v1/handler/prom/read.go +++ b/src/query/api/v1/handler/prom/read.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/prometheus/native" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/prometheus" xerrors "github.com/m3db/m3/src/x/errors" @@ -40,29 +41,57 @@ import ( "go.uber.org/zap" ) +// NewQueryFn creates a new promql Query. +type NewQueryFn func(params models.RequestParams) (promql.Query, error) + +var ( + newRangeQueryFn = func( + engine *promql.Engine, + queryable promstorage.Queryable, + ) NewQueryFn { + return func(params models.RequestParams) (promql.Query, error) { + return engine.NewRangeQuery( + queryable, + params.Query, + params.Start, + params.End, + params.Step) + } + } + + newInstantQueryFn = func( + engine *promql.Engine, + queryable promstorage.Queryable, + ) NewQueryFn { + return func(params models.RequestParams) (promql.Query, error) { + return engine.NewInstantQuery( + queryable, + params.Query, + params.Now) + } + } +) + type readHandler struct { - engine *promql.Engine - queryable promstorage.Queryable - hOpts options.HandlerOptions - scope tally.Scope - logger *zap.Logger + hOpts options.HandlerOptions + scope tally.Scope + logger *zap.Logger + opts opts } func newReadHandler( - opts Options, hOpts options.HandlerOptions, - queryable promstorage.Queryable, -) http.Handler { + options opts, +) (http.Handler, error) { scope := hOpts.InstrumentOpts().MetricsScope().Tagged( map[string]string{"handler": "prometheus-read"}, ) return &readHandler{ - engine: opts.PromQLEngine, - queryable: queryable, - hOpts: hOpts, - scope: scope, - logger: hOpts.InstrumentOpts().Logger(), - } + hOpts: hOpts, + opts: options, + scope: scope, + logger: hOpts.InstrumentOpts().Logger(), + }, nil } func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -70,16 +99,18 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r) if err != nil { - respondError(w, err) + RespondError(w, err) return } - request, err := native.ParseRequest(ctx, r, false, h.hOpts) + request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts) if err != nil { - respondError(w, err) + RespondError(w, err) return } + params := request.Params + // NB (@shreyas): We put the FetchOptions in context so it can be // retrieved in the queryable object as there is no other way to pass // that through. @@ -87,31 +118,28 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx = context.WithValue(ctx, prometheus.FetchOptionsContextKey, fetchOptions) ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata) - if request.Params.Timeout > 0 { + if params.Timeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, request.Params.Timeout) + ctx, cancel = context.WithTimeout(ctx, params.Timeout) defer cancel() } - qry, err := h.engine.NewRangeQuery( - h.queryable, - request.Params.Query, - request.Params.Start, - request.Params.End, - request.Params.Step) + qry, err := h.opts.newQueryFn(params) if err != nil { - h.logger.Error("error creating range query", - zap.Error(err), zap.String("query", request.Params.Query)) - respondError(w, xerrors.NewInvalidParamsError(err)) + h.logger.Error("error creating query", + zap.Error(err), zap.String("query", params.Query), + zap.Bool("instant", h.opts.instant)) + RespondError(w, xerrors.NewInvalidParamsError(err)) return } defer qry.Close() res := qry.Exec(ctx) if res.Err != nil { - h.logger.Error("error executing range query", - zap.Error(res.Err), zap.String("query", request.Params.Query)) - respondError(w, res.Err) + h.logger.Error("error executing query", + zap.Error(res.Err), zap.String("query", params.Query), + zap.Bool("instant", h.opts.instant)) + RespondError(w, res.Err) return } @@ -119,15 +147,16 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { res.Warnings = append(res.Warnings, errors.New(warn.Message)) } - query := request.Params.Query + query := params.Query err = ApplyRangeWarnings(query, &resultMetadata) if err != nil { h.logger.Warn("error applying range warnings", - zap.Error(err), zap.String("query", query)) + zap.Error(err), zap.String("query", query), + zap.Bool("instant", h.opts.instant)) } handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions) - respond(w, &queryData{ + Respond(w, &QueryData{ Result: res.Value, ResultType: res.Value.Type(), }, res.Warnings) diff --git a/src/query/api/v1/handler/prom/read_instant.go b/src/query/api/v1/handler/prom/read_instant.go deleted file mode 100644 index 96a0d98b25..0000000000 --- a/src/query/api/v1/handler/prom/read_instant.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package prom - -import ( - "context" - "errors" - "fmt" - "net/http" - "time" - - "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" - "github.com/m3db/m3/src/query/api/v1/options" - "github.com/m3db/m3/src/query/block" - "github.com/m3db/m3/src/query/storage/prometheus" - "github.com/m3db/m3/src/query/util" - xerrors "github.com/m3db/m3/src/x/errors" - - "github.com/prometheus/prometheus/promql" - promstorage "github.com/prometheus/prometheus/storage" - "github.com/uber-go/tally" - "go.uber.org/zap" -) - -type readInstantHandler struct { - queryable promstorage.Queryable - engine *promql.Engine - hOpts options.HandlerOptions - scope tally.Scope - logger *zap.Logger -} - -func newReadInstantHandler( - opts Options, - hOpts options.HandlerOptions, - queryable promstorage.Queryable, -) http.Handler { - scope := hOpts.InstrumentOpts().MetricsScope().Tagged( - map[string]string{"handler": "prometheus-read-instantaneous"}, - ) - return &readInstantHandler{ - engine: opts.PromQLEngine, - queryable: queryable, - hOpts: hOpts, - scope: scope, - logger: hOpts.InstrumentOpts().Logger(), - } -} - -func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - ts, err := util.ParseTimeStringWithDefault(r.FormValue("time"), time.Now()) - if err != nil { - respondError(w, err) - return - } - - fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r) - if err != nil { - respondError(w, err) - return - } - - ctx := r.Context() - // NB (@shreyas): We put the FetchOptions in context so it can be - // retrieved in the queryable object as there is no other way to pass - // that through. - var resultMetadata block.ResultMetadata - ctx = context.WithValue(ctx, prometheus.FetchOptionsContextKey, fetchOptions) - ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata) - - if t := r.FormValue("timeout"); t != "" { - timeout, err := util.ParseDurationString(t) - if err != nil { - err = xerrors.NewInvalidParamsError( - fmt.Errorf("invalid parameter 'timeout': %v", err)) - respondError(w, err) - return - } - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } - - query := r.FormValue("query") - qry, err := h.engine.NewInstantQuery( - h.queryable, - query, - ts) - if err != nil { - h.logger.Error("error creating instant query", - zap.Error(err), zap.String("query", query)) - respondError(w, xerrors.NewInvalidParamsError(err)) - return - } - defer qry.Close() - - res := qry.Exec(ctx) - if res.Err != nil { - h.logger.Error("error executing instant query", - zap.Error(res.Err), zap.String("query", query)) - respondError(w, res.Err) - return - } - - for _, warn := range resultMetadata.Warnings { - res.Warnings = append(res.Warnings, errors.New(warn.Message)) - } - - err = ApplyRangeWarnings(query, &resultMetadata) - if err != nil { - h.logger.Warn("error applying range warnings", - zap.Error(err), zap.String("query", query)) - } - handleroptions.AddResponseHeaders(w, resultMetadata, fetchOptions) - - respond(w, &queryData{ - Result: res.Value, - ResultType: res.Value.Type(), - }, res.Warnings) -} diff --git a/src/query/api/v1/handler/prom/read_test.go b/src/query/api/v1/handler/prom/read_test.go index f18815c60f..f67621b38b 100644 --- a/src/query/api/v1/handler/prom/read_test.go +++ b/src/query/api/v1/handler/prom/read_test.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/executor" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" + "github.com/prometheus/prometheus/pkg/labels" promstorage "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" @@ -57,10 +58,6 @@ type testHandlers struct { } func setupTest(t *testing.T) testHandlers { - opts := Options{ - PromQLEngine: testPromQLEngine, - } - fetchOptsBuilderCfg := handleroptions.FetchOptionsBuilderOptions{ Timeout: 15 * time.Second, } @@ -75,8 +72,20 @@ func setupTest(t *testing.T) testHandlers { SetFetchOptionsBuilder(fetchOptsBuilder). SetEngine(engine) queryable := &mockQueryable{} - readHandler := newReadHandler(opts, hOpts, queryable) - readInstantHandler := newReadInstantHandler(opts, hOpts, queryable) + readHandler, err := newReadHandler(hOpts, opts{ + promQLEngine: testPromQLEngine, + queryable: queryable, + instant: false, + newQueryFn: newRangeQueryFn(testPromQLEngine, queryable), + }) + require.NoError(t, err) + readInstantHandler, err := newReadHandler(hOpts, opts{ + promQLEngine: testPromQLEngine, + queryable: queryable, + instant: true, + newQueryFn: newInstantQueryFn(testPromQLEngine, queryable), + }) + require.NoError(t, err) return testHandlers{ queryable: queryable, readHandler: readHandler, diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 2f8d07a9b2..83b34ec6bf 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -170,11 +170,16 @@ func (h *Handler) RegisterRoutes() error { Tagged(v1APIGroup), )) - opts := prom.Options{ - PromQLEngine: h.options.PrometheusEngine(), + promqlQueryHandler, err := prom.NewReadHandler(nativeSourceOpts, + prom.WithEngine(h.options.PrometheusEngine())) + if err != nil { + return err + } + promqlInstantQueryHandler, err := prom.NewReadHandler(nativeSourceOpts, + prom.WithInstantEngine(h.options.PrometheusEngine())) + if err != nil { + return err } - promqlQueryHandler := prom.NewReadHandler(opts, nativeSourceOpts) - promqlInstantQueryHandler := prom.NewReadInstantHandler(opts, nativeSourceOpts) nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts) nativePromReadInstantHandler := native.NewPromReadInstantHandler(nativeSourceOpts) diff --git a/src/query/api/v1/httpd/handler_test.go b/src/query/api/v1/httpd/handler_test.go index 51b0b52e1f..53558d7192 100644 --- a/src/query/api/v1/httpd/handler_test.go +++ b/src/query/api/v1/httpd/handler_test.go @@ -48,6 +48,7 @@ import ( "github.com/golang/mock/gomock" "github.com/gorilla/mux" "github.com/opentracing/opentracing-go/mocktracer" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -101,7 +102,7 @@ func setupHandler( downsamplerAndWriter, makeTagOptions(), engine, - nil, + newPromEngine(), nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, @@ -124,6 +125,13 @@ func setupHandler( return NewHandler(opts, customHandlers...), nil } +func newPromEngine() *promql.Engine { + return promql.NewEngine(promql.EngineOpts{ + MaxSamples: 10000, + Timeout: 100 * time.Second, + }) +} + func TestPromRemoteReadGet(t *testing.T) { req := httptest.NewRequest("GET", remote.PromReadURL, nil) res := httptest.NewRecorder() @@ -394,7 +402,8 @@ func TestCustomRoutes(t *testing.T) { }) require.NoError(t, err) opts, err := options.NewHandlerOptions( - downsamplerAndWriter, makeTagOptions().SetMetricName([]byte("z")), engine, nil, nil, nil, + downsamplerAndWriter, makeTagOptions().SetMetricName([]byte("z")), + engine, newPromEngine(), nil, nil, config.Configuration{LookbackDuration: &defaultLookbackDuration}, nil, fetchOptsBuilder, models.QueryContextOptions{}, instrumentOpts, defaultCPUProfileduration, defaultPlacementServices, svcDefaultOptions, NewQueryRouter(), NewQueryRouter(),