Skip to content

Commit

Permalink
[query] Add parse expression functionality (#2278)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Apr 23, 2020
1 parent 167a52a commit 4ef0a70
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 82 deletions.
9 changes: 8 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,16 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

RenderResultsJSON(w, result, RenderResultsOptions{
err = RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsedOptions.Params.Start,
End: parsedOptions.Params.End,
KeepNaNs: h.opts.Config().ResultOptions.KeepNans,
})

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("failed to render results", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
}
}
159 changes: 78 additions & 81 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -259,103 +260,95 @@ type ReadResult struct {
Result []*prompb.QueryResult
}

// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
// ParseExpr parses a prometheus request expression into the constituent
// fetches, rather than the full query application.
func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) {
var req *prompb.ReadRequest
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
exprParam := strings.TrimSpace(r.FormValue("expr"))
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
exprParam := strings.TrimSpace(r.FormValue("query"))
if len(exprParam) == 0 {
return nil, xhttp.NewParseError(
fmt.Errorf("cannot parse params: no expr"),
http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

req = &prompb.ReadRequest{}
queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
req = &prompb.ReadRequest{}
expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
evalRange time.Duration
start = queryStart
end = queryEnd
offset time.Duration
labelMatchers []*labels.Matcher
)
promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
start = queryStart
end = queryEnd
)

switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
if n, ok := node.(*promql.MatrixSelector); ok {
if n.Range > 0 {
start = start.Add(-1 * n.Range)
}

req.Queries = append(req.Queries, query)
offset = n.Offset
labelMatchers = n.LabelMatchers
} else if n, ok := node.(*promql.VectorSelector); ok {
offset = n.Offset
labelMatchers = n.LabelMatchers
} else {
return nil
}

case *promql.MatrixSelector:
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}
if offset > 0 {
start = start.Add(-1 * offset)
end = end.Add(-1 * offset)
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}
matchers, err := toLabelMatchers(labelMatchers)
if err != nil {
return err
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}
query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
req.Queries = append(req.Queries, query)
return nil
})

req.Queries = append(req.Queries, query)
return req, nil
}

}
return nil
})
// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
var req *prompb.ReadRequest
var rErr *xhttp.ParseError
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("query")) != "":
req, rErr = ParseExpr(r)
default:
var rErr *xhttp.ParseError
req, rErr = parseCompressedRequest(r)
if rErr != nil {
return nil, nil, rErr
}
}

if rErr != nil {
return nil, nil, rErr
}

timeout := opts.TimeoutOpts().FetchTimeout
Expand Down Expand Up @@ -402,8 +395,12 @@ func Read(
for i, promQuery := range r.Queries {
i, promQuery := i, promQuery // Capture vars for lambda.
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, fetchOpts.Timeout)
defer func() {
wg.Done()
cancel()
}()

cancelFuncs[i] = cancel
query, err := storage.PromReadQueryToM3(promQuery)
if err != nil {
Expand Down
63 changes: 63 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package remote

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -64,6 +67,66 @@ var (
}
)

type testVals struct {
start time.Time
query string
}

func buildBody(query string, start time.Time) io.Reader {
vals := url.Values{}
vals.Add("query", query)
vals.Add("start", start.Format(time.RFC3339))
vals.Add("end", start.Add(time.Hour).Format(time.RFC3339))
qs := vals.Encode()
return bytes.NewBuffer([]byte(qs))
}

func TestParseExpr(t *testing.T) {
query := "" +
`up{a="b"} + 7 - sum(rate(down{c!="d"}[2m])) + ` +
`left{e=~"f"} offset 30m and right{g!~"h"} + ` + `
max_over_time(foo[1m] offset 1h)`

start := time.Now().Truncate(time.Hour)
req := httptest.NewRequest(http.MethodPost, "/", buildBody(query, start))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
readReq, err := ParseExpr(req)
require.NoError(t, err)

q := func(start, end time.Time, matchers []*prompb.LabelMatcher) *prompb.Query {
return &prompb.Query{
StartTimestampMs: start.Unix() * 1000,
EndTimestampMs: end.Unix() * 1000,
Matchers: matchers,
}
}

b := func(s string) []byte { return []byte(s) }
expected := []*prompb.Query{
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("a"), Value: b("b"), Type: prompb.LabelMatcher_EQ},
{Name: b("__name__"), Value: b("up"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-2), start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("c"), Value: b("d"), Type: prompb.LabelMatcher_NEQ},
{Name: b("__name__"), Value: b("down"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-30), start.Add(time.Minute*30),
[]*prompb.LabelMatcher{
{Name: b("e"), Value: b("f"), Type: prompb.LabelMatcher_RE},
{Name: b("__name__"), Value: b("left"), Type: prompb.LabelMatcher_EQ}}),
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("g"), Value: b("h"), Type: prompb.LabelMatcher_NRE},
{Name: b("__name__"), Value: b("right"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-61), start,
[]*prompb.LabelMatcher{
{Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}),
}

assert.Equal(t, expected, readReq.Queries)
}

func newEngine(
s storage.Storage,
lookbackDuration time.Duration,
Expand Down

0 comments on commit 4ef0a70

Please sign in to comment.