diff --git a/CHANGELOG.md b/CHANGELOG.md index d0f7fb606c..c46de23483 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased +### Fixed + +- [#1146](https://github.com/improbable-eng/thanos/pull/1146) store/bucket: make getFor() work with interleaved resolutions + ### Added - [#1094](https://github.com/improbable-eng/thanos/pull/1094) Allow configuring the response header timeout for the S3 client. diff --git a/go.mod b/go.mod index 5ed4dedc09..46231cd576 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/hashicorp/golang-lru v0.5.1 github.com/hashicorp/memberlist v0.1.3 github.com/julienschmidt/httprouter v1.1.0 // indirect + github.com/leanovate/gopter v0.2.4 github.com/lovoo/gcloud-opentracing v0.3.0 github.com/miekg/dns v1.1.8 github.com/minio/minio-go v0.0.0-20200511070425-f33eae714a28 diff --git a/go.sum b/go.sum index 4b637c57d4..7fc481ed46 100644 --- a/go.sum +++ b/go.sum @@ -155,6 +155,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leanovate/gopter v0.2.4 h1:U4YLBggDFhJdqQsG4Na2zX7joVTky9vHaj/AGEwSuXU= +github.com/leanovate/gopter v0.2.4/go.mod h1:gNcbPWNEWRe4lm+bycKqxUYoH5uoVje5SkOJ3uoLer8= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lightstep/lightstep-tracer-go v0.15.6/go.mod h1:6AMpwZpsyCFwSovxzM78e+AsYxE8sGwiM6C3TytaWeI= github.com/lovoo/gcloud-opentracing v0.3.0 h1:nAeKG70rIsog0TelcEtt6KU0Y1s5qXtsDLnHp0urPLU= diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 58f6b2df2e..8c85bef1b0 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -203,9 +203,9 @@ func (api *API) 
parseEnableDedupParam(r *http.Request) (enableDeduplication bool return enableDeduplication, nil } -func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (maxSourceResolution time.Duration, _ *ApiError) { +func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) { const maxSourceResolutionParam = "max_source_resolution" - maxSourceResolution = 0 * time.Second + maxSourceResolution := 0 * time.Second if api.enableAutodownsampling { // If no max_source_resolution is specified fit at least 5 samples between steps. @@ -223,7 +223,7 @@ func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (max return 0, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)} } - return maxSourceResolution, nil + return int64(maxSourceResolution / time.Millisecond), nil } func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *ApiError) { @@ -366,7 +366,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) { return nil, nil, apiErr } - maxSourceResolution, apiErr := api.parseDownsamplingParam(r, step) + maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step) if apiErr != nil { return nil, nil, apiErr } diff --git a/pkg/query/api/v1_test.go b/pkg/query/api/v1_test.go index 10b1792e2d..696778ce85 100644 --- a/pkg/query/api/v1_test.go +++ b/pkg/query/api/v1_test.go @@ -30,6 +30,7 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/query" "github.com/improbable-eng/thanos/pkg/testutil" opentracing "github.com/opentracing/opentracing-go" @@ -42,7 +43,7 @@ import ( ) func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator { - return func(_ bool, _ time.Duration, _ bool, _ query.WarningReporter) storage.Queryable { + return func(_ bool, _ 
int64, _ bool, _ query.WarningReporter) storage.Queryable { return queryable } } @@ -833,3 +834,71 @@ func BenchmarkQueryResultEncoding(b *testing.B) { testutil.Ok(b, err) fmt.Println(len(c)) } + +func TestParseDownsamplingParamMillis(t *testing.T) { + var tests = []struct { + maxSourceResolutionParam string + result int64 + step time.Duration + fail bool + enableAutodownsampling bool + }{ + { + maxSourceResolutionParam: "0s", + enableAutodownsampling: false, + step: time.Hour, + result: int64(compact.ResolutionLevelRaw), + fail: false, + }, + { + maxSourceResolutionParam: "5m", + step: time.Hour, + enableAutodownsampling: false, + result: int64(compact.ResolutionLevel5m), + fail: false, + }, + { + maxSourceResolutionParam: "1h", + step: time.Hour, + enableAutodownsampling: false, + result: int64(compact.ResolutionLevel1h), + fail: false, + }, + { + maxSourceResolutionParam: "", + enableAutodownsampling: true, + step: time.Hour, + result: int64(time.Hour / (5 * 1000 * 1000)), + fail: false, + }, + { + maxSourceResolutionParam: "", + enableAutodownsampling: true, + step: time.Hour, + result: int64((1 * time.Hour) / 6), + fail: true, + }, + { + maxSourceResolutionParam: "", + enableAutodownsampling: true, + step: time.Hour, + result: int64((1 * time.Hour) / 6), + fail: true, + }, + } + + for i, test := range tests { + api := API{enableAutodownsampling: test.enableAutodownsampling} + v := url.Values{} + v.Set("max_source_resolution", test.maxSourceResolutionParam) + r := http.Request{PostForm: v} + + maxResMillis, _ := api.parseDownsamplingParamMillis(&r, test.step) + if test.fail == false { + testutil.Assert(t, maxResMillis == test.result, "case %v: expected %v to be equal to %v", i, maxResMillis, test.result) + } else { + testutil.Assert(t, maxResMillis != test.result, "case %v: expected %v not to be equal to %v", i, maxResMillis, test.result) + } + + } +} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 07dc3ec71b..5fcd26ee97 100644 --- 
a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -5,8 +5,6 @@ import ( "sort" "strings" - "time" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/tracing" @@ -24,19 +22,19 @@ type WarningReporter func(error) // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. // If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default. -// maxSourceResolution controls downsampling resolution that is allowed. +// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds). // partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy. -type QueryableCreator func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable +type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable // NewQueryableCreator creates QueryableCreator. 
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator { - return func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable { + return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable { return &queryable{ logger: logger, replicaLabel: replicaLabel, proxy: proxy, deduplicate: deduplicate, - maxSourceResolution: maxSourceResolution, + maxResolutionMillis: maxResolutionMillis, partialResponse: partialResponse, warningReporter: r, } @@ -48,14 +46,14 @@ type queryable struct { replicaLabel string proxy storepb.StoreServer deduplicate bool - maxSourceResolution time.Duration + maxResolutionMillis int64 partialResponse bool warningReporter WarningReporter } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.partialResponse, q.warningReporter), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil } type querier struct { @@ -66,7 +64,7 @@ type querier struct { replicaLabel string proxy storepb.StoreServer deduplicate bool - maxSourceResolution int64 + maxResolutionMillis int64 partialResponse bool warningReporter WarningReporter } @@ -80,7 +78,7 @@ func newQuerier( replicaLabel string, proxy storepb.StoreServer, deduplicate bool, - maxSourceResolution int64, + maxResolutionMillis int64, partialResponse bool, warningReporter WarningReporter, ) *querier { @@ -100,7 +98,7 @@ func newQuerier( replicaLabel: replicaLabel, proxy: proxy, deduplicate: deduplicate, - maxSourceResolution: maxSourceResolution, + maxResolutionMillis: 
maxResolutionMillis, partialResponse: partialResponse, warningReporter: warningReporter, } @@ -185,7 +183,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s MinTime: q.mint, MaxTime: q.maxt, Matchers: sms, - MaxResolutionWindow: q.maxSourceResolution, + MaxResolutionWindow: q.maxResolutionMillis, Aggregates: queryAggrs, PartialResponseDisabled: !q.partialResponse, }, resp); err != nil { diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 980d837213..9a7999e384 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -19,6 +19,25 @@ import ( "github.com/prometheus/tsdb/chunkenc" ) +func TestQueryableCreator_MaxResolution(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + testProxy := &storeServer{resps: []*storepb.SeriesResponse{}} + queryableCreator := NewQueryableCreator(nil, testProxy, "test") + + oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond) + queryable := queryableCreator(false, oneHourMillis, false, func(err error) {}) + + q, err := queryable.Querier(context.Background(), 0, 42) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, q.Close()) }() + + querierActual, ok := q.(*querier) + + testutil.Assert(t, ok == true, "expected it to be a querier") + testutil.Assert(t, querierActual.maxResolutionMillis == oneHourMillis, "expected max source resolution to be 1 hour in milliseconds") + +} + func TestQuerier_Series(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 3823d84225..a4e1ef734e 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -675,7 +675,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag // labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial // to be aware what exactly resolution we see on query. 
// TODO(bplotka): Consider adding resolution label to all results to propagate that info to UI and Query API. -func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels.Labels, bs []*bucketBlock) { +func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMillis int64, lset labels.Labels, bs []*bucketBlock) { if len(bs) == 0 { level.Debug(logger).Log("msg", "No block found", "mint", mint, "maxt", maxt, "lset", lset.String()) return @@ -703,7 +703,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels parts = append(parts, fmt.Sprintf("Range: %d-%d Resolution: %d", currMin, currMax, currRes)) - level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n")) + level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "Maximum Resolution", maxResolutionMillis, "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n")) } // Series implements the storepb.StoreServer interface. 
@@ -738,7 +738,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow) if s.debugLogging { - debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, bs.labels, blocks) + debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks) } for _, b := range blocks { @@ -934,7 +934,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR type bucketBlockSet struct { labels labels.Labels mtx sync.RWMutex - resolutions []int64 // available resolution, high to low + resolutions []int64 // available resolution, high to low (in milliseconds) blocks [][]*bucketBlock // ordered buckets for the existing resolutions } @@ -996,8 +996,8 @@ func int64index(s []int64, x int64) int { } // getFor returns a time-ordered list of blocks that cover date between mint and maxt. -// Blocks with the lowest resolution possible but not lower than the given resolution are returned. -func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBlock) { +// Blocks with the biggest resolution possible but not bigger than the given max resolution are returned. +func (s *bucketBlockSet) getFor(mint, maxt, maxResolutionMillis int64) (bs []*bucketBlock) { if mint == maxt { return nil } @@ -1007,10 +1007,12 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl // Find first matching resolution. i := 0 - for ; i < len(s.resolutions) && s.resolutions[i] > minResolution; i++ { + for ; i < len(s.resolutions) && s.resolutions[i] > maxResolutionMillis; i++ { } - // Base case, we fill the given interval with the closest resolution. + // Fill the given interval with the blocks for the current resolution. + // Our current resolution might not cover all data, so recursively fill the gaps with higher resolution blocks if there are any. 
+ start := mint for _, b := range s.blocks[i] { if b.meta.MaxTime <= mint { continue @@ -1018,22 +1020,18 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl if b.meta.MinTime >= maxt { break } + + if i+1 < len(s.resolutions) { + bs = append(bs, s.getFor(start, b.meta.MinTime, s.resolutions[i+1])...) + } bs = append(bs, b) + start = b.meta.MaxTime } - // Our current resolution might not cover all data, recursively fill the gaps at the start - // and end of [mint, maxt] with higher resolution blocks. - i++ - // No higher resolution left, we are done. - if i >= len(s.resolutions) { - return bs - } - if len(bs) == 0 { - return s.getFor(mint, maxt, s.resolutions[i]) - } - left := s.getFor(mint, bs[0].meta.MinTime, s.resolutions[i]) - right := s.getFor(bs[len(bs)-1].meta.MaxTime, maxt, s.resolutions[i]) - return append(left, append(bs, right...)...) + if i+1 < len(s.resolutions) { + bs = append(bs, s.getFor(start, maxt, s.resolutions[i+1])...) + } + return bs } // labelMatchers verifies whether the block set matches the given matchers and returns a new diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 4e9e581014..d5f436a65b 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -12,10 +12,150 @@ import ( "github.com/improbable-eng/thanos/pkg/compact/downsample" "github.com/improbable-eng/thanos/pkg/store/storepb" "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" "github.com/prometheus/tsdb/labels" ) +func TestBucketBlock_Property(t *testing.T) { + parameters := gopter.DefaultTestParameters() + parameters.Rng.Seed(2000) + parameters.MinSuccessfulTests = 20000 + properties := gopter.NewProperties(parameters) + + set := newBucketBlockSet(labels.Labels{}) + + type resBlock struct { + mint, maxt int64 + window int64 + } + // This input resembles a typical production-level block 
layout + // in remote object storage. + input := []resBlock{ + {window: downsample.ResLevel0, mint: 0, maxt: 100}, + {window: downsample.ResLevel0, mint: 100, maxt: 200}, + // Compaction level 2 begins but not downsampling (8 hour block length) + {window: downsample.ResLevel0, mint: 200, maxt: 600}, + {window: downsample.ResLevel0, mint: 600, maxt: 1000}, + // Compaction level 3 begins, some of it is downsampled but still retained (48 hour block length) + {window: downsample.ResLevel0, mint: 1000, maxt: 1750}, + {window: downsample.ResLevel1, mint: 1000, maxt: 1750}, + // Compaction level 4 begins, different downsampling levels cover the same range (336 hour block length) + {window: downsample.ResLevel0, mint: 1750, maxt: 7000}, + {window: downsample.ResLevel1, mint: 1750, maxt: 7000}, + {window: downsample.ResLevel2, mint: 1750, maxt: 7000}, + // Compaction level 4 already happened, raw samples have been deleted (NOTE(review): a ResLevel0 block is still listed for this range - confirm which is intended) + {window: downsample.ResLevel0, mint: 7000, maxt: 14000}, + {window: downsample.ResLevel1, mint: 7000, maxt: 14000}, + // Compaction level 4 already happened, raw and downsample res level 1 samples have been deleted + {window: downsample.ResLevel2, mint: 14000, maxt: 21000}, + } + + for _, in := range input { + var m metadata.Meta + m.Thanos.Downsample.Resolution = in.window + m.MinTime = in.mint + m.MaxTime = in.maxt + + testutil.Ok(t, set.add(&bucketBlock{meta: &m})) + } + + properties.Property("getFor always gets at least some data in range", prop.ForAllNoShrink( + func(low, high, maxResolution int64) bool { + // Bogus case. 
+ if low >= high { + return true + } + + res := set.getFor(low, high, maxResolution) + + // The data that we get must all encompass our requested range + if len(res) == 1 && (res[0].meta.Thanos.Downsample.Resolution > maxResolution || + res[0].meta.MinTime > low) { + return false + } else if len(res) > 1 { + mint := int64(21001) + maxt := int64(0) + for i := 0; i < len(res)-1; i++ { + if res[i].meta.Thanos.Downsample.Resolution > maxResolution { + return false + } + if res[i+1].meta.MinTime != res[i].meta.MaxTime { + return false + } + if res[i].meta.MinTime < mint { + mint = res[i].meta.MinTime + } + if res[i].meta.MaxTime > maxt { + maxt = res[i].meta.MaxTime + } + } + if res[len(res)-1].meta.MinTime < mint { + mint = res[len(res)-1].meta.MinTime + } + if res[len(res)-1].meta.MaxTime > maxt { + maxt = res[len(res)-1].meta.MaxTime + } + if low < mint { + return false + } + + } + return true + }, gen.Int64Range(0, 21000), gen.Int64Range(0, 21000), gen.Int64Range(0, 60*60*1000)), + ) + + properties.Property("getFor always gets all data in range", prop.ForAllNoShrink( + func(low, high int64) bool { + // Bogus case. 
+ if low >= high { + return true + } + + maxResolution := downsample.ResLevel2 + res := set.getFor(low, high, maxResolution) + + // The data that we get must all encompass our requested range + if len(res) == 1 && (res[0].meta.Thanos.Downsample.Resolution > maxResolution || + res[0].meta.MinTime > low || res[0].meta.MaxTime < high) { + return false + } else if len(res) > 1 { + mint := int64(21001) + maxt := int64(0) + for i := 0; i < len(res)-1; i++ { + if res[i+1].meta.MinTime != res[i].meta.MaxTime { + return false + } + if res[i].meta.MinTime < mint { + mint = res[i].meta.MinTime + } + if res[i].meta.MaxTime > maxt { + maxt = res[i].meta.MaxTime + } + } + if res[len(res)-1].meta.MinTime < mint { + mint = res[len(res)-1].meta.MinTime + } + if res[len(res)-1].meta.MaxTime > maxt { + maxt = res[len(res)-1].meta.MaxTime + } + if low < mint { + return false + } + if high > maxt { + return false + } + + } + return true + }, gen.Int64Range(0, 21000), gen.Int64Range(0, 21000)), + ) + + properties.TestingRun(t) +} + func TestBucketBlockSet_addGet(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() @@ -53,13 +193,13 @@ func TestBucketBlockSet_addGet(t *testing.T) { cases := []struct { mint, maxt int64 - minResolution int64 + maxResolution int64 res []resBlock }{ { mint: -100, maxt: 1000, - minResolution: 0, + maxResolution: 0, res: []resBlock{ {window: downsample.ResLevel0, mint: 0, maxt: 100}, {window: downsample.ResLevel0, mint: 100, maxt: 200}, @@ -70,7 +210,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { }, { mint: 100, maxt: 400, - minResolution: downsample.ResLevel1 - 1, + maxResolution: downsample.ResLevel1 - 1, res: []resBlock{ {window: downsample.ResLevel0, mint: 100, maxt: 200}, {window: downsample.ResLevel0, mint: 200, maxt: 300}, @@ -79,7 +219,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { }, { mint: 100, maxt: 500, - minResolution: downsample.ResLevel1, + maxResolution: downsample.ResLevel1, res: []resBlock{ {window: 
downsample.ResLevel1, mint: 100, maxt: 200}, {window: downsample.ResLevel1, mint: 200, maxt: 300}, @@ -89,7 +229,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { }, { mint: 0, maxt: 500, - minResolution: downsample.ResLevel2, + maxResolution: downsample.ResLevel2, res: []resBlock{ {window: downsample.ResLevel1, mint: 0, maxt: 100}, {window: downsample.ResLevel2, mint: 100, maxt: 200}, @@ -110,7 +250,7 @@ func TestBucketBlockSet_addGet(t *testing.T) { m.MaxTime = b.maxt exp = append(exp, &bucketBlock{meta: &m}) } - res := set.getFor(c.mint, c.maxt, c.minResolution) + res := set.getFor(c.mint, c.maxt, c.maxResolution) testutil.Equals(t, exp, res) } }