Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prom read instant handler refactoring #2928

Merged
merged 14 commits into from
Nov 24, 2020
6 changes: 3 additions & 3 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ const (

type errorType string

type queryData struct {
type QueryData struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it doesn't look like queryData needs to be public, at least in the scope of this PR (ditto for Respond and RespondError). Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we can now override default handlers, we might need these for such use-cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should take a bit more care when exposing things. For example, there seems to be a duplicate of this struct in prometheus/response.go, which doesn't surface types from prometheus repository. Maybe we should pick one of those to be public, and not both?

type data struct {
// ResultType is the type of Result (matrix, vector, etc.).
ResultType string
// Result contains the query result (concrete type depends on ResultType).
Result result
}
type result interface {
matches(other result) (MatchInformation, error)
}
// MatrixResult contains a list matrixRow.
type MatrixResult struct {
Result []matrixRow `json:"result"`
}
// VectorResult contains a list of vectorItem.
type VectorResult struct {
Result []vectorItem `json:"result"`
}
// ScalarResult is the scalar Value for the response.
type ScalarResult struct {
Result Value `json:"result"`
}
// StringResult is the string Value for the response.
type StringResult struct {
Result Value `json:"result"`
}

Similar question with Respond and RespondError – are these the methods we want to have exposed? Aren't there better candidates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

response is not made public in this PR so no changes there.
As for Respond and RespondError, I don't think exposing them would make any harm. They do not belong to any class and could be reused by custom handlers without a need to copy the same code (as we are currently doing in our private codebase).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // QueryData is ....

ResultType promql.ValueType `json:"resultType"`
Result promql.Value `json:"result"`
}
Expand All @@ -66,7 +66,7 @@ type response struct {
Warnings []string `json:"warnings,omitempty"`
}

func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // Respond ....

statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand All @@ -88,7 +88,7 @@ func respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warni
w.Write(b)
}

func respondError(w http.ResponseWriter, err error) {
func RespondError(w http.ResponseWriter, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need to add comment to exported types // RespondError ....

json := jsoniter.ConfigCompatibleWithStandardLibrary
b, marshalErr := json.Marshal(&response{
Status: statusError,
Expand Down
19 changes: 9 additions & 10 deletions src/query/api/v1/handler/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ func init() {
// Options defines options for PromQL handler.
type Options struct {
PromQLEngine *promql.Engine
instant bool
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved

func (o Options) WithInstant(instant bool) Options {
return Options{
PromQLEngine: o.PromQLEngine,
instant: instant,
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

@robskillington robskillington Nov 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to do parameterized options for this change, we can simplify this a bit (also note the WithFoo exported symbols need comments):

type opts struct {
  promQLEngine *promql.Engine
  instant      bool
  newQueryFn   NewQueryFn
}

func newDefaultOptions(hOpts options.HandlerOptions) opts {
  return opts{
    promQLEngine: hOpts.PrometheusEngine(),
    instant:      false,
    newQueryFn:   newRangeQueryFn,
  }
}

// Option is a Prometheus handler option.
type Option func(*opts)

// WithEngine sets the PromQL engine.
func WithEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, false)
}

// WithInstantEngine sets the PromQL instant engine.
func WithInstantEngine(promQLEngine *promql.Engine) Option {
  return withEngine(promQLEngine, true)
}

func withEngine(promQLEngine *promql.Engine, instant bool) Option {
  return func(o *opts) error {
    if promQLEngine == nil {
      return errors.New("invalid engine")
    }
    o.instant = instant
    o.promQLEngine = promQLEngine
    o.newQueryFn = newRangeQueryFn
    if instant {
      o.newQueryFn = newInstantQueryFn
    }
    return nil
  }
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much simpler to me 👍🏻

}

// NewReadHandler creates a handler to handle PromQL requests.
Expand All @@ -59,17 +67,8 @@ func NewReadHandlerWithHooks(
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadHandler(opts, hOpts, hooks, queryable)
}

// NewReadInstantHandler creates a handler to handle PromQL requests.
func NewReadInstantHandler(opts Options, hOpts options.HandlerOptions) http.Handler {
queryable := prometheus.NewPrometheusQueryable(
prometheus.PrometheusOptions{
Storage: hOpts.Storage(),
InstrumentOptions: hOpts.InstrumentOpts(),
})
return newReadInstantHandler(opts, hOpts, queryable)
return newReadHandler(opts, hOpts, hooks, queryable)
}

// ApplyRangeWarnings applies warnings encountered during execution.
Expand Down
100 changes: 70 additions & 30 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,47 @@ type ReadHandlerHooks interface {
) (models.RequestParams, error)
}

type NewQueryFn func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error)

var (
newRangeQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewRangeQuery(
queryable,
params.Query,
params.Start,
params.End,
params.Step)
}

newInstantQueryFn = func(
engine *promql.Engine,
queryable promstorage.Queryable,
params models.RequestParams,
) (promql.Query, error) {
return engine.NewInstantQuery(
queryable,
params.Query,
params.Now)
}
vpranckaitis marked this conversation as resolved.
Show resolved Hide resolved
)

type readHandler struct {
engine *promql.Engine
hooks ReadHandlerHooks
queryable promstorage.Queryable
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
engine *promql.Engine
hooks ReadHandlerHooks
queryable promstorage.Queryable
newQueryFn NewQueryFn
hOpts options.HandlerOptions
scope tally.Scope
logger *zap.Logger
opts Options
}

func newReadHandler(
Expand All @@ -68,13 +102,21 @@ func newReadHandler(
scope := hOpts.InstrumentOpts().MetricsScope().Tagged(
map[string]string{"handler": "prometheus-read"},
)

newQueryFn := newRangeQueryFn
if opts.instant {
newQueryFn = newInstantQueryFn
}

return &readHandler{
engine: opts.PromQLEngine,
hooks: hooks,
queryable: queryable,
hOpts: hOpts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
engine: opts.PromQLEngine,
hooks: hooks,
queryable: queryable,
newQueryFn: newQueryFn,
hOpts: hOpts,
opts: opts,
scope: scope,
logger: hOpts.InstrumentOpts().Logger(),
}
}

Expand All @@ -83,19 +125,19 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

fetchOptions, err := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

request, err := native.ParseRequest(ctx, r, false, h.hOpts)
request, err := native.ParseRequest(ctx, r, h.opts.instant, h.hOpts)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

params, err := h.hooks.OnParsedRequest(ctx, r, request.Params)
if err != nil {
respondError(w, err)
RespondError(w, err)
return
}

Expand All @@ -112,25 +154,22 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer cancel()
}

qry, err := h.engine.NewRangeQuery(
h.queryable,
params.Query,
params.Start,
params.End,
params.Step)
qry, err := h.newQueryFn(h.engine, h.queryable, params)
if err != nil {
h.logger.Error("error creating range query",
zap.Error(err), zap.String("query", params.Query))
respondError(w, err)
h.logger.Error("error creating query",
zap.Error(err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, err)
return
}
defer qry.Close()

res := qry.Exec(ctx)
if res.Err != nil {
h.logger.Error("error executing range query",
zap.Error(res.Err), zap.String("query", params.Query))
respondError(w, res.Err)
h.logger.Error("error executing query",
zap.Error(res.Err), zap.String("query", params.Query),
zap.Bool("instant", h.opts.instant))
RespondError(w, res.Err)
return
}

Expand All @@ -142,11 +181,12 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = ApplyRangeWarnings(query, &resultMetadata)
if err != nil {
h.logger.Warn("error applying range warnings",
zap.Error(err), zap.String("query", query))
zap.Error(err), zap.String("query", query),
zap.Bool("instant", h.opts.instant))
}

handleroptions.AddWarningHeaders(w, resultMetadata)
respond(w, &queryData{
Respond(w, &QueryData{
Result: res.Value,
ResultType: res.Value.Type(),
}, res.Warnings)
Expand Down
138 changes: 0 additions & 138 deletions src/query/api/v1/handler/prom/read_instant.go

This file was deleted.

4 changes: 3 additions & 1 deletion src/query/api/v1/handler/prom/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/query/executor"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

"github.com/prometheus/prometheus/pkg/labels"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -78,7 +79,8 @@ func setupTest(_ *testing.T) testHandlers {
SetTimeoutOpts(timeoutOpts)
queryable := &mockQueryable{}
readHandler := newReadHandler(opts, hOpts, &noopReadHandlerHooks{}, queryable)
readInstantHandler := newReadInstantHandler(opts, hOpts, queryable)
readInstantHandler := newReadHandler(opts.WithInstant(true), hOpts,
&noopReadHandlerHooks{}, queryable)
return testHandlers{
queryable: queryable,
readHandler: readHandler,
Expand Down
2 changes: 1 addition & 1 deletion src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (h *Handler) RegisterRoutes() error {
PromQLEngine: h.options.PrometheusEngine(),
}
promqlQueryHandler := wrapped(prom.NewReadHandler(opts, nativeSourceOpts))
promqlInstantQueryHandler := wrapped(prom.NewReadInstantHandler(opts, nativeSourceOpts))
promqlInstantQueryHandler := wrapped(prom.NewReadHandler(opts.WithInstant(true), nativeSourceOpts))
nativePromReadHandler := wrapped(native.NewPromReadHandler(nativeSourceOpts))
nativePromReadInstantHandler := wrapped(native.NewPromReadInstantHandler(nativeSourceOpts))

Expand Down