From 5e40989427b2a62d6ba2e5c38787fa02ecc32d8e Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Thu, 23 Jul 2020 20:56:11 -0400 Subject: [PATCH] [query] Parse programmatic min/max time as [start_retention, end_retention) (#2487) --- .../prometheus/test.sh | 24 ++++ src/query/api/v1/handler/prom/mocks.go | 40 +++++-- src/query/api/v1/handler/prom/read_instant.go | 58 +-------- src/query/api/v1/handler/prom/read_test.go | 112 +++++++++++++++++- src/query/api/v1/handler/prometheus/common.go | 24 ++-- src/query/util/timing.go | 51 +++++++- src/query/util/timing_test.go | 35 ++++++ 7 files changed, 262 insertions(+), 82 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 0a34b79437..524724434b 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -304,6 +304,29 @@ function test_query_restrict_tags { '[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{restricted_metrics_type=\"hidden\"\\} | jq -r ".data.result | length") -eq 0 ]]' } +function test_series { + # Test series search with start/end specified + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]' + + # Test series search with no start/end specified + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total" | jq -r ".data | length") -eq 1 ]]' + + # Test series search with min/max start time using the Prometheus Go + # min/max formatted timestamps, which is sent as part of a Prometheus + # remote query. + # minTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC() + # maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC() + # minTimeFormatted = minTime.Format(time.RFC3339Nano) + # maxTimeFormatted = maxTime.Format(time.RFC3339Nano) + # Which: + # minTimeFormatted="-292273086-05-16T16:47:06Z" + # maxTimeFormatted="292277025-08-18T07:12:54.999999999Z" + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_succeeded_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' +} + echo "Running prometheus tests" test_prometheus_remote_read test_prometheus_remote_write_multi_namespaces @@ -316,6 +339,7 @@ test_query_limits_applied test_query_restrict_metrics_type test_query_restrict_tags test_prometheus_remote_write_map_tags +test_series echo "Running function correctness tests" test_correctness diff --git a/src/query/api/v1/handler/prom/mocks.go b/src/query/api/v1/handler/prom/mocks.go index 71963420d8..7e6e7da830 100644 --- a/src/query/api/v1/handler/prom/mocks.go +++ b/src/query/api/v1/handler/prom/mocks.go @@ -35,40 +35,56 @@ import ( "go.uber.org/zap/zapcore" ) -type mockQuerier struct{} +type mockQuerier struct { + mockOptions +} type mockSeriesSet struct { + mockOptions promstorage.SeriesSet } -func (m mockSeriesSet) Next() bool { return false } -func (m mockSeriesSet) At() promstorage.Series { return nil } -func (m mockSeriesSet) Err() error { return nil } +func (m *mockSeriesSet) Next() bool { return false } +func (m *mockSeriesSet) At() promstorage.Series { return nil } +func (m *mockSeriesSet) Err() error { return nil } -func (mockQuerier) Select( +func (q *mockQuerier) Select( sortSeries bool, hints *promstorage.SelectHints, labelMatchers ...*labels.Matcher, ) (promstorage.SeriesSet, promstorage.Warnings, error) { - return mockSeriesSet{}, nil, nil + if q.mockOptions.selectFn != nil { + return q.mockOptions.selectFn(sortSeries, hints, labelMatchers...) + } + return &mockSeriesSet{mockOptions: q.mockOptions}, nil, nil } -func (mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) { +func (*mockQuerier) LabelValues(name string) ([]string, promstorage.Warnings, error) { return nil, nil, errors.New("not implemented") } -func (mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) { +func (*mockQuerier) LabelNames() ([]string, promstorage.Warnings, error) { return nil, nil, errors.New("not implemented") } -func (mockQuerier) Close() error { +func (*mockQuerier) Close() error { return nil } -type mockQueryable struct{} +type mockOptions struct { + selectFn func( + sortSeries bool, + hints *promstorage.SelectHints, + labelMatchers ...*labels.Matcher, + ) (promstorage.SeriesSet, promstorage.Warnings, error) +} + +type mockQueryable struct { + mockOptions +} -func (mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) { - return mockQuerier{}, nil +func (q *mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) { + return &mockQuerier{mockOptions: q.mockOptions}, nil } func newMockPromQLEngine() *promql.Engine { diff --git a/src/query/api/v1/handler/prom/read_instant.go b/src/query/api/v1/handler/prom/read_instant.go index 6f443b7c0c..53682dce19 100644 --- a/src/query/api/v1/handler/prom/read_instant.go +++ b/src/query/api/v1/handler/prom/read_instant.go @@ -23,17 +23,15 @@ package prom import ( "context" "fmt" - "math" "net/http" - "strconv" "time" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/storage/prometheus" + "github.com/m3db/m3/src/query/util" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" promstorage "github.com/prometheus/prometheus/storage" "github.com/uber-go/tally" @@ -66,16 +64,10 @@ func newReadInstantHandler( } func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var ts time.Time - if t := r.FormValue("time"); t != "" { - var err error - ts, err = parseTime(t) - if err != nil { - respondError(w, err, http.StatusBadRequest) - return - } - } else { - ts = time.Now() + ts, err := util.ParseTimeStringWithDefault(r.FormValue("time"), time.Now()) + if err != nil { + respondError(w, err, http.StatusBadRequest) + return } fetchOptions, fetchErr := h.hOpts.FetchOptionsBuilder().NewFetchOptions(r) @@ -93,7 +85,7 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx = context.WithValue(ctx, prometheus.BlockResultMetadataKey, &resultMetadata) if t := r.FormValue("timeout"); t != "" { - timeout, err := parseDuration(t) + timeout, err := util.ParseDurationString(t) if err != nil { err = fmt.Errorf("invalid parameter 'timeout': %v", err) respondError(w, err, http.StatusBadRequest) @@ -130,41 +122,3 @@ func (h *readInstantHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ResultType: res.Value.Type(), }, res.Warnings) } - -func parseTime(s string) (time.Time, error) { - if t, err := strconv.ParseFloat(s, 64); err == nil { - s, ns := math.Modf(t) - ns = math.Round(ns*1000) / 1000 - return time.Unix(int64(s), int64(ns*float64(time.Second))), nil - } - if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return t, nil - } - - // Stdlib's time parser can only handle 4 digit years. As a workaround until - // that is fixed we want to at least support our own boundary times. - // Context: https://github.com/prometheus/client_golang/issues/614 - // Upstream issue: https://github.com/golang/go/issues/20555 - switch s { - case minTimeFormatted: - return time.Time{}, nil - case maxTimeFormatted: - return time.Now(), nil - } - - return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s) -} - -func parseDuration(s string) (time.Duration, error) { - if d, err := strconv.ParseFloat(s, 64); err == nil { - ts := d * float64(time.Second) - if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { - return 0, fmt.Errorf("cannot parse %q to a valid duration. It overflows int64", s) - } - return time.Duration(ts), nil - } - if d, err := model.ParseDuration(s); err == nil { - return time.Duration(d), nil - } - return 0, fmt.Errorf("cannot parse %q to a valid duration", s) -} diff --git a/src/query/api/v1/handler/prom/read_test.go b/src/query/api/v1/handler/prom/read_test.go index e5a7167090..6176271d23 100644 --- a/src/query/api/v1/handler/prom/read_test.go +++ b/src/query/api/v1/handler/prom/read_test.go @@ -22,6 +22,7 @@ package prom import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "net/url" @@ -34,6 +35,8 @@ import ( "github.com/m3db/m3/src/query/api/v1/options" "github.com/m3db/m3/src/query/executor" "github.com/m3db/m3/src/x/instrument" + "github.com/prometheus/prometheus/pkg/labels" + promstorage "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/require" ) @@ -42,6 +45,7 @@ const promQuery = `http_requests_total{job="prometheus",group="canary"}` var testPromQLEngine = newMockPromQLEngine() type testHandlers struct { + queryable *mockQueryable readHandler http.Handler readInstantHandler http.Handler } @@ -66,10 +70,11 @@ func setupTest(t *testing.T) testHandlers { SetFetchOptionsBuilder(fetchOptsBuilder). SetEngine(engine). SetTimeoutOpts(timeoutOpts) - queryable := mockQueryable{} + queryable := &mockQueryable{} readHandler := newReadHandler(opts, hOpts, queryable) readInstantHandler := newReadInstantHandler(opts, hOpts, queryable) return testHandlers{ + queryable: queryable, readHandler: readHandler, readInstantHandler: readInstantHandler, } @@ -149,3 +154,108 @@ func TestPromReadInstantHandlerInvalidQuery(t *testing.T) { require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp)) require.Equal(t, statusError, resp.Status) } + +func TestPromReadInstantHandlerParseMinTime(t *testing.T) { + setup := setupTest(t) + + var ( + query *promstorage.SelectHints + selects int + ) + setup.queryable.selectFn = func( + sortSeries bool, + hints *promstorage.SelectHints, + labelMatchers ...*labels.Matcher, + ) (promstorage.SeriesSet, promstorage.Warnings, error) { + selects++ + query = hints + return &mockSeriesSet{}, nil, nil + } + + req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil) + params := defaultParams() + params.Set("time", minTimeFormatted) + req.URL.RawQuery = params.Encode() + + var resp response + recorder := httptest.NewRecorder() + + setup.readInstantHandler.ServeHTTP(recorder, req) + + require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp)) + require.Equal(t, statusSuccess, resp.Status) + + require.Equal(t, 1, selects) + + fudge := 5 * time.Minute // Need to account for lookback + expected := time.Unix(0, 0) + actual := millisTime(query.Start) + require.True(t, abs(expected.Sub(actual)) <= fudge, + fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v", + expected, actual, fudge, expected.Sub(actual))) + + fudge = 5 * time.Minute // Need to account for lookback + expected = time.Unix(0, 0) + actual = millisTime(query.Start) + require.True(t, abs(expected.Sub(actual)) <= fudge, + fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v", + expected, actual, fudge, expected.Sub(actual))) +} + +func TestPromReadInstantHandlerParseMaxTime(t *testing.T) { + setup := setupTest(t) + + var ( + query *promstorage.SelectHints + selects int + ) + setup.queryable.selectFn = func( + sortSeries bool, + hints *promstorage.SelectHints, + labelMatchers ...*labels.Matcher, + ) (promstorage.SeriesSet, promstorage.Warnings, error) { + selects++ + query = hints + return &mockSeriesSet{}, nil, nil + } + + req, _ := http.NewRequest("GET", native.PromReadInstantURL, nil) + params := defaultParams() + params.Set("time", maxTimeFormatted) + req.URL.RawQuery = params.Encode() + + var resp response + recorder := httptest.NewRecorder() + + setup.readInstantHandler.ServeHTTP(recorder, req) + + require.NoError(t, json.Unmarshal(recorder.Body.Bytes(), &resp)) + require.Equal(t, statusSuccess, resp.Status) + + require.Equal(t, 1, selects) + + fudge := 6 * time.Minute // Need to account for lookback + time.Now() skew + expected := time.Now() + actual := millisTime(query.Start) + require.True(t, abs(expected.Sub(actual)) <= fudge, + fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v", + expected, actual, fudge, expected.Sub(actual))) + + fudge = 6 * time.Minute // Need to account for lookback + time.Now() skew + expected = time.Now() + actual = millisTime(query.Start) + require.True(t, abs(expected.Sub(actual)) <= fudge, + fmt.Sprintf("expected=%v, actual=%v, fudge=%v, delta=%v", + expected, actual, fudge, expected.Sub(actual))) +} + +func abs(v time.Duration) time.Duration { + if v < 0 { + return v * -1 + } + return v +} + +func millisTime(timestampMilliseconds int64) time.Time { + return time.Unix(0, timestampMilliseconds*int64(time.Millisecond)) +} diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 2ead521de8..0cf92573a4 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -149,12 +149,14 @@ func ParseTagCompletionParamsToQueries( r *http.Request, ) (TagCompletionQueries, *xhttp.ParseError) { tagCompletionQueries := TagCompletionQueries{} - start, err := parseTimeWithDefault(r, "start", time.Time{}) + start, err := util.ParseTimeStringWithDefault(r.FormValue("start"), + time.Unix(0, 0)) if err != nil { return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest) } - end, err := parseTimeWithDefault(r, "end", time.Now()) + end, err := util.ParseTimeStringWithDefault(r.FormValue("end"), + time.Now()) if err != nil { return tagCompletionQueries, xhttp.NewParseError(err, http.StatusBadRequest) } @@ -218,18 +220,6 @@ func parseTagCompletionQueries(r *http.Request) ([]string, error) { return queries, nil } -func parseTimeWithDefault( - r *http.Request, - key string, - defaultTime time.Time, -) (time.Time, error) { - if t := r.FormValue(key); t != "" { - return util.ParseTimeString(t) - } - - return defaultTime, nil -} - // ParseSeriesMatchQuery parses all params from the GET request. func ParseSeriesMatchQuery( r *http.Request, @@ -241,12 +231,14 @@ func ParseSeriesMatchQuery( return nil, xhttp.NewParseError(errors.ErrInvalidMatchers, http.StatusBadRequest) } - start, err := parseTimeWithDefault(r, "start", time.Time{}) + start, err := util.ParseTimeStringWithDefault(r.FormValue("start"), + time.Unix(0, 0)) if err != nil { return nil, xhttp.NewParseError(err, http.StatusBadRequest) } - end, err := parseTimeWithDefault(r, "end", time.Now()) + end, err := util.ParseTimeStringWithDefault(r.FormValue("end"), + time.Now()) if err != nil { return nil, xhttp.NewParseError(err, http.StatusBadRequest) } diff --git a/src/query/util/timing.go b/src/query/util/timing.go index 1c7ab0837f..6d15467dce 100644 --- a/src/query/util/timing.go +++ b/src/query/util/timing.go @@ -25,12 +25,23 @@ import ( "math" "strconv" "time" + + "github.com/prometheus/common/model" +) + +var ( + minTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC() + maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC() + + minTimeFormatted = minTime.Format(time.RFC3339Nano) + maxTimeFormatted = maxTime.Format(time.RFC3339Nano) ) -// ParseTimeString parses a time string into time.Time +// ParseTimeString parses a time string into time.Time. func ParseTimeString(s string) (time.Time, error) { if t, err := strconv.ParseFloat(s, 64); err == nil { s, ns := math.Modf(t) + ns = math.Round(ns*1000) / 1000 return time.Unix(int64(s), int64(ns*float64(time.Second))), nil } @@ -38,9 +49,47 @@ func ParseTimeString(s string) (time.Time, error) { return t, nil } + // Stdlib's time parser can only handle 4 digit years. As a workaround until + // that is fixed we want to at least support our own boundary times. + // Context: https://github.com/prometheus/client_golang/issues/614 + // Upstream issue: https://github.com/golang/go/issues/20555 + switch s { + case minTimeFormatted: + return time.Unix(0, 0), nil + case maxTimeFormatted: + return time.Now(), nil + } + return time.Time{}, fmt.Errorf("invalid timestamp for %s", s) } +// ParseTimeStringWithDefault parses a time string into time.Time. +func ParseTimeStringWithDefault( + s string, + defaultTime time.Time, +) (time.Time, error) { + if s != "" { + return ParseTimeString(s) + } + return defaultTime, nil +} + +// ParseDurationString parses a string duration allows for +// float seconds and also time strings such as 7d5h, etc. +func ParseDurationString(s string) (time.Duration, error) { + if d, err := strconv.ParseFloat(s, 64); err == nil { + ts := d * float64(time.Second) + if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { + return 0, fmt.Errorf("cannot parse %q to a valid duration. It overflows int64", s) + } + return time.Duration(ts), nil + } + if d, err := model.ParseDuration(s); err == nil { + return time.Duration(d), nil + } + return 0, fmt.Errorf("cannot parse %q to a valid duration", s) +} + // DurationToMS converts a duration into milliseconds func DurationToMS(duration time.Duration) int64 { return duration.Nanoseconds() / int64(time.Millisecond) diff --git a/src/query/util/timing_test.go b/src/query/util/timing_test.go index 12485bc140..4f58011cfe 100644 --- a/src/query/util/timing_test.go +++ b/src/query/util/timing_test.go @@ -35,3 +35,38 @@ func TestParseTimeString(t *testing.T) { equalTimes := parsedTime.Equal(time.Unix(703354793, 0)) assert.True(t, equalTimes) } + +func TestParseTimeStringWithMinMaxGoTime(t *testing.T) { + parsedTime, err := ParseTimeString(minTimeFormatted) + require.NoError(t, err) + require.True(t, parsedTime.Equal(time.Unix(0, 0))) + + parsedTime, err = ParseTimeString(maxTimeFormatted) + require.NoError(t, err) + require.True(t, time.Now().Sub(parsedTime) < time.Minute) +} + +func TestParseTimeStringLargeFloat(t *testing.T) { + _, err := ParseTimeString("9999999999999.99999") + require.NoError(t, err) +} + +func TestParseTimeStringWithDefault(t *testing.T) { + defaultTime := time.Now().Add(-1 * time.Minute) + parsedTime, err := ParseTimeStringWithDefault("", defaultTime) + require.NoError(t, err) + require.True(t, defaultTime.Equal(parsedTime)) +} + +func TestParseDurationStringFloat(t *testing.T) { + d, err := ParseDurationString("1595545968.4985256") + require.NoError(t, err) + v := time.Duration(1595545968.4985256 * float64(time.Second)) + require.Equal(t, v, d) +} + +func TestParseDurationStringExtendedDurationString(t *testing.T) { + d, err := ParseDurationString("2d") + require.NoError(t, err) + require.Equal(t, 2*24*time.Hour, d) +}