diff --git a/src/query/api/v1/handler/prometheus/native/common.go b/src/query/api/v1/handler/prometheus/native/common.go index 4ece65135a..9d75046caf 100644 --- a/src/query/api/v1/handler/prometheus/native/common.go +++ b/src/query/api/v1/handler/prometheus/native/common.go @@ -289,11 +289,18 @@ func filterNaNSeries( return filtered } -func renderResultsJSON( +// RenderResultsOptions is a set of options for rendering the result. +type RenderResultsOptions struct { + Start time.Time + End time.Time + KeepNaNs bool +} + +// RenderResultsJSON renders results in JSON for range queries. +func RenderResultsJSON( w io.Writer, result ReadResult, - params models.RequestParams, - keepNans bool, + opts RenderResultsOptions, ) { var ( series = result.Series @@ -301,8 +308,8 @@ func renderResultsJSON( ) // NB: if dropping NaNs, drop series with only NaNs from output entirely. - if !keepNans { - series = filterNaNSeries(series, params.Start, params.End) + if !opts.KeepNaNs { + series = filterNaNSeries(series, opts.Start, opts.End) } jw := json.NewWriter(w) @@ -347,7 +354,7 @@ func renderResultsJSON( dp := vals.DatapointAt(i) // If keepNaNs is set to false and the value is NaN, drop it from the response. - if !keepNans && math.IsNaN(dp.Value) { + if !opts.KeepNaNs && math.IsNaN(dp.Value) { continue } @@ -355,7 +362,7 @@ func renderResultsJSON( // would be at the result node but that would make it inefficient since // we would need to create another block just for the sake of restricting // the bounds. - if dp.Timestamp.Before(params.Start) { + if dp.Timestamp.Before(opts.Start) { continue } @@ -380,7 +387,8 @@ func renderResultsJSON( jw.Close() } -func renderResultsInstantaneousJSON( +// RenderResultsInstantaneousJSON renders results in JSON for instant queries. +func RenderResultsInstantaneousJSON( w io.Writer, result ReadResult, ) { diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index b913b937e7..45429f1b08 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -165,7 +165,11 @@ func (h *PromReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.promReadMetrics.fetchSuccess.Inc(1) keepNans := h.opts.Config().ResultOptions.KeepNans // TODO: Support multiple result types - renderResultsJSON(w, result, parsed.params, keepNans) + RenderResultsJSON(w, result, RenderResultsOptions{ + Start: parsed.params.Start, + End: parsed.params.End, + KeepNaNs: keepNans, + }) } type parsed struct { diff --git a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go index 149681d321..55a00f431b 100644 --- a/src/query/api/v1/handler/prometheus/native/read_instantaneous.go +++ b/src/query/api/v1/handler/prometheus/native/read_instantaneous.go @@ -129,5 +129,5 @@ func (h *PromReadInstantHandler) ServeHTTP( // TODO: Support multiple result types w.Header().Set("Content-Type", "application/json") handleroptions.AddWarningHeaders(w, result.Meta) - renderResultsInstantaneousJSON(w, result) + RenderResultsInstantaneousJSON(w, result) } diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 68043dcd74..0fe8ff9b33 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -23,9 +23,14 @@ package remote import ( "bytes" "context" + "encoding/json" + "errors" "net/http" + "strings" "sync" + "time" + comparator "github.com/m3db/m3/src/cmd/services/m3comparator/main/parser" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/api/v1/handler/prometheus" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" @@ -35,12 +40,16 @@ import ( "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" + "github.com/m3db/m3/src/query/ts" + "github.com/m3db/m3/src/query/util" "github.com/m3db/m3/src/query/util/logging" xerrors "github.com/m3db/m3/src/x/errors" xhttp "github.com/m3db/m3/src/x/net/http" "github.com/golang/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -48,9 +57,11 @@ import ( const ( // PromReadURL is the url for remote prom read handler PromReadURL = handler.RoutePrefixV1 + "/prom/remote/read" +) - // PromReadHTTPMethod is the HTTP method used with this resource. - PromReadHTTPMethod = http.MethodPost +var ( + // PromReadHTTPMethods are the HTTP methods used with this resource. + PromReadHTTPMethods = []string{http.MethodPost, http.MethodGet} ) // promReadHandler is a handler for the prometheus remote read endpoint. @@ -121,7 +132,52 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // NB: if this errors, all relevant headers and information should already // be sent to the writer; so it is not necessary to do anything here other // than increment success/failure metrics. - err = WriteSnappyCompressed(w, readResult, logger) + switch r.FormValue("format") { + case "json": + result := readResultsJSON{ + Queries: make([]queryResultsJSON, 0, len(req.Queries)), + } + for i, q := range req.Queries { + start := storage.PromTimestampToTime(q.StartTimestampMs) + end := storage.PromTimestampToTime(q.EndTimestampMs) + + all := readResult.Result[i].Timeseries + timeseries := make([]comparator.Series, 0, len(all)) + for _, s := range all { + datapoints := storage.PromSamplesToM3Datapoints(s.Samples) + tags := storage.PromLabelsToM3Tags(s.Labels, h.opts.TagOptions()) + series := toSeries(datapoints, tags) + series.Start = start + series.End = end + timeseries = append(timeseries, series) + } + + matchers := make([]labelMatcherJSON, 0, len(q.Matchers)) + for _, m := range q.Matchers { + matcher := labelMatcherJSON{ + Type: m.Type.String(), + Name: string(m.Name), + Value: string(m.Value), + } + matchers = append(matchers, matcher) + } + + result.Queries = append(result.Queries, queryResultsJSON{ + Query: queryJSON{ + Matchers: matchers, + }, + Start: start, + End: end, + Series: timeseries, + }) + } + + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(result) + default: + err = WriteSnappyCompressed(w, readResult, logger) + } + if err != nil { h.promReadMetrics.fetchErrorsServer.Inc(1) } else { @@ -129,6 +185,27 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +type readResultsJSON struct { + Queries []queryResultsJSON `json:"queries"` +} + +type queryResultsJSON struct { + Query queryJSON `json:"query"` + Start time.Time `json:"start"` + End time.Time `json:"end"` + Series []comparator.Series `json:"series"` +} + +type queryJSON struct { + Matchers []labelMatcherJSON `json:"matchers"` +} + +type labelMatcherJSON struct { + Type string `json:"type"` + Name string `json:"name"` + Value string `json:"value"` +} + // WriteSnappyCompressed writes snappy compressed results to the given writer. func WriteSnappyCompressed( w http.ResponseWriter, @@ -188,9 +265,97 @@ func parseRequest( r *http.Request, opts options.HandlerOptions, ) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { - req, rErr := parseCompressedRequest(r) - if rErr != nil { - return nil, nil, rErr + var req *prompb.ReadRequest + switch { + case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "": + exprParam := strings.TrimSpace(r.FormValue("expr")) + queryStart, err := util.ParseTimeString(r.FormValue("start")) + if err != nil { + return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + queryEnd, err := util.ParseTimeString(r.FormValue("end")) + if err != nil { + return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + req = &prompb.ReadRequest{} + + expr, err := promql.ParseExpr(exprParam) + if err != nil { + return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + var ( + evalRange time.Duration + ) + promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { + var ( + start = queryStart + end = queryEnd + ) + + switch n := node.(type) { + case *promql.VectorSelector: + if evalRange > 0 { + start = start.Add(-1 * evalRange) + evalRange = 0 + } + + if n.Offset > 0 { + offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond + start = start.Add(-1 * offsetMilliseconds) + end = end.Add(-1 * offsetMilliseconds) + } + + matchers, err := toLabelMatchers(n.LabelMatchers) + if err != nil { + return err + } + + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, + } + + req.Queries = append(req.Queries, query) + + case *promql.MatrixSelector: + evalRange = n.Range + if evalRange > 0 { + start = start.Add(-1 * evalRange) + evalRange = 0 + } + + if n.Offset > 0 { + offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond + start = start.Add(-1 * offsetMilliseconds) + end = end.Add(-1 * offsetMilliseconds) + } + + matchers, err := toLabelMatchers(n.LabelMatchers) + if err != nil { + return err + } + + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, + } + + req.Queries = append(req.Queries, query) + + } + return nil + }) + default: + var rErr *xhttp.ParseError + req, rErr = parseCompressedRequest(r) + if rErr != nil { + return nil, nil, rErr + } } timeout := opts.TimeoutOpts().FetchTimeout @@ -330,3 +495,57 @@ func filterLabels( return filtered } + +func tagsConvert(ts models.Tags) comparator.Tags { + tags := make(comparator.Tags, ts.Len()) + for _, t := range ts.Tags { + tags[string(t.Name)] = string(t.Value) + } + + return tags +} + +func datapointsConvert(dps ts.Datapoints) comparator.Datapoints { + datapoints := make(comparator.Datapoints, 0, dps.Len()) + for _, dp := range dps.Datapoints() { + val := comparator.Datapoint{ + Value: comparator.Value(dp.Value), + Timestamp: dp.Timestamp, + } + datapoints = append(datapoints, val) + } + + return datapoints +} + +func toSeries(dps ts.Datapoints, tags models.Tags) comparator.Series { + return comparator.Series{ + Tags: tagsConvert(tags), + Datapoints: datapointsConvert(dps), + } +} + +func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) { + pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) + for _, m := range matchers { + var mType prompb.LabelMatcher_Type + switch m.Type { + case labels.MatchEqual: + mType = prompb.LabelMatcher_EQ + case labels.MatchNotEqual: + mType = prompb.LabelMatcher_NEQ + case labels.MatchRegexp: + mType = prompb.LabelMatcher_RE + case labels.MatchNotRegexp: + mType = prompb.LabelMatcher_NRE + default: + return nil, errors.New("invalid matcher type") + } + pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ + Type: mType, + Name: []byte(m.Name), + Value: []byte(m.Value), + }) + } + return pbMatchers, nil +} diff --git a/src/query/api/v1/httpd/handler.go b/src/query/api/v1/httpd/handler.go index 6a37244a32..a54fa1ec6a 100644 --- a/src/query/api/v1/httpd/handler.go +++ b/src/query/api/v1/httpd/handler.go @@ -159,7 +159,7 @@ func (h *Handler) RegisterRoutes() error { nativePromReadHandler := native.NewPromReadHandler(nativeSourceOpts) h.router.HandleFunc(remote.PromReadURL, wrapped(promRemoteReadHandler).ServeHTTP, - ).Methods(remote.PromReadHTTPMethod) + ).Methods(remote.PromReadHTTPMethods...) h.router.HandleFunc(remote.PromWriteURL, panicOnly(promRemoteWriteHandler).ServeHTTP, ).Methods(remote.PromWriteHTTPMethod)