Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Add parse expression functionality #2278

Merged
merged 4 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/query/api/v1/handler/prometheus/native/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,16 @@ func (h *promReadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

RenderResultsJSON(w, result, RenderResultsOptions{
err = RenderResultsJSON(w, result, RenderResultsOptions{
Start: parsedOptions.Params.Start,
End: parsedOptions.Params.End,
KeepNaNs: h.opts.Config().ResultOptions.KeepNans,
})

if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.Error("failed to render results", zap.Error(err))
} else {
w.WriteHeader(http.StatusOK)
}
}
173 changes: 95 additions & 78 deletions src/query/api/v1/handler/prometheus/remote/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -259,103 +260,119 @@ type ReadResult struct {
Result []*prompb.QueryResult
}

// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
// ParseExpr parses a prometheus request expression into the constituent
// fetches, rather than the full query application.
func ParseExpr(r *http.Request) (*prompb.ReadRequest, *xhttp.ParseError) {
var req *prompb.ReadRequest
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
exprParam := strings.TrimSpace(r.FormValue("expr"))
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
exprParam := strings.TrimSpace(r.FormValue("expr"))
if len(exprParam) == 0 {
return nil, xhttp.NewParseError(
fmt.Errorf("cannot parse params: no expr"),
http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
queryStart, err := util.ParseTimeString(r.FormValue("start"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

queryEnd, err := util.ParseTimeString(r.FormValue("end"))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

req = &prompb.ReadRequest{}
req = &prompb.ReadRequest{}
expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

expr, err := promql.ParseExpr(exprParam)
if err != nil {
return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
var (
evalRange time.Duration
)

promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
evalRange time.Duration
start = queryStart
end = queryEnd
)
promql.Inspect(expr, func(node promql.Node, path []promql.Node) error {
var (
start = queryStart
end = queryEnd
)

switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}
switch n := node.(type) {
case *promql.VectorSelector:
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
if n.Offset > 0 {
offset := time.Duration(n.Offset)
start = start.Add(-1 * offset)
end = end.Add(-1 * offset)
}

req.Queries = append(req.Queries, query)
matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

case *promql.MatrixSelector:
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}
query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}

if n.Offset > 0 {
offsetMilliseconds := time.Duration(n.Offset) * time.Millisecond
start = start.Add(-1 * offsetMilliseconds)
end = end.Add(-1 * offsetMilliseconds)
}
req.Queries = append(req.Queries, query)

matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}
case *promql.MatrixSelector:
robskillington marked this conversation as resolved.
Show resolved Hide resolved
evalRange = n.Range
if evalRange > 0 {
start = start.Add(-1 * evalRange)
evalRange = 0
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
if n.Offset > 0 {
offset := time.Duration(n.Offset)
start = start.Add(-1 * offset)
end = end.Add(-1 * offset)
}

req.Queries = append(req.Queries, query)
matchers, err := toLabelMatchers(n.LabelMatchers)
if err != nil {
return err
}

query := &prompb.Query{
StartTimestampMs: storage.TimeToPromTimestamp(start),
EndTimestampMs: storage.TimeToPromTimestamp(end),
Matchers: matchers,
}
return nil
})

req.Queries = append(req.Queries, query)
}

return nil
})

return req, nil
}

// ParseRequest parses the compressed request
func ParseRequest(
ctx context.Context,
r *http.Request,
opts options.HandlerOptions,
) (*prompb.ReadRequest, *storage.FetchOptions, *xhttp.ParseError) {
var req *prompb.ReadRequest
var rErr *xhttp.ParseError
switch {
case r.Method == http.MethodGet && strings.TrimSpace(r.FormValue("expr")) != "":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to change this to "query" or what have you to make it compatible with the PromQL query client library methods?

req, rErr = ParseExpr(r)
default:
var rErr *xhttp.ParseError
req, rErr = parseCompressedRequest(r)
if rErr != nil {
return nil, nil, rErr
}
}

if rErr != nil {
return nil, nil, rErr
}

timeout := opts.TimeoutOpts().FetchTimeout
Expand Down
71 changes: 71 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package remote

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -64,6 +67,74 @@ var (
}
)

type testVals struct {
start time.Time
query string
}

func buildBody(query string, start time.Time) io.Reader {
vals := url.Values{}
vals.Add("expr", query)
vals.Add("start", start.Format(time.RFC3339))
vals.Add("end", start.Add(time.Hour).Format(time.RFC3339))
qs := vals.Encode()
return bytes.NewBuffer([]byte(qs))
}

func TestParseExpr(t *testing.T) {
query := "" +
`up{a="b"} + 7 - sum(rate(down{c!="d"}[2m])) + ` +
`left{e=~"f"} offset 30m and right{g!~"h"} + ` + `
max_over_time(foo[1m] offset 1h)`

start := time.Now().Truncate(time.Hour)
req := httptest.NewRequest(http.MethodPost, "/", buildBody(query, start))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
readReq, err := ParseExpr(req)
require.NoError(t, err)

q := func(start, end time.Time, matchers []*prompb.LabelMatcher) *prompb.Query {
return &prompb.Query{
StartTimestampMs: start.Unix() * 1000,
EndTimestampMs: end.Unix() * 1000,
Matchers: matchers,
}
}

b := func(s string) []byte { return []byte(s) }
expected := []*prompb.Query{
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("a"), Value: b("b"), Type: prompb.LabelMatcher_EQ},
{Name: b("__name__"), Value: b("up"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-2), start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("c"), Value: b("d"), Type: prompb.LabelMatcher_NEQ},
{Name: b("__name__"), Value: b("down"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-30), start.Add(time.Minute*30),
[]*prompb.LabelMatcher{
{Name: b("e"), Value: b("f"), Type: prompb.LabelMatcher_RE},
{Name: b("__name__"), Value: b("left"), Type: prompb.LabelMatcher_EQ}}),
q(start, start.Add(time.Hour),
[]*prompb.LabelMatcher{
{Name: b("g"), Value: b("h"), Type: prompb.LabelMatcher_NRE},
{Name: b("__name__"), Value: b("right"), Type: prompb.LabelMatcher_EQ}}),
q(start.Add(time.Minute*-61), start,
[]*prompb.LabelMatcher{
{Name: b("__name__"), Value: b("foo"), Type: prompb.LabelMatcher_EQ}}),
}

for i, q := range expected {
fmt.Println(i, q.String())
}

for i, q := range readReq.Queries {
fmt.Println(i, q.String())
}

assert.Equal(t, expected, readReq.Queries)
}

func newEngine(
s storage.Storage,
lookbackDuration time.Duration,
Expand Down