From 91aa6250b7cc6c2b56d81885b66e5e3730418217 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 21 Apr 2020 20:22:18 -0400 Subject: [PATCH 1/3] [query] Add parse expression functionality. --- .../api/v1/handler/prometheus/native/read.go | 9 +- .../api/v1/handler/prometheus/remote/read.go | 173 ++++++++++-------- .../v1/handler/prometheus/remote/read_test.go | 71 +++++++ 3 files changed, 174 insertions(+), 79 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/native/read.go b/src/query/api/v1/handler/prometheus/native/read.go index 572d344fb3..eac3abdc88 100644 --- a/src/query/api/v1/handler/prometheus/native/read.go +++ b/src/query/api/v1/handler/prometheus/native/read.go @@ -145,9 +145,16 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - RenderResultsJSON(w, result, RenderResultsOptions{ + err = RenderResultsJSON(w, result, RenderResultsOptions{ Start: parsedOptions.Params.Start, End: parsedOptions.Params.End, KeepNaNs: h.opts.Config().ResultOptions.KeepNans, }) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + logger.Error("failed to render results", zap.Error(err)) + } else { + w.WriteHeader(http.StatusOK) + } } diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index b025719990..2c895be097 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -25,6 +25,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net/http" "strings" "sync" @@ -259,103 +260,119 @@ type ReadResult struct { Result []*prompb.QueryResult } -// ParseRequest parses the compressed request -func ParseRequest( - ctx context.Context, - r *http.Request, - opts options.HandlerOptions, -) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { +// ParseExpr parses a prometheus request expression into the constituent +// fetches, rather than the full query application. +func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) { var req *prompb.ReadRequest - switch { - case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "": - exprParam := strings.TrimSpace(r.FormValue("expr")) - queryStart, err := util.ParseTimeString(r.FormValue("start")) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + exprParam := strings.TrimSpace(r.FormValue("expr")) + if len(exprParam) == 0 { + return nil, xhttp.NewParseError( + fmt.Errorf("cannot parse params: no expr"), + http.StatusBadRequest) + } - queryEnd, err := util.ParseTimeString(r.FormValue("end")) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + queryStart, err := util.ParseTimeString(r.FormValue("start")) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } + + queryEnd, err := util.ParseTimeString(r.FormValue("end")) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } - req = &prompb.ReadRequest{} + req = &prompb.ReadRequest{} + expr, err := promql.ParseExpr(exprParam) + if err != nil { + return nil, xhttp.NewParseError(err, http.StatusBadRequest) + } - expr, err := promql.ParseExpr(exprParam) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) - } + var ( + evalRange time.Duration + ) + promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { var ( - evalRange time.Duration + start = queryStart + end = queryEnd ) - promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { - var ( - start = queryStart - end = queryEnd - ) - - switch n := node.(type) { - case *promql.VectorSelector: - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } - - if n.Offset > 0 { - offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond - start = start.Add(-1 * offsetMilliseconds) - end = end.Add(-1 * offsetMilliseconds) - } - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } + switch n := node.(type) { + case *promql.VectorSelector: + if evalRange > 0 { + start = start.Add(-1 * evalRange) + evalRange = 0 + } - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, - } + if n.Offset > 0 { + offset := time.Duration(n.Offset) + start = start.Add(-1 * offset) + end = end.Add(-1 * offset) + } - req.Queries = append(req.Queries, query) + matchers, err := toLabelMatchers(n.LabelMatchers) + if err != nil { + return err + } - case *promql.MatrixSelector: - evalRange = n.Range - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, + } - if n.Offset > 0 { - offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond - start = start.Add(-1 * offsetMilliseconds) - end = end.Add(-1 * offsetMilliseconds) - } + req.Queries = append(req.Queries, query) - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } + case *promql.MatrixSelector: + evalRange = n.Range + if evalRange > 0 { + start = start.Add(-1 * evalRange) + evalRange = 0 + } - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, - } + if n.Offset > 0 { + offset := time.Duration(n.Offset) + start = start.Add(-1 * offset) + end = end.Add(-1 * offset) + } - req.Queries = append(req.Queries, query) + matchers, err := toLabelMatchers(n.LabelMatchers) + if err != nil { + return err + } + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, } - return nil - }) + + req.Queries = append(req.Queries, query) + } + + return nil + }) + + return req, nil +} + +// ParseRequest parses the compressed request +func ParseRequest( + ctx context.Context, + r *http.Request, + opts options.HandlerOptions, +) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) { + var req *prompb.ReadRequest + var rErr *xhttp.ParseError + switch { + case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "": + req, rErr = ParseExpr(r) default: - var rErr *xhttp.ParseError req, rErr = parseCompressedRequest(r) - if rErr != nil { - return nil, nil, rErr - } + } + + if rErr != nil { + return nil, nil, rErr } timeout := opts.TimeoutOpts().FetchTimeout diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index b9fe0614b5..7a9c1d4c67 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -21,10 +21,13 @@ package remote import ( + "bytes" "context" "fmt" + "io" "net/http" "net/http/httptest" + "net/url" "strings" "sync" "testing" @@ -64,6 +67,74 @@ var ( } ) +type testVals struct { + start time.Time + query string +} + +func buildBody(query string, start time.Time) io.Reader { + vals := url.Values{} + vals.Add("expr", query) + vals.Add("start", start.Format(time.RFC3339)) + vals.Add("end", start.Add(time.Hour).Format(time.RFC3339)) + qs := vals.Encode() + return bytes.NewBuffer([]byte(qs)) +} + +func TestParseExpr(t *testing.T) { + query := "" + + `up{a="b"} + 7 - sum(rate(down{c!="d"}[2m])) + ` + + `left{e=~"f"} offset 30m and right{g!~"h"} + ` + ` + max_over_time(foo[1m] offset 1h)` + + start := time.Now().Truncate(time.Hour) + req := httptest.NewRequest(http.MethodPost, "/", buildBody(query, start)) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + readReq, err := ParseExpr(req) + require.NoError(t, err) + + q := func(start, end time.Time, matchers []*prompb.LabelMatcher) *prompb.Query { + return &prompb.Query{ + StartTimestampMs: start.Unix() * 1000, + EndTimestampMs: end.Unix() * 1000, + Matchers: matchers, + } + } + + b := func(s string) []byte { return []byte(s) } + expected := []*prompb.Query{ + q(start, start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("a"), Value: b("b"), Type: prompb.LabelMatcher_EQ}, + {Name: b("__name__"), Value: b("up"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-2), start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("c"), Value: b("d"), Type: prompb.LabelMatcher_NEQ}, + {Name: b("__name__"), Value: b("down"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-30), start.Add(time.Minute*30), + []*prompb.LabelMatcher{ + {Name: b("e"), Value: b("f"), Type: prompb.LabelMatcher_RE}, + {Name: b("__name__"), Value: b("left"), Type: prompb.LabelMatcher_EQ}}), + q(start, start.Add(time.Hour), + []*prompb.LabelMatcher{ + {Name: b("g"), Value: b("h"), Type: prompb.LabelMatcher_NRE}, + {Name: b("__name__"), Value: b("right"), Type: prompb.LabelMatcher_EQ}}), + q(start.Add(time.Minute*-61), start, + []*prompb.LabelMatcher{ + {Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}), + } + + for i, q := range expected { + fmt.Println(i, q.String()) + } + + for i, q := range readReq.Queries { + fmt.Println(i, q.String()) + } + + assert.Equal(t, expected, readReq.Queries) +} + func newEngine( s storage.Storage, lookbackDuration time.Duration, From 866e936ffb0ddd96ac0e2ae345b2cb78cc9fce8a Mon Sep 17 00:00:00 2001 From: Artem Date: Wed, 22 Apr 2020 23:56:25 -0400 Subject: [PATCH 2/3] PR response --- .../api/v1/handler/prometheus/remote/read.go | 84 +++++++------------ .../v1/handler/prometheus/remote/read_test.go | 10 +-- 2 files changed, 31 insertions(+), 63 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 2c895be097..8a5b711be5 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -264,7 +264,7 @@ type ReadResult struct { // fetches, rather than the full query application. func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) { var req *prompb.ReadRequest - exprParam := strings.TrimSpace(r.FormValue("expr")) + exprParam := strings.TrimSpace(r.FormValue("query")) if len(exprParam) == 0 { return nil, xhttp.NewParseError( fmt.Errorf("cannot parse params: no expr"), @@ -287,69 +287,45 @@ func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) { return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - var ( - evalRange time.Duration - ) - promql.Inspect(expr, func(node promql.Node, path []promql.Node) error { var ( - start = queryStart - end = queryEnd + start = queryStart + end = queryEnd + offset time.Duration + labelMatchers []*labels.Matcher ) - switch n := node.(type) { - case *promql.VectorSelector: - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } - - if n.Offset > 0 { - offset := time.Duration(n.Offset) - start = start.Add(-1 * offset) - end = end.Add(-1 * offset) - } - - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } - - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, + if n, ok := node.(*promql.MatrixSelector); ok { + if n.Range > 0 { + start = start.Add(-1 * n.Range) } - req.Queries = append(req.Queries, query) - - case *promql.MatrixSelector: - evalRange = n.Range - if evalRange > 0 { - start = start.Add(-1 * evalRange) - evalRange = 0 - } - - if n.Offset > 0 { - offset := time.Duration(n.Offset) - start = start.Add(-1 * offset) - end = end.Add(-1 * offset) - } + offset = n.Offset + labelMatchers = n.LabelMatchers + } else if n, ok := node.(*promql.VectorSelector); ok { + offset = n.Offset + labelMatchers = n.LabelMatchers + } else { + return nil + } - matchers, err := toLabelMatchers(n.LabelMatchers) - if err != nil { - return err - } + if offset > 0 { + start = start.Add(-1 * offset) + end = end.Add(-1 * offset) + } - query := &prompb.Query{ - StartTimestampMs: storage.TimeToPromTimestamp(start), - EndTimestampMs: storage.TimeToPromTimestamp(end), - Matchers: matchers, - } + matchers, err := toLabelMatchers(labelMatchers) + if err != nil { + return err + } - req.Queries = append(req.Queries, query) + query := &prompb.Query{ + StartTimestampMs: storage.TimeToPromTimestamp(start), + EndTimestampMs: storage.TimeToPromTimestamp(end), + Matchers: matchers, } + req.Queries = append(req.Queries, query) return nil }) @@ -365,7 +341,7 @@ func ParseRequest( var req *prompb.ReadRequest var rErr *xhttp.ParseError switch { - case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "": + case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("query")) != "": req, rErr = ParseExpr(r) default: req, rErr = parseCompressedRequest(r) diff --git a/src/query/api/v1/handler/prometheus/remote/read_test.go b/src/query/api/v1/handler/prometheus/remote/read_test.go index 7a9c1d4c67..aa7b75cf43 100644 --- a/src/query/api/v1/handler/prometheus/remote/read_test.go +++ b/src/query/api/v1/handler/prometheus/remote/read_test.go @@ -74,7 +74,7 @@ type testVals struct { func buildBody(query string, start time.Time) io.Reader { vals := url.Values{} - vals.Add("expr", query) + vals.Add("query", query) vals.Add("start", start.Format(time.RFC3339)) vals.Add("end", start.Add(time.Hour).Format(time.RFC3339)) qs := vals.Encode() @@ -124,14 +124,6 @@ func TestParseExpr(t *testing.T) { {Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}), } - for i, q := range expected { - fmt.Println(i, q.String()) - } - - for i, q := range readReq.Queries { - fmt.Println(i, q.String()) - } - assert.Equal(t, expected, readReq.Queries) } From 5742109ec078e0c6169f439dfd49029a515d702c Mon Sep 17 00:00:00 2001 From: Artem Date: Thu, 23 Apr 2020 01:00:24 -0400 Subject: [PATCH 3/3] Add cancel to missing path --- src/query/api/v1/handler/prometheus/remote/read.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/query/api/v1/handler/prometheus/remote/read.go b/src/query/api/v1/handler/prometheus/remote/read.go index 8a5b711be5..159440613d 100644 --- a/src/query/api/v1/handler/prometheus/remote/read.go +++ b/src/query/api/v1/handler/prometheus/remote/read.go @@ -395,8 +395,12 @@ func Read( for i, promQuery := range r.Queries { i, promQuery := i, promQuery // Capture vars for lambda. go func() { - defer wg.Done() ctx, cancel := context.WithTimeout(ctx, fetchOpts.Timeout) + defer func() { + wg.Done() + cancel() + }() + cancelFuncs[i] = cancel query, err := storage.PromReadQueryToM3(promQuery) if err != nil {