Skip to content

Commit

Permalink
Add query range API in promclient (#2995)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Aug 6, 2020
1 parent 987b746 commit 65049b1
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
67 changes: 67 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,73 @@ func (c *Client) PromqlQueryInstant(ctx context.Context, base *url.URL, query st
return vec, warnings, nil
}

// QueryRange performs a range query using a default HTTP client and returns results in model.Matrix type.
func (c *Client) QueryRange(ctx context.Context, base *url.URL, query string, startTime, endTime, step int64, opts QueryOptions) (model.Matrix, []string, error) {
params, err := url.ParseQuery(base.RawQuery)
if err != nil {
return nil, nil, errors.Wrapf(err, "parse raw query %s", base.RawQuery)
}
params.Add("query", query)
params.Add("start", formatTime(timestamp.Time(startTime)))
params.Add("end", formatTime(timestamp.Time(endTime)))
params.Add("step", strconv.FormatInt(step, 10))
if err := opts.AddTo(params); err != nil {
return nil, nil, errors.Wrap(err, "add thanos opts query params")
}

u := *base
u.Path = path.Join(u.Path, "/api/v1/query_range")
u.RawQuery = params.Encode()

level.Debug(c.logger).Log("msg", "range query", "url", u.String())

span, ctx := tracing.StartSpan(ctx, "/prom_query_range HTTP[client]")
defer span.Finish()

body, _, err := c.get2xx(ctx, &u)
if err != nil {
return nil, nil, errors.Wrap(err, "read query range response")
}

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var m struct {
Data struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`

Error string `json:"error,omitempty"`
ErrorType string `json:"errorType,omitempty"`
// Extra field supported by Thanos Querier.
Warnings []string `json:"warnings"`
}

if err = json.Unmarshal(body, &m); err != nil {
return nil, nil, errors.Wrap(err, "unmarshal query range response")
}

var matrixResult model.Matrix

// Decode the Result depending on the ResultType
switch m.Data.ResultType {
case parser.ValueTypeMatrix:
if err = json.Unmarshal(m.Data.Result, &matrixResult); err != nil {
return nil, nil, errors.Wrap(err, "decode result into ValueTypeMatrix")
}
default:
if m.Warnings != nil {
return nil, nil, errors.Errorf("error: %s, type: %s, warning: %s", m.Error, m.ErrorType, strings.Join(m.Warnings, ", "))
}
if m.Error != "" {
return nil, nil, errors.Errorf("error: %s, type: %s", m.Error, m.ErrorType)
}

return nil, nil, errors.Errorf("received status code: 200, unknown response type: '%q'", m.Data.ResultType)
}
return matrixResult, m.Warnings, nil
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separately.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Expand Down
38 changes: 38 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,41 @@ func TestRule_UnmarshalScalarResponse(t *testing.T) {
_, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestQueryRange_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
now := time.Now()

ctx := context.Background()
// Create artificial block.
_, err := e2eutil.CreateBlock(
ctx,
p.Dir(),
[]labels.Labels{labels.FromStrings("a", "b")},
10,
timestamp.FromTime(now.Add(-2*time.Hour)),
timestamp.FromTime(now),
nil,
0,
)
testutil.Ok(t, err)

testutil.Ok(t, p.Start())

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

res, _, err := NewDefaultClient().QueryRange(
ctx,
u,
`{a="b"}`,
timestamp.FromTime(now.Add(-2*time.Hour)),
timestamp.FromTime(now),
14,
QueryOptions{},
)
testutil.Ok(t, err)

testutil.Equals(t, len(res) > 0, true)
})
}

0 comments on commit 65049b1

Please sign in to comment.