diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 572d344fb3..eac3abdc88 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -145,9 +145,16 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - RenderResultsJSON(w, result, RenderResultsOptions{ + err = RenderResultsJSON(w, result, RenderResultsOptions{ Start: parsedOptions.Params.Start, End: parsedOptions.Params.End, KeepNaNs: h.opts.Config().ResultOptions.KeepNans, }) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("failed to render results", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } } diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index b025719990..159440613d 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -25,6 +25,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "strings" "sync" @@ -259,103 +260,95 @@ type ReadResult struct { Result []*prompb.QueryResult } -// ParseRequest parses the compressed request -func ParseRequest( - ctx context.Context, - r *http.Request, - opts options.HandlerOptions, -) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { +// ParseExpr parses a prometheus request expression into the constituent +// fetches, rather than the full query application. +func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) { var req *prompb.ReadRequest - switch { - case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "": - exprParam := strings.TrimSpace(r.FormValue("expr")) - queryStart, err := util.ParseTimeString(r.FormValue("start")) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + exprParam := strings.TrimSpace(r.FormValue("query")) + if len(exprParam) == 0 { + return nil, xhttp.NewParseError( + fmt.Errorf("cannot parse params: no expr"), + http.StatusBadRequest) + } - queryEnd, err := util.ParseTimeString(r.FormValue("end")) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + queryStart, err := util.ParseTimeString(r.FormValue("start")) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } - req = &prompb.ReadRequest{} + queryEnd, err := util.ParseTimeString(r.FormValue("end")) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } - expr, err := promql.ParseExpr(exprParam) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + req = &prompb.ReadRequest{} + expr, err := promql.ParseExpr(exprParam) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { var ( - evalRange time.Duration + start = queryStart + end = queryEnd + offset time.Duration + labelMatchers []*labels.Matcher ) - promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { - var ( - start = queryStart - end = queryEnd - ) - - switch n := node.(type) { - case *promql.VectorSelector: - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } - - if n.Offset > 0 { - offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond - start = start.Add(-1 * offsetMilliseconds) - end = end.Add(-1 * offsetMilliseconds) - } - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } - - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, - } + if n, ok := node.(*promql.MatrixSelector); ok { + if n.Range > 0 { + start = start.Add(-1 * n.Range) + } - req.Queries = append(req.Queries, query) + offset = n.Offset + labelMatchers = n.LabelMatchers + } else if n, ok := node.(*promql.VectorSelector); ok { + offset = n.Offset + labelMatchers = n.LabelMatchers + } else { + return nil + } - case *promql.MatrixSelector: - evalRange = n.Range - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } + if offset > 0 { + start = start.Add(-1 * offset) + end = end.Add(-1 * offset) + } - if n.Offset > 0 { - offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond - start = start.Add(-1 * offsetMilliseconds) - end = end.Add(-1 * offsetMilliseconds) - } + matchers, err := toLabelMatchers(labelMatchers) + if err != nil { + return err + } - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, + } - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, - } + req.Queries = append(req.Queries, query) + return nil + }) - req.Queries = append(req.Queries, query) + return req, nil +} - } - return nil - }) +// ParseRequest parses the compressed request +func ParseRequest( + ctx context.Context, + r *http.Request, + opts options.HandlerOptions, +) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { + var req *prompb.ReadRequest + var rErr *xhttp.ParseError + switch { + case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("query")) != "": + req, rErr = ParseExpr(r) default: - var rErr *xhttp.ParseError req, rErr = parseCompressedRequest(r) - if rErr != nil { - return nil, nil, rErr - } + } + + if rErr != nil { + return nil, nil, rErr } timeout := opts.TimeoutOpts().FetchTimeout @@ -402,8 +395,12 @@ func Read( for i, promQuery := range r.Queries { i, promQuery := i, promQuery // Capture vars for lambda. go func() { - defer wg.Done() ctx, cancel := context.WithTimeout(ctx, fetchOpts.Timeout) + defer func() { + wg.Done() + cancel() + }() + cancelFuncs[i] = cancel query, err := storage.PromReadQueryToM3(promQuery) if err != nil { diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index b9fe0614b5..aa7b75cf43 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -21,10 +21,13 @@ package remote import ( + "bytes" "context" "fmt" + "io" "net/http" "net/http/httptest" + "net/url" "strings" "sync" "testing" @@ -64,6 +67,66 @@ var ( } ) +type testVals struct { + start time.Time + query string +} + +func buildBody(query string, start time.Time) io.Reader { + vals := url.Values{} + vals.Add("query", query) + vals.Add("start", start.Format(time.RFC3339)) + vals.Add("end", start.Add(time.Hour).Format(time.RFC3339)) + qs := vals.Encode() + return bytes.NewBuffer([]byte(qs)) +} + +func TestParseExpr(t *testing.T) { + query := "" + + `up{a="b"} + 7 - sum(rate(down{c!="d"}[2m])) + ` + + `left{e=~"f"} offset 30m and right{g!~"h"} + ` + ` + max_over_time(foo[1m] offset 1h)` + + start := time.Now().Truncate(time.Hour) + req := httptest.NewRequest(http.MethodPost, "/", buildBody(query, start)) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + readReq, err := ParseExpr(req) + require.NoError(t, err) + + q := func(start, end time.Time, matchers []*prompb.LabelMatcher) *prompb.Query { + return &prompb.Query{ + StartTimestampMs: start.Unix() * 1000, + EndTimestampMs: end.Unix() * 1000, + Matchers: matchers, + } + } + + b := func(s string) []byte { return []byte(s) } + expected := []*prompb.Query{ + q(start, start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("a"), Value: b("b"), Type: prompb.LabelMatcher_EQ}, + {Name: b("__name__"), Value: b("up"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-2), start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("c"), Value: b("d"), Type: prompb.LabelMatcher_NEQ}, + {Name: b("__name__"), Value: b("down"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-30), start.Add(time.Minute*30), + []*prompb.LabelMatcher{ + {Name: b("e"), Value: b("f"), Type: prompb.LabelMatcher_RE}, + {Name: b("__name__"), Value: b("left"), Type: prompb.LabelMatcher_EQ}}), + q(start, start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("g"), Value: b("h"), Type: prompb.LabelMatcher_NRE}, + {Name: b("__name__"), Value: b("right"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-61), start, + []*prompb.LabelMatcher{ + {Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}), + } + + assert.Equal(t, expected, readReq.Queries) +} + func newEngine( s storage.Storage, lookbackDuration time.Duration,