Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prom read instant handler refactoring #2928

Merged
merged 14 commits into from
Nov 24, 2020
9 changes: 6 additions & 3 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ const (

type errorType string

type queryData struct {
// QueryData struct to be used when responding from HTTP handler.
type QueryData struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it doesn't look like queryData needs to be public, at least in the scope of this PR (ditto for Respond and RespondError). Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can now override default handlers, we might need these for such use-cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should take a bit more care when exposing things. For example, there seems to be a duplicate of this struct in prometheus/response.go, which doesn't surface types from prometheus repository. Maybe we should pick one of those to be public, and not both?

type data struct {
// ResultType is the type of Result (matrix, vector, etc.).
ResultType string
// Result contains the query result (concrete type depends on ResultType).
Result result
}
type result interface {
matches(other result) (MatchInformation, error)
}
// MatrixResult contains a list matrixRow.
type MatrixResult struct {
Result []matrixRow `json:"result"`
}
// VectorResult contains a list of vectorItem.
type VectorResult struct {
Result []vectorItem `json:"result"`
}
// ScalarResult is the scalar Value for the response.
type ScalarResult struct {
Result Value `json:"result"`
}
// StringResult is the string Value for the response.
type StringResult struct {
Result Value `json:"result"`
}

Similar question with Respond and RespondError – are these the methods we want to have exposed? Aren't there better candidates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response is not made public in this PR so no changes there.
As for Respond and RespondError, I don't think exposing them would make any harm. They do not belong to any class and could be reused by custom handlers without a need to copy the same code (as we are currently doing in our private codebase).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // QueryData is ....

ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
Expand All @@ -66,7 +67,8 @@ type response struct {
Warnings []string `json:"warnings,omitempty"`
}

func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
// Responds with HTTP OK status code and writes response JSON to response body.
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // Respond ....

statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand All @@ -88,7 +90,8 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni
w.Write(b)
}

func respondError(w http.ResponseWriter, err error) {
// Responds with error status code and writes error JSON to response body.
func RespondError(w http.ResponseWriter, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // RespondError ....

json := jsoniter.ConfigCompatibleWithStandardLibrary
b, marshalErr := json.Marshal(&response{
Status: statusError,
Expand Down
67 changes: 54 additions & 13 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/storage/prometheus"

"github.com/prometheus/prometheus/promql"
Expand All @@ -38,29 +39,69 @@ func init() {
promql.SetDefaultEvaluationInterval(time.Minute)
}

// Options defines options for PromQL handler.
type Options struct {
PromQLEngine *promql.Engine
// opts defines options for PromQL handler.
type opts struct {
promQLEngine *promql.Engine
instant bool
newQueryFn NewQueryFn
}

type Option interface {
apply(*opts) error
}

func WithEngine(promQLEngine *promql.Engine) Option {
return instantEngineOption{promQLEngine: promQLEngine, instant: false}
}

func WithInstantEngine(promQLEngine *promql.Engine) Option {
return instantEngineOption{promQLEngine: promQLEngine, instant: true}
}

type instantEngineOption struct {
promQLEngine *promql.Engine
instant bool
}

func (o instantEngineOption) apply(options *opts) error {
if o.promQLEngine == nil {
return errors.New("invalid engine")
}
options.instant = o.instant
options.promQLEngine = o.promQLEngine
options.newQueryFn = newRangeQueryFn
if o.instant {
options.newQueryFn = newInstantQueryFn
}
return nil
}

func newDefaultOptions(hOpts options.HandlerOptions) opts {
return opts{
promQLEngine: hOpts.PrometheusEngine(),
instant: false,
newQueryFn: newRangeQueryFn,
}
Copy link
Collaborator

@robskillington robskillington Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to do parameterized options for this change, we can simplify this a bit (also note the WithFoo exported symbols need comments):

type opts struct {
  promQLEngine *promql.Engine
  instant      bool
  newQueryFn   NewQueryFn
}

func newDefaultOptions(hOpts options.HandlerOptions) opts {
  return opts{
    promQLEngine: hOpts.PrometheusEngine(),
    instant:      false,
    newQueryFn:   newRangeQueryFn,
  }
}

// Option is a Prometheus handler option.
type Option func(*opts)

// WithEngine sets the PromQL engine.
func WithEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, false)
}

// WithInstantEngine sets the PromQL instant engine.
func WithInstantEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, true)
}

func withEngine(promQLEngine *promql.Engine, instant bool) Option {
  return func(o *opts) error {
    if promQLEngine == nil {
      return errors.New("invalid engine")
    }
    o.instant = instant
    o.promQLEngine = promQLEngine
    o.newQueryFn = newRangeQueryFn
    if instant {
      o.newQueryFn = newInstantQueryFn
    }
    return nil
  }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much simpler to me 👍🏻

}

// NewReadHandler creates a handler to handle PromQL requests.
func NewReadHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadHandler(opts, hOpts, queryable)
func NewReadHandler(hOpts options.HandlerOptions, options ...Option) (http.Handler, error) {
return NewReadHandlerWithHooks(hOpts, &noopReadHandlerHooks{}, options...)
}

// NewReadInstantHandler creates a handler to handle PromQL requests.
func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
// NewReadHandlerWithHooks creates a handler for PromQL requests that accepts ReadHandlerHooks.
func NewReadHandlerWithHooks(
hOpts options.HandlerOptions,
hooks ReadHandlerHooks,
options ...Option,
) (http.Handler, error) {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadInstantHandler(opts, hOpts, queryable)

return newReadHandler(hOpts, hooks, queryable, options...)
}

// ApplyRangeWarnings applies warnings encountered during execution.
Expand Down
136 changes: 103 additions & 33 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/native"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage/prometheus"
xerrors "github.com/m3db/m3/src/x/errors"

Expand All @@ -40,43 +41,104 @@ import (
"go.uber.org/zap"
)

// ReadHandlerHooks allows dynamic plugging into read request processing.
type ReadHandlerHooks interface {
// OnParsedRequest gets invoked after parsing request arguments.
OnParsedRequest(
context.Context,
*http.Request,
models.RequestParams,
) (models.RequestParams, error)
}

// NewQueryFn creates a new promql Query.
type NewQueryFn func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error)

var (
newRangeQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewRangeQuery(
queryable,
params.Query,
params.Start,
params.End,
params.Step)
}

newInstantQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewInstantQuery(
queryable,
params.Query,
params.Now)
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
)

type readHandler struct {
engine *promql.Engine
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
engine *promql.Engine
hooks ReadHandlerHooks
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts opts
}

func newReadHandler(
opts Options,
hOpts options.HandlerOptions,
hooks ReadHandlerHooks,
queryable promstorage.Queryable,
) http.Handler {
options ...Option,
) (http.Handler, error) {
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)
return &readHandler{
engine: opts.PromQLEngine,
queryable: queryable,
hOpts: hOpts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
opts := newDefaultOptions(hOpts)
for _, option := range options {
if err := option.apply(&opts); err != nil {
return nil, err
}
}

return &readHandler{
engine: opts.promQLEngine,
hooks: hooks,
queryable: queryable,
hOpts: hOpts,
opts: opts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}, nil
}

func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

request, err := native.ParseRequest(ctx, r, false, h.hOpts)
request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

params, err := h.hooks.OnParsedRequest(ctx, r, request.Params)
if err != nil {
RespondError(w, err)
return
}

Expand All @@ -87,48 +149,56 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx = context.WithValue(ctx, prometheus.FetchOptionsContextKey, fetchOptions)
ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata)

if request.Params.Timeout > 0 {
if params.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, request.Params.Timeout)
ctx, cancel = context.WithTimeout(ctx, params.Timeout)
defer cancel()
}

qry, err := h.engine.NewRangeQuery(
h.queryable,
request.Params.Query,
request.Params.Start,
request.Params.End,
request.Params.Step)
qry, err := h.opts.newQueryFn(h.engine, h.queryable, params)
if err != nil {
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", request.Params.Query))
respondError(w, xerrors.NewInvalidParamsError(err))
h.logger.Error("error creating query",
zap.Error(err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, xerrors.NewInvalidParamsError(err))
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", request.Params.Query))
respondError(w, res.Err)
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, res.Err)
return
}

for _, warn := range resultMetadata.Warnings {
res.Warnings = append(res.Warnings, errors.New(warn.Message))
}

query := request.Params.Query
query := params.Query
err = ApplyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
zap.Error(err), zap.String("query", query),
zap.Bool("instant", h.opts.instant))
}

handleroptions.AddWarningHeaders(w, resultMetadata)
respond(w, &queryData{
Respond(w, &QueryData{
Result: res.Value,
ResultType: res.Value.Type(),
}, res.Warnings)
}

type noopReadHandlerHooks struct{}

func (h *noopReadHandlerHooks) OnParsedRequest(
_ context.Context,
_ *http.Request,
params models.RequestParams,
) (models.RequestParams, error) {
return params, nil
}
Loading