diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index d87ff2c89f..79e56e964e 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -11,6 +11,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store" @@ -49,7 +50,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage."). Default("20").Int() + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). + Default("0000-01-01T00:00:00Z")) + + maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened earlier than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
+ Default("9999-12-31T23:59:59Z")) + m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { + // Sanity check Time filters + if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { + return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", + minTime, maxTime) + } + return runStore(g, logger, reg, @@ -69,6 +82,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string debugLogging, *syncInterval, *blockSyncConcurrency, + &store.FilterConfig{ + MinTime: *minTime, + MaxTime: *maxTime, + }, ) } } @@ -94,6 +111,7 @@ func runStore( verbose bool, syncInterval time.Duration, blockSyncConcurrency int, + blockFilterConf *store.FilterConfig, ) error { { confContentYaml, err := objStoreConfig.Content() @@ -135,6 +153,7 @@ func runStore( maxConcurrent, verbose, blockSyncConcurrency, + blockFilterConf, ) if err != nil { return errors.Wrap(err, "create object storage store") diff --git a/docs/components/store.md b/docs/components/store.md index 3bc0c1e4ef..3b45755b36 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -84,5 +84,60 @@ Flags: --block-sync-concurrency=20 Number of goroutines to use when syncing blocks from object storage. + --min-block-start-time=0000-01-01T00:00:00Z + Start of time range limit to serve. Thanos + Store serves only blocks, which have start time + greater than this value. Option can be a + constant time in RFC3339 format or time + duration relative to current time, such as + -1.5d or 2h45m. Valid duration units are ms, s, + m, h, d, w, y. + --max-block-start-time=9999-12-31T23:59:59Z + End of time range limit to serve. Thanos Store + serves only blocks, which have start time is + less than this value. Option can be a constant + time in RFC3339 format or time duration + relative to current time, such as -1.5d or + 2h45m. Valid duration units are ms, s, m, h, d, + w, y. 
+ --min-block-end-time=0000-01-01T00:00:00Z + Start of time range limit to serve. Thanos + Store serves only blocks, which have end time + greater than this value. Option can be a + constant time in RFC3339 format or time + duration relative to current time, such as + -1.5d or 2h45m. Valid duration units are ms, s, + m, h, d, w, y. + --max-block-end-time=9999-12-31T23:59:59Z + End of time range limit to serve. Thanos Store + serves only blocks, which have end time less + than this value. Option can be a constant time + in RFC3339 format or time duration relative to + current time, such as -1.5d or 2h45m. Valid + duration units are ms, s, m, h, d, w, y. ``` + +## Time & Duration based partitioning + +By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range. + +There is a block syncing job, which synchronizes local state with remote storage. You can configure how often it runs via `--sync-block-duration=3m`. In most cases default should work well. + + +Recently Thanos Store introduced `--min-block-start-time`, `--max-block-start-time`, `--min-block-end-time`,`--max-block-end-time` flags, that allow you to shard Thanos Store based on constant time or duration relative to current time. +The `{min,max}-block-start-time` options only look at block's start time and `{min,max}-block-end-time` only at the end time. + +For example setting: `--min-block-start-time=-6w` & `--max-block-start-time=-2w` will make Thanos Store Gateway look at blocks that fall within `now - 6 weeks` up to `now - 2 weeks` time range. + +You can also set constant time in RFC3339 format. For example `--min-block-start-time=2018-01-01T00:00:00Z`, `--max-block-start-time=2019-01-01T23:59:59Z`. + +The block filtering is done in Thanos Store's syncing job, which adds some delay, so Thanos Store might not see new blocks or filter out blocks immediately.
+ + We recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways as this improves your resiliency to failures. + A lot of Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store won't immediately see newly created & uploaded blocks. + Also Thanos Sidecar might fail to upload new blocks to Object Store due to network timeouts or Object Store downtime, so if your time ranges are too strict, you won't be able to query the data. + + Thanos Querier deals with overlapping time series by merging them together. It's important to note, that by having more overlapping time series Thanos Querier will use more resources, like CPU, network & memory, as it will have to pull down all the overlapping blocks and merge them. + + When configuring time partitioning keep in mind Thanos Compaction, as it builds bigger blocks and removes data based on your retention policies. diff --git a/pkg/model/timeduration.go b/pkg/model/timeduration.go new file mode 100644 index 0000000000..bbe766043f --- /dev/null +++ b/pkg/model/timeduration.go @@ -0,0 +1,75 @@ +package model + +import ( + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/timestamp" + "gopkg.in/alecthomas/kingpin.v2" +) + +// TimeOrDurationValue is a custom kingpin parser for time in RFC3339 +// or duration in Go's duration format, such as "300ms", "-1.5h" or "2h45m". +// Only one will be set. +type TimeOrDurationValue struct { + Time *time.Time + Dur *model.Duration +} + +// Set converts string to TimeOrDurationValue. +func (tdv *TimeOrDurationValue) Set(s string) error { + t, err := time.Parse(time.RFC3339, s) + if err == nil { + tdv.Time = &t + return nil + } + + // error parsing time, let's try duration.
+ var minus bool + if s[0] == '-' { + minus = true + s = s[1:] + } + dur, err := model.ParseDuration(s) + if err != nil { + return err + } + + if minus { + dur = dur * -1 + } + tdv.Dur = &dur + return nil +} + +// String returns either time or duration. +func (tdv *TimeOrDurationValue) String() string { + switch { + case tdv.Time != nil: + return tdv.Time.String() + case tdv.Dur != nil: + return tdv.Dur.String() + } + + return "nil" +} + +// PrometheusTimestamp returns TimeOrDurationValue converted to PrometheusTimestamp +// if duration is set now+duration is converted to Timestamp. +func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 { + switch { + case tdv.Time != nil: + return timestamp.FromTime(*tdv.Time) + case tdv.Dur != nil: + return timestamp.FromTime(time.Now().Add(time.Duration(*tdv.Dur))) + } + + return 0 +} + +// TimeOrDuration helper for parsing TimeOrDuration with kingpin. +func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue { + value := new(TimeOrDurationValue) + flags.SetValue(value) + return value +} diff --git a/pkg/model/timeduration_test.go b/pkg/model/timeduration_test.go new file mode 100644 index 0000000000..d3d3eba271 --- /dev/null +++ b/pkg/model/timeduration_test.go @@ -0,0 +1,36 @@ +package model_test + +import ( + "testing" + "time" + + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/testutil" + "gopkg.in/alecthomas/kingpin.v2" +) + +func TestTimeOrDurationValue(t *testing.T) { + cmd := kingpin.New("test", "test") + + minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve")) + + maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve").
+ Default("9999-12-31T23:59:59Z")) + + _, err := cmd.Parse([]string{"--min-time", "10s"}) + if err != nil { + t.Fatal(err) + } + + testutil.Equals(t, "10s", minTime.String()) + testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", maxTime.String()) + + prevTime := timestamp.FromTime(time.Now()) + afterTime := timestamp.FromTime(time.Now().Add(15 * time.Second)) + + testutil.Assert(t, minTime.PrometheusTimestamp() > prevTime, "minTime prometheus timestamp is less than time now.") + testutil.Assert(t, minTime.PrometheusTimestamp() < afterTime, "minTime prometheus timestamp is more than time now + 15s") + + testutil.Assert(t, 253402300799000 == maxTime.PrometheusTimestamp(), "maxTime is not equal to 253402300799000") +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 47965cd21a..3efa393619 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -32,6 +32,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" @@ -182,6 +183,11 @@ type indexCache interface { Series(b ulid.ULID, id uint64) ([]byte, bool) } +// FilterConfig is a configiration, which Store uses for filtering metrics. +type FilterConfig struct { + MinTime, MaxTime model.TimeOrDurationValue +} + // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. type BucketStore struct { @@ -208,6 +214,8 @@ type BucketStore struct { // samplesLimiter limits the number of samples per each Series() call. 
samplesLimiter *Limiter partitioner partitioner + + filterConfig *FilterConfig } // NewBucketStore creates a new bucket backed store that implements the store API against @@ -223,6 +231,7 @@ func NewBucketStore( maxConcurrent int, debugLogging bool, blockSyncConcurrency int, + filterConf *FilterConfig, ) (*BucketStore, error) { if logger == nil { logger = log.NewNopLogger() @@ -242,6 +251,7 @@ func NewBucketStore( metrics := newBucketStoreMetrics(reg) s := &BucketStore{ logger: logger, + metrics: metrics, bucket: bucket, dir: dir, indexCache: indexCache, @@ -256,8 +266,8 @@ func NewBucketStore( ), samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped), partitioner: gapBasedPartitioner{maxGapSize: maxGapSize}, + filterConfig: filterConf, } - s.metrics = metrics if err := os.MkdirAll(dir, 0777); err != nil { return nil, errors.Wrap(err, "create dir") @@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { if err != nil { return nil } + + inRange, err := s.isBlockInMinMaxRange(ctx, id) + if err != nil { + level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) + return nil + } + + if !inRange { + return nil + } + allIDs[id] = struct{}{} if b := s.getBlock(id); b != nil { @@ -377,6 +398,31 @@ func (s *BucketStore) numBlocks() int { return len(s.blocks) } +func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) { + dir := filepath.Join(s.dir, id.String()) + + b := &bucketBlock{ + logger: s.logger, + bucket: s.bucket, + id: id, + dir: dir, + } + if err := b.loadMeta(ctx, id); err != nil { + return false, err + } + + // We check for blocks in configured minTime, maxTime range + switch { + case b.meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp(): + return false, nil + + case b.meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp(): + return false, nil + } + + return true, nil +} + func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { s.mtx.RLock() 
defer s.mtx.RUnlock() @@ -409,6 +455,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { if err != nil { return errors.Wrap(err, "new bucket block") } + s.mtx.Lock() defer s.mtx.Unlock() @@ -468,9 +515,33 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) { maxt = b.meta.MaxTime } } + + mint = s.normalizeMinTime(mint) + maxt = s.normalizeMaxTime(maxt) + return mint, maxt } +func (s *BucketStore) normalizeMinTime(mint int64) int64 { + filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp() + + if mint < filterMinTime { + return filterMinTime + } + + return mint +} + +func (s *BucketStore) normalizeMaxTime(maxt int64) int64 { + filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp() + + if maxt > filterMaxTime { + maxt = filterMaxTime + } + + return maxt +} + // Info implements the storepb.StoreServer interface. func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) { mint, maxt := s.TimeRange() @@ -730,11 +801,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie ) s.mtx.RLock() + // Adjust Request MinTime based on filters. + req.MinTime = s.normalizeMinTime(req.MinTime) + req.MaxTime = s.normalizeMaxTime(req.MaxTime) + for _, bs := range s.blockSets { blockMatchers, ok := bs.labelMatchers(matchers...) 
if !ok { continue } + blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow) if s.debugLogging { diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index 2185e0d161..7c9721a724 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -9,14 +9,14 @@ import ( "testing" "time" - "github.com/oklog/ulid" - "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/objtesting" "github.com/thanos-io/thanos/pkg/runutil" @@ -25,6 +25,19 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +var ( + minTime = time.Unix(0, 0) + maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + minTimeDuration = model.TimeOrDurationValue{Time: &minTime} + maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime} + blockFilterConf = &BlockFilterConfig{ + MinBlockStartTime: minTimeDuration, + MaxBlockStartTime: maxTimeDuration, + MinBlockEndTime: minTimeDuration, + MaxBlockEndTime: maxTimeDuration, + } +) + type noopCache struct{} func (noopCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {} @@ -56,6 +69,13 @@ func (c *swappableCache) Series(b ulid.ULID, id uint64) ([]byte, bool) { return c.ptr.Series(b, id) } +var ( + minTime = time.Unix(0, 0) + maxTime, _ = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z") + minTimeDuration = &TimeOrDurationValue{t: &minTime} + maxTimeDuration = &TimeOrDurationValue{t: &maxTime} +) + type storeSuite struct { cancel context.CancelFunc wg sync.WaitGroup @@ -128,7 +148,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m testutil.Ok(t, os.RemoveAll(dir2)) } - store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, 
maxSampleCount, 20, false, 20) + store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, blockFilterConf) testutil.Ok(t, err) s.store = store diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 0a85f7cc49..319df4fe92 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -4,17 +4,23 @@ import ( "context" "io/ioutil" "math" + "path/filepath" "testing" "time" "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" "github.com/oklog/ulid" + prommodel "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/model" + "github.com/thanos-io/thanos/pkg/objstore/inmem" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -416,7 +422,7 @@ func TestBucketStore_Info(t *testing.T) { dir, err := ioutil.TempDir("", "prometheus-test") testutil.Ok(t, err) - bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20) + bucketStore, err := NewBucketStore(nil, nil, nil, dir, noopCache{}, 2e5, 0, 0, false, 20, blockFilterConf) testutil.Ok(t, err) resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) @@ -426,3 +432,65 @@ func TestBucketStore_Info(t *testing.T) { testutil.Equals(t, int64(math.MaxInt64), resp.MinTime) testutil.Equals(t, int64(math.MinInt64), resp.MaxTime) } + +func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { + ctx := context.TODO() + dir, err := ioutil.TempDir("", "block-min-max-test") + testutil.Ok(t, err) + + series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} + extLset := labels.FromStrings("ext1", "value1") + + // Create a block in range [-2w, -1w] + id1, err := testutil.CreateBlock(ctx, 
dir, series, 10, + timestamp.FromTime(time.Now().Add(-14*24*time.Hour)), + timestamp.FromTime(time.Now().Add(-7*24*time.Hour)), + extLset, 0) + testutil.Ok(t, err) + + // Create a block in range [-1w, 0w] + id2, err := testutil.CreateBlock(ctx, dir, series, 10, + timestamp.FromTime(time.Now().Add(-7*24*time.Hour)), + timestamp.FromTime(time.Now().Add(-0*24*time.Hour)), + extLset, 0) + testutil.Ok(t, err) + + // Create a block in range [+1w, +2w] + id3, err := testutil.CreateBlock(ctx, dir, series, 10, + timestamp.FromTime(time.Now().Add(7*24*time.Hour)), + timestamp.FromTime(time.Now().Add(14*24*time.Hour)), + extLset, 0) + testutil.Ok(t, err) + + dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) + meta1, err := metadata.Read(dir1) + testutil.Ok(t, err) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir1, meta1)) + + meta2, err := metadata.Read(dir2) + testutil.Ok(t, err) + testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2)) + + // Run actual test + hourBeforeDur := prommodel.Duration(-1 * time.Hour) + hourBefore := model.TimeOrDurationValue{Dur: &hourBeforeDur} + + // bucketStore accepts blocks in range [0, now-1h] + bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20, + &BlockFilterConfig{ + MinBlockStartTime: minTimeDuration, + MaxBlockStartTime: hourBefore, + MinBlockEndTime: minTimeDuration, + MaxBlockEndTime: maxTimeDuration, + }) + + inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) + testutil.Ok(t, err) + testutil.Equals(t, true, inRange) + + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2) + testutil.Equals(t, true, inRange) + + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3) + testutil.Equals(t, false, inRange) +}