Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/bucket: make getFor() work with interleaved resolutions #1146

Merged
merged 25 commits into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9c94918
store/bucket_test: add interleaved resolutions test for getFor()
May 15, 2019
cd3363c
store/bucket: include blocks in the middle as well
May 15, 2019
00b2c7c
store/bucket: add test cases with duplicated time ranges
May 16, 2019
8f22504
query/querier: send proper maxSourceResolution
May 16, 2019
e2c46bf
README: add entry
May 16, 2019
342aede
query/querier_test: add queryableCreator test
May 16, 2019
4e73b2f
store/bucket: do the iteration without sorting
May 16, 2019
fae1929
store/bucket: bsi->j in loop
May 16, 2019
bf12553
Merge remote-tracking branch 'origin/master' into getFor
May 17, 2019
de4b79a
store/bucket: fix according to review comments
May 17, 2019
c3abb09
query/querier_test: fix test
May 17, 2019
f22cffa
*: clarify everywhere that max source resolution is in millis
May 17, 2019
5e9d0e9
*: maxSourceResolutionMillis -> maxResolutionMillis
May 17, 2019
420ebe0
CHANGELOG: update
May 17, 2019
e8b3189
query/querier_test: fix
May 17, 2019
bacfd06
store/bucket: add gets all data in range property test
May 17, 2019
93a764c
store/bucket_test: add production property test
May 17, 2019
f4b0a66
store/bucket_test: fix
May 17, 2019
dac133e
store/bucket_test: add always gets property
May 17, 2019
dbe3727
query/querier_test: do not shrink
May 17, 2019
6dac954
store/bucket: revert change
May 17, 2019
55aa32d
store/bucket_test: remove more confusion
May 18, 2019
3fe4217
store/bucket: clean up tests
May 22, 2019
3bbc6cb
Simplified goFor implementation.
bwplotka May 27, 2019
3602678
Merge pull request #1 from improbable-eng/getFor
May 27, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

## Unreleased

### Fixed

- [#1146](https://github.com/improbable-eng/thanos/pull/1146) store/bucket: make getFor() work with interleaved resolutions

### Added

- [#1094](https://github.com/improbable-eng/thanos/pull/1094) Allow configuring the response header timeout for the S3 client.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/hashicorp/golang-lru v0.5.1
github.com/hashicorp/memberlist v0.1.3
github.com/julienschmidt/httprouter v1.1.0 // indirect
github.com/leanovate/gopter v0.2.4
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/miekg/dns v1.1.8
github.com/minio/minio-go v0.0.0-20200511070425-f33eae714a28
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/leanovate/gopter v0.2.4 h1:U4YLBggDFhJdqQsG4Na2zX7joVTky9vHaj/AGEwSuXU=
github.com/leanovate/gopter v0.2.4/go.mod h1:gNcbPWNEWRe4lm+bycKqxUYoH5uoVje5SkOJ3uoLer8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lightstep/lightstep-tracer-go v0.15.6/go.mod h1:6AMpwZpsyCFwSovxzM78e+AsYxE8sGwiM6C3TytaWeI=
github.com/lovoo/gcloud-opentracing v0.3.0 h1:nAeKG70rIsog0TelcEtt6KU0Y1s5qXtsDLnHp0urPLU=
Expand Down
8 changes: 4 additions & 4 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ func (api *API) parseEnableDedupParam(r *http.Request) (enableDeduplication bool
return enableDeduplication, nil
}

func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (maxSourceResolution time.Duration, _ *ApiError) {
func (api *API) parseDownsamplingParamMillis(r *http.Request, step time.Duration) (maxResolutionMillis int64, _ *ApiError) {
const maxSourceResolutionParam = "max_source_resolution"
maxSourceResolution = 0 * time.Second
maxSourceResolution := 0 * time.Second

if api.enableAutodownsampling {
// If no max_source_resolution is specified fit at least 5 samples between steps.
Expand All @@ -223,7 +223,7 @@ func (api *API) parseDownsamplingParam(r *http.Request, step time.Duration) (max
return 0, &ApiError{errorBadData, errors.Errorf("negative '%s' is not accepted. Try a positive integer", maxSourceResolutionParam)}
}

return maxSourceResolution, nil
return int64(maxSourceResolution / time.Millisecond), nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

func (api *API) parsePartialResponseParam(r *http.Request) (enablePartialResponse bool, _ *ApiError) {
Expand Down Expand Up @@ -366,7 +366,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

maxSourceResolution, apiErr := api.parseDownsamplingParam(r, step)
maxSourceResolution, apiErr := api.parseDownsamplingParamMillis(r, step)
if apiErr != nil {
return nil, nil, apiErr
}
Expand Down
71 changes: 70 additions & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/query"
"github.com/improbable-eng/thanos/pkg/testutil"
opentracing "github.com/opentracing/opentracing-go"
Expand All @@ -42,7 +43,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ time.Duration, _ bool, _ query.WarningReporter) storage.Queryable {
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return queryable
}
}
Expand Down Expand Up @@ -833,3 +834,71 @@ func BenchmarkQueryResultEncoding(b *testing.B) {
testutil.Ok(b, err)
fmt.Println(len(c))
}

func TestParseDownsamplingParamMillis(t *testing.T) {
var tests = []struct {
maxSourceResolutionParam string
result int64
step time.Duration
fail bool
enableAutodownsampling bool
}{
{
maxSourceResolutionParam: "0s",
enableAutodownsampling: false,
step: time.Hour,
result: int64(compact.ResolutionLevelRaw),
fail: false,
},
{
maxSourceResolutionParam: "5m",
step: time.Hour,
enableAutodownsampling: false,
result: int64(compact.ResolutionLevel5m),
fail: false,
},
{
maxSourceResolutionParam: "1h",
step: time.Hour,
enableAutodownsampling: false,
result: int64(compact.ResolutionLevel1h),
fail: false,
},
{
maxSourceResolutionParam: "",
enableAutodownsampling: true,
step: time.Hour,
result: int64(time.Hour / (5 * 1000 * 1000)),
fail: false,
},
{
maxSourceResolutionParam: "",
enableAutodownsampling: true,
step: time.Hour,
result: int64((1 * time.Hour) / 6),
fail: true,
},
{
maxSourceResolutionParam: "",
enableAutodownsampling: true,
step: time.Hour,
result: int64((1 * time.Hour) / 6),
fail: true,
},
}

for i, test := range tests {
api := API{enableAutodownsampling: test.enableAutodownsampling}
v := url.Values{}
v.Set("max_source_resolution", test.maxSourceResolutionParam)
r := http.Request{PostForm: v}

maxResMillis, _ := api.parseDownsamplingParamMillis(&r, test.step)
if test.fail == false {
testutil.Assert(t, maxResMillis == test.result, "case %v: expected %v to be equal to %v", i, maxResMillis, test.result)
} else {
testutil.Assert(t, maxResMillis != test.result, "case %v: expected %v not to be equal to %v", i, maxResMillis, test.result)
}

}
}
22 changes: 10 additions & 12 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"sort"
"strings"

"time"

"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/store/storepb"
"github.com/improbable-eng/thanos/pkg/tracing"
Expand All @@ -24,19 +22,19 @@ type WarningReporter func(error)

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default.
// maxSourceResolution controls downsampling resolution that is allowed.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator {
return func(deduplicate bool, maxSourceResolution time.Duration, partialResponse bool, r WarningReporter) storage.Queryable {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
return &queryable{
logger: logger,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxSourceResolution: maxSourceResolution,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: r,
}
Expand All @@ -48,14 +46,14 @@ type queryable struct {
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxSourceResolution time.Duration
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.partialResponse, q.warningReporter), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil
}

type querier struct {
Expand All @@ -66,7 +64,7 @@ type querier struct {
replicaLabel string
proxy storepb.StoreServer
deduplicate bool
maxSourceResolution int64
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}
Expand All @@ -80,7 +78,7 @@ func newQuerier(
replicaLabel string,
proxy storepb.StoreServer,
deduplicate bool,
maxSourceResolution int64,
maxResolutionMillis int64,
partialResponse bool,
warningReporter WarningReporter,
) *querier {
Expand All @@ -100,7 +98,7 @@ func newQuerier(
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxSourceResolution: maxSourceResolution,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: warningReporter,
}
Expand Down Expand Up @@ -185,7 +183,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
MinTime: q.mint,
MaxTime: q.maxt,
Matchers: sms,
MaxResolutionWindow: q.maxSourceResolution,
MaxResolutionWindow: q.maxResolutionMillis,
Aggregates: queryAggrs,
PartialResponseDisabled: !q.partialResponse,
}, resp); err != nil {
Expand Down
19 changes: 19 additions & 0 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,25 @@ import (
"github.com/prometheus/tsdb/chunkenc"
)

func TestQueryableCreator_MaxResolution(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
testProxy := &storeServer{resps: []*storepb.SeriesResponse{}}
queryableCreator := NewQueryableCreator(nil, testProxy, "test")

oneHourMillis := int64(1*time.Hour) / int64(time.Millisecond)
queryable := queryableCreator(false, oneHourMillis, false, func(err error) {})

q, err := queryable.Querier(context.Background(), 0, 42)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, q.Close()) }()

querierActual, ok := q.(*querier)

testutil.Assert(t, ok == true, "expected it to be a querier")
testutil.Assert(t, querierActual.maxResolutionMillis == oneHourMillis, "expected max source resolution to be 1 hour in milliseconds")

}

func TestQuerier_Series(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

Expand Down
40 changes: 19 additions & 21 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag
// labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial
// to be aware what exactly resolution we see on query.
// TODO(bplotka): Consider adding resolution label to all results to propagate that info to UI and Query API.
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels.Labels, bs []*bucketBlock) {
func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMillis int64, lset labels.Labels, bs []*bucketBlock) {
if len(bs) == 0 {
level.Debug(logger).Log("msg", "No block found", "mint", mint, "maxt", maxt, "lset", lset.String())
return
Expand Down Expand Up @@ -703,7 +703,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt int64, lset labels

parts = append(parts, fmt.Sprintf("Range: %d-%d Resolution: %d", currMin, currMax, currRes))

level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n"))
level.Debug(logger).Log("msg", "Blocks source resolutions", "blocks", len(bs), "Maximum Resolution", maxResolutionMillis, "mint", mint, "maxt", maxt, "lset", lset.String(), "spans", strings.Join(parts, "\n"))
}

// Series implements the storepb.StoreServer interface.
Expand Down Expand Up @@ -738,7 +738,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow)

if s.debugLogging {
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, bs.labels, blocks)
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
}

for _, b := range blocks {
Expand Down Expand Up @@ -934,7 +934,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
type bucketBlockSet struct {
labels labels.Labels
mtx sync.RWMutex
resolutions []int64 // available resolution, high to low
resolutions []int64 // available resolution, high to low (in milliseconds)
blocks [][]*bucketBlock // ordered buckets for the existing resolutions
}

Expand Down Expand Up @@ -996,8 +996,8 @@ func int64index(s []int64, x int64) int {
}

// getFor returns a time-ordered list of blocks that cover date between mint and maxt.
// Blocks with the lowest resolution possible but not lower than the given resolution are returned.
func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBlock) {
// Blocks with the biggest resolution possible but not bigger than the given max resolution are returned.
func (s *bucketBlockSet) getFor(mint, maxt, maxResolutionMillis int64) (bs []*bucketBlock) {
if mint == maxt {
return nil
}
Expand All @@ -1007,33 +1007,31 @@ func (s *bucketBlockSet) getFor(mint, maxt, minResolution int64) (bs []*bucketBl

// Find first matching resolution.
i := 0
for ; i < len(s.resolutions) && s.resolutions[i] > minResolution; i++ {
for ; i < len(s.resolutions) && s.resolutions[i] > maxResolutionMillis; i++ {
}

// Base case, we fill the given interval with the closest resolution.
// Fill the given interval with the blocks for the current resolution.
// Our current resolution might not cover all data, so recursively fill the gaps with higher resolution blocks if there is any.
start := mint
for _, b := range s.blocks[i] {
if b.meta.MaxTime <= mint {
continue
}
if b.meta.MinTime >= maxt {
break
}

if i+1 < len(s.resolutions) {
bs = append(bs, s.getFor(start, b.meta.MinTime, s.resolutions[i+1])...)
}
bs = append(bs, b)
start = b.meta.MaxTime
}
// Our current resolution might not cover all data, recursively fill the gaps at the start
// and end of [mint, maxt] with higher resolution blocks.
i++
// No higher resolution left, we are done.
if i >= len(s.resolutions) {
return bs
}
if len(bs) == 0 {
return s.getFor(mint, maxt, s.resolutions[i])
}
left := s.getFor(mint, bs[0].meta.MinTime, s.resolutions[i])
right := s.getFor(bs[len(bs)-1].meta.MaxTime, maxt, s.resolutions[i])

return append(left, append(bs, right...)...)
if i+1 < len(s.resolutions) {
bs = append(bs, s.getFor(start, maxt, s.resolutions[i+1])...)
}
return bs
}

// labelMatchers verifies whether the block set matches the given matchers and returns a new
Expand Down
Loading