Skip to content

Commit

Permalink
[query] Parse programmatic min/max time as [start_retention, end_rete…
Browse files Browse the repository at this point in the history
…ntion) (#2487)
  • Loading branch information
robskillington authored Jul 24, 2020
1 parent 6a2ee8c commit 5e40989
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 82 deletions.
24 changes: 24 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,29 @@ function test_query_restrict_tags {
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{restricted_metrics_type=\"hidden\"\\} | jq -r ".data.result | length") -eq 0 ]]'
}

function test_series {
# Test series search with start/end specified
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]'

# Test series search with no start/end specified
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total" | jq -r ".data | length") -eq 1 ]]'

# Test series search with min/max start time using the Prometheus Go
# min/max formatted timestamps, which is sent as part of a Prometheus
# remote query.
# minTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC()
# maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC()
# minTimeFormatted = minTime.Format(time.RFC3339Nano)
# maxTimeFormatted = maxTime.Format(time.RFC3339Nano)
# Which:
# minTimeFormatted="-292273086-05-16T16:47:06Z"
# maxTimeFormatted="292277025-08-18T07:12:54.999999999Z"
ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]'
}

echo "Running prometheus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
Expand All @@ -316,6 +339,7 @@ test_query_limits_applied
test_query_restrict_metrics_type
test_query_restrict_tags
test_prometheus_remote_write_map_tags
test_series

echo "Running function correctness tests"
test_correctness
Expand Down
40 changes: 28 additions & 12 deletions src/query/api/v1/handler/prom/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,56 @@ import (
"go.uber.org/zap/zapcore"
)

type mockQuerier struct{}
type mockQuerier struct {
mockOptions
}

type mockSeriesSet struct {
mockOptions
promstorage.SeriesSet
}

func (m mockSeriesSet) Next() bool { return false }
func (m mockSeriesSet) At() promstorage.Series { return nil }
func (m mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Next() bool { return false }
func (m *mockSeriesSet) At() promstorage.Series { return nil }
func (m *mockSeriesSet) Err() error { return nil }

func (mockQuerier) Select(
func (q *mockQuerier) Select(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
return mockSeriesSet{}, nil, nil
if q.mockOptions.selectFn != nil {
return q.mockOptions.selectFn(sortSeries, hints, labelMatchers...)
}
return &mockSeriesSet{mockOptions: q.mockOptions}, nil, nil
}

func (mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) {
return nil, nil, errors.New("not implemented")
}

func (mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) {
return nil, nil, errors.New("not implemented")
}

func (mockQuerier) Close() error {
func (*mockQuerier) Close() error {
return nil
}

type mockQueryable struct{}
type mockOptions struct {
selectFn func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error)
}

type mockQueryable struct {
mockOptions
}

func (mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) {
return mockQuerier{}, nil
func (q *mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) {
return &mockQuerier{mockOptions: q.mockOptions}, nil
}

func newMockPromQLEngine() *promql.Engine {
Expand Down
58 changes: 6 additions & 52 deletions src/query/api/v1/handler/prom/read_instant.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ package prom
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/storage/prometheus"
"github.com/m3db/m3/src/query/util"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/uber-go/tally"
Expand Down Expand Up @@ -66,16 +64,10 @@ func newReadInstantHandler(
}

func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var ts time.Time
if t := r.FormValue("time"); t != "" {
var err error
ts, err = parseTime(t)
if err != nil {
respondError(w, err, http.StatusBadRequest)
return
}
} else {
ts = time.Now()
ts, err := util.ParseTimeStringWithDefault(r.FormValue("time"), time.Now())
if err != nil {
respondError(w, err, http.StatusBadRequest)
return
}

fetchOptions, fetchErr := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r)
Expand All @@ -93,7 +85,7 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata)

if t := r.FormValue("timeout"); t != "" {
timeout, err := parseDuration(t)
timeout, err := util.ParseDurationString(t)
if err != nil {
err = fmt.Errorf("invalid parameter 'timeout': %v", err)
respondError(w, err, http.StatusBadRequest)
Expand Down Expand Up @@ -130,41 +122,3 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ResultType: res.Value.Type(),
}, res.Warnings)
}

func parseTime(s string) (time.Time, error) {
if t, err := strconv.ParseFloat(s, 64); err == nil {
s, ns := math.Modf(t)
ns = math.Round(ns*1000) / 1000
return time.Unix(int64(s), int64(ns*float64(time.Second))), nil
}
if t, err := time.Parse(time.RFC3339Nano, s); err == nil {
return t, nil
}

// Stdlib's time parser can only handle 4 digit years. As a workaround until
// that is fixed we want to at least support our own boundary times.
// Context: https://github.com/prometheus/client_golang/issues/614
// Upstream issue: https://github.com/golang/go/issues/20555
switch s {
case minTimeFormatted:
return time.Time{}, nil
case maxTimeFormatted:
return time.Now(), nil
}

return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s)
}

func parseDuration(s string) (time.Duration, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, fmt.Errorf("cannot parse %q to a valid duration. It overflows int64", s)
}
return time.Duration(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return time.Duration(d), nil
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}
112 changes: 111 additions & 1 deletion src/query/api/v1/handler/prom/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package prom

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
Expand All @@ -34,6 +35,8 @@ import (
"github.com/m3db/m3/src/query/api/v1/options"
"github.com/m3db/m3/src/query/executor"
"github.com/m3db/m3/src/x/instrument"
"github.com/prometheus/prometheus/pkg/labels"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/require"
)

Expand All @@ -42,6 +45,7 @@ const promQuery = `http_requests_total{job="prometheus",group="canary"}`
var testPromQLEngine = newMockPromQLEngine()

type testHandlers struct {
queryable *mockQueryable
readHandler http.Handler
readInstantHandler http.Handler
}
Expand All @@ -66,10 +70,11 @@ func setupTest(t *testing.T) testHandlers {
SetFetchOptionsBuilder(fetchOptsBuilder).
SetEngine(engine).
SetTimeoutOpts(timeoutOpts)
queryable := mockQueryable{}
queryable := &mockQueryable{}
readHandler := newReadHandler(opts, hOpts, queryable)
readInstantHandler := newReadInstantHandler(opts, hOpts, queryable)
return testHandlers{
queryable: queryable,
readHandler: readHandler,
readInstantHandler: readInstantHandler,
}
Expand Down Expand Up @@ -149,3 +154,108 @@ func TestPromReadInstantHandlerInvalidQuery(t *testing.T) {
require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusError, resp.Status)
}

func TestPromReadInstantHandlerParseMinTime(t *testing.T) {
setup := setupTest(t)

var (
query *promstorage.SelectHints
selects int
)
setup.queryable.selectFn = func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
selects++
query = hints
return &mockSeriesSet{}, nil, nil
}

req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil)
params := defaultParams()
params.Set("time", minTimeFormatted)
req.URL.RawQuery = params.Encode()

var resp response
recorder := httptest.NewRecorder()

setup.readInstantHandler.ServeHTTP(recorder, req)

require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusSuccess, resp.Status)

require.Equal(t, 1, selects)

fudge := 5 * time.Minute // Need to account for lookback
expected := time.Unix(0, 0)
actual := millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))

fudge = 5 * time.Minute // Need to account for lookback
expected = time.Unix(0, 0)
actual = millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))
}

func TestPromReadInstantHandlerParseMaxTime(t *testing.T) {
setup := setupTest(t)

var (
query *promstorage.SelectHints
selects int
)
setup.queryable.selectFn = func(
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
) (promstorage.SeriesSet, promstorage.Warnings, error) {
selects++
query = hints
return &mockSeriesSet{}, nil, nil
}

req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil)
params := defaultParams()
params.Set("time", maxTimeFormatted)
req.URL.RawQuery = params.Encode()

var resp response
recorder := httptest.NewRecorder()

setup.readInstantHandler.ServeHTTP(recorder, req)

require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp))
require.Equal(t, statusSuccess, resp.Status)

require.Equal(t, 1, selects)

fudge := 6 * time.Minute // Need to account for lookback + time.Now() skew
expected := time.Now()
actual := millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))

fudge = 6 * time.Minute // Need to account for lookback + time.Now() skew
expected = time.Now()
actual = millisTime(query.Start)
require.True(t, abs(expected.Sub(actual)) <= fudge,
fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v",
expected, actual, fudge, expected.Sub(actual)))
}

func abs(v time.Duration) time.Duration {
if v < 0 {
return v * -1
}
return v
}

func millisTime(timestampMilliseconds int64) time.Time {
return time.Unix(0, timestampMilliseconds*int64(time.Millisecond))
}
24 changes: 8 additions & 16 deletions src/query/api/v1/handler/prometheus/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,14 @@ func ParseTagCompletionParamsToQueries(
r *http.Request,
) (TagCompletionQueries, *xhttp.ParseError) {
tagCompletionQueries := TagCompletionQueries{}
start, err := parseTimeWithDefault(r, "start", time.Time{})
start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest)
}

end, err := parseTimeWithDefault(r, "end", time.Now())
end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
if err != nil {
return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down Expand Up @@ -218,18 +220,6 @@ func parseTagCompletionQueries(r *http.Request) ([]string, error) {
return queries, nil
}

func parseTimeWithDefault(
r *http.Request,
key string,
defaultTime time.Time,
) (time.Time, error) {
if t := r.FormValue(key); t != "" {
return util.ParseTimeString(t)
}

return defaultTime, nil
}

// ParseSeriesMatchQuery parses all params from the GET request.
func ParseSeriesMatchQuery(
r *http.Request,
Expand All @@ -241,12 +231,14 @@ func ParseSeriesMatchQuery(
return nil, xhttp.NewParseError(errors.ErrInvalidMatchers, http.StatusBadRequest)
}

start, err := parseTimeWithDefault(r, "start", time.Time{})
start, err := util.ParseTimeStringWithDefault(r.FormValue("start"),
time.Unix(0, 0))
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}

end, err := parseTimeWithDefault(r, "end", time.Now())
end, err := util.ParseTimeStringWithDefault(r.FormValue("end"),
time.Now())
if err != nil {
return nil, xhttp.NewParseError(err, http.StatusBadRequest)
}
Expand Down
Loading

0 comments on commit 5e40989

Please sign in to comment.