Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP [query] Add debug params to Prometheus Remote Read to return raw query data for PromQL queries as input as well as JSON formatted results #2267

24 changes: 16 additions & 8 deletions src/query/api/v1/handler/prometheus/native/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,20 +289,27 @@ func filterNaNSeries(
return filtered
}

func renderResultsJSON(
// RenderResultsOptions is a set of options for rendering the result.
type RenderResultsOptions struct {
Start time.Time
End time.Time
KeepNaNs bool
}

// RenderResultsJSON renders results in JSON for range queries.
func RenderResultsJSON(
w io.Writer,
result ReadResult,
params models.RequestParams,
keepNans bool,
opts RenderResultsOptions,
) {
var (
series = result.Series
warnings = result.Meta.WarningStrings()
)

// NB: if dropping NaNs, drop series with only NaNs from output entirely.
if !keepNans {
series = filterNaNSeries(series, params.Start, params.End)
if !opts.KeepNaNs {
series = filterNaNSeries(series, opts.Start, opts.End)
}

jw := json.NewWriter(w)
Expand Down Expand Up @@ -347,15 +354,15 @@ func renderResultsJSON(
dp := vals.DatapointAt(i)

// If keepNaNs is set to false and the value is NaN, drop it from the response.
if !keepNans && math.IsNaN(dp.Value) {
if !opts.KeepNaNs && math.IsNaN(dp.Value) {
continue
}

// Skip points before the query boundary. Ideal place to adjust these
// would be at the result node but that would make it inefficient since
// we would need to create another block just for the sake of restricting
// the bounds.
if dp.Timestamp.Before(params.Start) {
if dp.Timestamp.Before(opts.Start) {
continue
}

Expand All @@ -380,7 +387,8 @@ func renderResultsJSON(
jw.Close()
}

func renderResultsInstantaneousJSON(
// RenderResultsInstantaneousJSON renders results in JSON for instant queries.
func RenderResultsInstantaneousJSON(
w io.Writer,
result ReadResult,
) {
Expand Down
6 changes: 5 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.promReadMetrics.fetchSuccess.Inc(1)
keepNans := h.opts.Config().ResultOptions.KeepNans
// TODO: Support multiple result types
renderResultsJSON(w, result, parsed.params, keepNans)
RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsed.params.Start,
End: parsed.params.End,
KeepNaNs: keepNans,
})
}

type parsed struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,5 @@ func (h *PromReadInstantHandler) ServeHTTP(
// TODO: Support multiple result types
w.Header().Set("Content-Type", "application/json")
handleroptions.AddWarningHeaders(w, result.Meta)
renderResultsInstantaneousJSON(w, result)
RenderResultsInstantaneousJSON(w, result)
}
231 changes: 225 additions & 6 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@ package remote
import (
"bytes"
"context"
"encoding/json"
"errors"
"net/http"
"strings"
"sync"
"time"

comparator "github.com/m3db/m3/src/cmd/services/m3comparator/main/parser"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus"
"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
Expand All @@ -35,22 +40,28 @@ import (
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/query/util"
"github.com/m3db/m3/src/query/util/logging"
xerrors "github.com/m3db/m3/src/x/errors"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

const (
// PromReadURL is the url for remote prom read handler
PromReadURL = handler.RoutePrefixV1 + "/prom/remote/read"
)

// PromReadHTTPMethod is the HTTP method used with this resource.
PromReadHTTPMethod = http.MethodPost
var (
// PromReadHTTPMethods are the HTTP methods used with this resource.
PromReadHTTPMethods = []string{http.MethodPost, http.MethodGet}
)

// promReadHandler is a handler for the prometheus remote read endpoint.
Expand Down Expand Up @@ -121,14 +132,80 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// NB: if this errors, all relevant headers and information should already
// be sent to the writer; so it is not necessary to do anything here other
// than increment success/failure metrics.
err = WriteSnappyCompressed(w, readResult, logger)
switch r.FormValue("format") {
case "json":
result := readResultsJSON{
Queries: make([]queryResultsJSON, 0, len(req.Queries)),
}
for i, q := range req.Queries {
start := storage.PromTimestampToTime(q.StartTimestampMs)
end := storage.PromTimestampToTime(q.EndTimestampMs)

all := readResult.Result[i].Timeseries
timeseries := make([]comparator.Series, 0, len(all))
for _, s := range all {
datapoints := storage.PromSamplesToM3Datapoints(s.Samples)
tags := storage.PromLabelsToM3Tags(s.Labels, h.opts.TagOptions())
series := toSeries(datapoints, tags)
series.Start = start
series.End = end
timeseries = append(timeseries, series)
}

matchers := make([]labelMatcherJSON, 0, len(q.Matchers))
for _, m := range q.Matchers {
matcher := labelMatcherJSON{
Type: m.Type.String(),
Name: string(m.Name),
Value: string(m.Value),
}
matchers = append(matchers, matcher)
}

result.Queries = append(result.Queries, queryResultsJSON{
Query: queryJSON{
Matchers: matchers,
},
Start: start,
End: end,
Series: timeseries,
})
}

w.Header().Set("Content-Type", "application/json")
err = json.NewEncoder(w).Encode(result)
default:
err = WriteSnappyCompressed(w, readResult, logger)
}

if err != nil {
h.promReadMetrics.fetchErrorsServer.Inc(1)
} else {
h.promReadMetrics.fetchSuccess.Inc(1)
}
}

type readResultsJSON struct {
Queries []queryResultsJSON `json:"queries"`
}

type queryResultsJSON struct {
Query queryJSON `json:"query"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
Series []comparator.Series `json:"series"`
}

type queryJSON struct {
Matchers []labelMatcherJSON `json:"matchers"`
}

type labelMatcherJSON struct {
Type string `json:"type"`
Name string `json:"name"`
Value string `json:"value"`
}

// WriteSnappyCompressed writes snappy compressed results to the given writer.
func WriteSnappyCompressed(
w http.ResponseWriter,
Expand Down Expand Up @@ -188,9 +265,97 @@ func parseRequest(
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
req, rErr := parseCompressedRequest(r)
if rErr != nil {
return nil, nil, rErr
var req *prompb.ReadRequest
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
exprParam := strings.TrimSpace(r.FormValue("expr"))
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

req = &prompb.ReadRequest{}

expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

var (
evalRange time.Duration
)
promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
start = queryStart
end = queryEnd
)

switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}

req.Queries = append(req.Queries, query)

case *promql.MatrixSelector:
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}

req.Queries = append(req.Queries, query)

}
return nil
})
default:
var rErr *xhttp.ParseError
req, rErr = parseCompressedRequest(r)
if rErr != nil {
return nil, nil, rErr
}
}

timeout := opts.TimeoutOpts().FetchTimeout
Expand Down Expand Up @@ -330,3 +495,57 @@ func filterLabels(

return filtered
}

func tagsConvert(ts models.Tags) comparator.Tags {
tags := make(comparator.Tags, ts.Len())
for _, t := range ts.Tags {
tags[string(t.Name)] = string(t.Value)
}

return tags
}

func datapointsConvert(dps ts.Datapoints) comparator.Datapoints {
datapoints := make(comparator.Datapoints, 0, dps.Len())
for _, dp := range dps.Datapoints() {
val := comparator.Datapoint{
Value: comparator.Value(dp.Value),
Timestamp: dp.Timestamp,
}
datapoints = append(datapoints, val)
}

return datapoints
}

func toSeries(dps ts.Datapoints, tags models.Tags) comparator.Series {
return comparator.Series{
Tags: tagsConvert(tags),
Datapoints: datapointsConvert(dps),
}
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType prompb.LabelMatcher_Type
switch m.Type {
case labels.MatchEqual:
mType = prompb.LabelMatcher_EQ
case labels.MatchNotEqual:
mType = prompb.LabelMatcher_NEQ
case labels.MatchRegexp:
mType = prompb.LabelMatcher_RE
case labels.MatchNotRegexp:
mType = prompb.LabelMatcher_NRE
default:
return nil, errors.New("invalid matcher type")
}
pbMatchers = append(pbMatchers, &prompb.LabelMatcher{
Type: mType,
Name: []byte(m.Name),
Value: []byte(m.Value),
})
}
return pbMatchers, nil
}
2 changes: 1 addition & 1 deletion src/query/api/v1/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (h *Handler) RegisterRoutes() error {
nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts)
h.router.HandleFunc(remote.PromReadURL,
wrapped(promRemoteReadHandler).ServeHTTP,
).Methods(remote.PromReadHTTPMethod)
).Methods(remote.PromReadHTTPMethods...)
h.router.HandleFunc(remote.PromWriteURL,
panicOnly(promRemoteWriteHandler).ServeHTTP,
).Methods(remote.PromWriteHTTPMethod)
Expand Down