Skip to content

Commit

Permalink
[query] Add parse expression functionality.
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola committed Apr 23, 2020
1 parent 7fd2abb commit 91aa625
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 79 deletions.
9 changes: 8 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,16 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

RenderResultsJSON(w, result, RenderResultsOptions{
err = RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsedOptions.Params.Start,
End: parsedOptions.Params.End,
KeepNaNs: h.opts.Config().ResultOptions.KeepNans,
})

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("failed to render results", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
}
}
173 changes: 95 additions & 78 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -259,103 +260,119 @@ type ReadResult struct {
Result []*prompb.QueryResult
}

// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
// ParseExpr parses a prometheus request expression into the constituent
// fetches, rather than the full query application.
func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) {
var req *prompb.ReadRequest
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
exprParam := strings.TrimSpace(r.FormValue("expr"))
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
exprParam := strings.TrimSpace(r.FormValue("expr"))
if len(exprParam) == 0 {
return nil, xhttp.NewParseError(
fmt.Errorf("cannot parse params: no expr"),
http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

req = &prompb.ReadRequest{}
req = &prompb.ReadRequest{}
expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
var (
evalRange time.Duration
)

promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
evalRange time.Duration
start = queryStart
end = queryEnd
)
promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
start = queryStart
end = queryEnd
)

switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}
switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
if n.Offset > 0 {
offset := time.Duration(n.Offset)
start = start.Add(-1 * offset)
end = end.Add(-1 * offset)
}

req.Queries = append(req.Queries, query)
matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

case *promql.MatrixSelector:
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}
query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}
req.Queries = append(req.Queries, query)

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}
case *promql.MatrixSelector:
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
if n.Offset > 0 {
offset := time.Duration(n.Offset)
start = start.Add(-1 * offset)
end = end.Add(-1 * offset)
}

req.Queries = append(req.Queries, query)
matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
return nil
})

req.Queries = append(req.Queries, query)
}

return nil
})

return req, nil
}

// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
var req *prompb.ReadRequest
var rErr *xhttp.ParseError
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
req, rErr = ParseExpr(r)
default:
var rErr *xhttp.ParseError
req, rErr = parseCompressedRequest(r)
if rErr != nil {
return nil, nil, rErr
}
}

if rErr != nil {
return nil, nil, rErr
}

timeout := opts.TimeoutOpts().FetchTimeout
Expand Down
71 changes: 71 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package remote

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -64,6 +67,74 @@ var (
}
)

type testVals struct {
start time.Time
query string
}

func buildBody(query string, start time.Time) io.Reader {
vals := url.Values{}
vals.Add("expr", query)
vals.Add("start", start.Format(time.RFC3339))
vals.Add("end", start.Add(time.Hour).Format(time.RFC3339))
qs := vals.Encode()
return bytes.NewBuffer([]byte(qs))
}

func TestParseExpr(t *testing.T) {
query := "" +
`up{a="b"} + 7 - sum(rate(down{c!="d"}[2m])) + ` +
`left{e=~"f"} offset 30m and right{g!~"h"} + ` + `
max_over_time(foo[1m] offset 1h)`

start := time.Now().Truncate(time.Hour)
req := httptest.NewRequest(http.MethodPost, "/", buildBody(query, start))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
readReq, err := ParseExpr(req)
require.NoError(t, err)

q := func(start, end time.Time, matchers []*prompb.LabelMatcher) *prompb.Query {
return &prompb.Query{
StartTimestampMs: start.Unix() * 1000,
EndTimestampMs: end.Unix() * 1000,
Matchers: matchers,
}
}

b := func(s string) []byte { return []byte(s) }
expected := []*prompb.Query{
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("a"), Value: b("b"), Type: prompb.LabelMatcher_EQ},
{Name: b("__name__"), Value: b("up"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-2), start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("c"), Value: b("d"), Type: prompb.LabelMatcher_NEQ},
{Name: b("__name__"), Value: b("down"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-30), start.Add(time.Minute*30),
[]*prompb.LabelMatcher{
{Name: b("e"), Value: b("f"), Type: prompb.LabelMatcher_RE},
{Name: b("__name__"), Value: b("left"), Type: prompb.LabelMatcher_EQ}}),
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("g"), Value: b("h"), Type: prompb.LabelMatcher_NRE},
{Name: b("__name__"), Value: b("right"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-61), start,
[]*prompb.LabelMatcher{
{Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}),
}

for i, q := range expected {
fmt.Println(i, q.String())
}

for i, q := range readReq.Queries {
fmt.Println(i, q.String())
}

assert.Equal(t, expected, readReq.Queries)
}

func newEngine(
s storage.Storage,
lookbackDuration time.Duration,
Expand Down

0 comments on commit 91aa625

Please sign in to comment.