Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store: Add time & duration based partitioning #1077

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store"
Expand Down Expand Up @@ -49,7 +50,19 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
Default("20").Int()

minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store serves only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z"))
povilasv marked this conversation as resolved.
Show resolved Hide resolved

maxTime := model.TimeOrDuration(cmd.Flag("max-time", "End of time range limit to serve. Thanos Store serves only blocks, which happened eariler than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("9999-12-31T23:59:59Z"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error {
// Sanity check Time filters
if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() {
return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'",
minTime, maxTime)
}

return runStore(g,
logger,
reg,
Expand All @@ -69,6 +82,10 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application, name string
debugLogging,
*syncInterval,
*blockSyncConcurrency,
&store.FilterConfig{
MinTime: *minTime,
MaxTime: *maxTime,
},
)
}
}
Expand All @@ -94,6 +111,7 @@ func runStore(
verbose bool,
syncInterval time.Duration,
blockSyncConcurrency int,
blockFilterConf *store.FilterConfig,
) error {
{
confContentYaml, err := objStoreConfig.Content()
Expand Down Expand Up @@ -135,6 +153,7 @@ func runStore(
maxConcurrent,
verbose,
blockSyncConcurrency,
blockFilterConf,
)
if err != nil {
return errors.Wrap(err, "create object storage store")
Expand Down
55 changes: 55 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,60 @@ Flags:
--block-sync-concurrency=20
Number of goroutines to use when syncing blocks
from object storage.
--min-block-start-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only blocks, which have start time
greater than this value. Option can be a
constant time in RFC3339 format or time
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
--max-block-start-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which have start time is
less than this value. Option can be a constant
time in RFC3339 format or time duration
relative to current time, such as -1.5d or
2h45m. Valid duration units are ms, s, m, h, d,
w, y.
--min-block-end-time=0000-01-01T00:00:00Z
Start of time range limit to serve. Thanos
Store serves only blocks, which have end time
greater than this value. Option can be a
constant time in RFC3339 format or time
duration relative to current time, such as
-1.5d or 2h45m. Valid duration units are ms, s,
m, h, d, w, y.
--max-block-end-time=9999-12-31T23:59:59Z
End of time range limit to serve. Thanos Store
serves only blocks, which have end time is less
than this value. Option can be a constant time
in RFC3339 format or time duration relative to
current time, such as -1.5d or 2h45m. Valid
duration units are ms, s, m, h, d, w, y.

```

## Time & Duration based partitioning

By default Thanos Store Gateway looks at all the data in Object Store and returns it based on query's time range.

There is a block syncing job, which synchronizes local state with remote storage. You can configure how often it runs via `--sync-block-duration=3m`. In most cases default should work well.


Recently Thanos Store introduced the `--min-block-start-time`, `--max-block-start-time`, `--min-block-end-time` and `--max-block-end-time` flags, which allow you to shard Thanos Store based on a constant time or a duration relative to the current time.
The `{min,max}-block-start-time` options only look at block's start time and `{min,max}-block-end-time` only at the end time.

For example, setting `--min-block-start-time=-6w` & `--max-block-start-time=-2w` will make Thanos Store Gateway look at blocks that fall within the `now - 6 weeks` up to `now - 2 weeks` time range.

You can also set constant time in RFC3339 format. For example `--min-block-start-time=2018-01-01T00:00:00Z`, `--max-block-start-time=2019-01-01T23:59:59Z`.

The block filtering is done in Thanos Store's syncing job, which adds some delay, so Thanos Store might not see new blocks or filter out blocks immediately.

We recommend having overlapping time ranges with Thanos Sidecar and other Thanos Store gateways as this improves your resiliency to failures.
A lot of Object Store implementations provide eventual read-after-write consistency, which means that Thanos Store won't immediately see newly created & uploaded blocks.
Also Thanos Sidecar might fail to upload new blocks to Object Store due to network timeouts or Object Store downtime, so if your time ranges are too strict, you won't be able to query the data.

Thanos Querier deals with overlapping time series by merging them together. It's important to note, that by having more overlapping time series Thanos Querier will use more resources, like CPU, network & memory, as it will have to pull down all the overlapping blocks and merge them.

When configuring time partitioning keep in mind Thanos Compaction, as it builds bigger blocks and removes data based on your retention policies.
75 changes: 75 additions & 0 deletions pkg/model/timeduration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package model

import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/timestamp"
"gopkg.in/alecthomas/kingpin.v2"
)

// TimeOrDurationValue is a custom kingpin parser for either a time in RFC3339
// format or a duration in the format accepted by model.ParseDuration, such as
// "10s". A duration may be prefixed with '-' to make it negative.
// A successful call to Set populates exactly one of the two fields.
type TimeOrDurationValue struct {
// Time is the absolute instant parsed from an RFC3339 string, if one was given.
Time *time.Time
// Dur is the parsed duration, if one was given; PrometheusTimestamp adds it
// to the current time.
Dur *model.Duration
}

// Set parses s and stores the result in tdv, so that TimeOrDurationValue can
// be used as a kingpin flag value.
//
// s is first tried as an RFC3339 timestamp. If that fails, it is parsed as a
// duration with model.ParseDuration; a single leading '-' makes the duration
// negative. The field that was not parsed is cleared, so at most one of Time
// and Dur is non-nil after a successful call. An error is returned when s is
// neither a valid timestamp nor a valid duration.
func (tdv *TimeOrDurationValue) Set(s string) error {
	if t, err := time.Parse(time.RFC3339, s); err == nil {
		tdv.Time, tdv.Dur = &t, nil
		return nil
	}

	// Not a timestamp, so try a duration. The sign is stripped before calling
	// model.ParseDuration and re-applied afterwards. The length check keeps an
	// empty value (e.g. `--min-time=`) from panicking on s[0]; the empty
	// string is handed to ParseDuration, which is expected to reject it.
	negative := len(s) > 0 && s[0] == '-'
	if negative {
		s = s[1:]
	}

	dur, err := model.ParseDuration(s)
	if err != nil {
		return err
	}
	if negative {
		dur = -dur
	}

	tdv.Time, tdv.Dur = nil, &dur
	return nil
}

// String returns either time or duration.
func (tdv *TimeOrDurationValue) String() string {
switch {
case tdv.Time != nil:
return tdv.Time.String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense for this to format back to RFC3339?

case tdv.Dur != nil:
return tdv.Dur.String()
}

return "nil"
}

// PrometheusTimestamp converts the value to a Prometheus timestamp using
// timestamp.FromTime. An absolute Time is converted directly; a Dur is
// resolved against the current time (now + Dur) on every call. If neither
// field is set, 0 is returned.
func (tdv *TimeOrDurationValue) PrometheusTimestamp() int64 {
	if tdv.Time != nil {
		return timestamp.FromTime(*tdv.Time)
	}

	if tdv.Dur != nil {
		offset := time.Duration(*tdv.Dur)
		return timestamp.FromTime(time.Now().Add(offset))
	}

	return 0
}

// TimeOrDuration installs a fresh TimeOrDurationValue as the value of the
// given kingpin flag and returns it, so the caller can read the parsed result
// once the command line has been parsed.
func TimeOrDuration(flags *kingpin.FlagClause) *TimeOrDurationValue {
	tdv := &TimeOrDurationValue{}
	flags.SetValue(tdv)
	return tdv
}
36 changes: 36 additions & 0 deletions pkg/model/timeduration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package model_test

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/testutil"
"gopkg.in/alecthomas/kingpin.v2"
)

// TestTimeOrDurationValue parses a relative duration for --min-time and relies
// on the RFC3339 default for --max-time, then checks both the string form and
// the resulting Prometheus timestamps.
func TestTimeOrDurationValue(t *testing.T) {
	app := kingpin.New("test", "test")

	minFlag := app.Flag("min-time", "Start of time range limit to serve")
	lower := model.TimeOrDuration(minFlag)

	maxFlag := app.Flag("max-time", "End of time range limit to serve").
		Default("9999-12-31T23:59:59Z")
	upper := model.TimeOrDuration(maxFlag)

	if _, err := app.Parse([]string{"--min-time", "10s"}); err != nil {
		t.Fatal(err)
	}

	testutil.Equals(t, "10s", lower.String())
	testutil.Equals(t, "9999-12-31 23:59:59 +0000 UTC", upper.String())

	// The duration is resolved against time.Now() on each call, so the result
	// is only bounded: later than "now" and earlier than "now + 15s".
	lowerBound := timestamp.FromTime(time.Now())
	upperBound := timestamp.FromTime(time.Now().Add(15 * time.Second))

	testutil.Assert(t, lower.PrometheusTimestamp() > lowerBound, "minTime prometheus timestamp is less than time now.")
	testutil.Assert(t, lower.PrometheusTimestamp() < upperBound, "minTime prometheus timestamp is more than time now + 15s")

	testutil.Assert(t, 253402300799000 == upper.PrometheusTimestamp(), "maxTime is not equal to 253402300799000")
}
78 changes: 77 additions & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -182,6 +183,11 @@ type indexCache interface {
Series(b ulid.ULID, id uint64) ([]byte, bool)
}

// FilterConfig is a configuration which Store uses for filtering metrics.
// MinTime and MaxTime are the lower and upper time bounds; each may be an
// absolute time or a duration relative to the current time
// (see model.TimeOrDurationValue).
type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
}

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
type BucketStore struct {
Expand All @@ -208,6 +214,8 @@ type BucketStore struct {
// samplesLimiter limits the number of samples per each Series() call.
samplesLimiter *Limiter
partitioner partitioner

filterConfig *FilterConfig
}

// NewBucketStore creates a new bucket backed store that implements the store API against
Expand All @@ -223,6 +231,7 @@ func NewBucketStore(
maxConcurrent int,
debugLogging bool,
blockSyncConcurrency int,
filterConf *FilterConfig,
) (*BucketStore, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand All @@ -242,6 +251,7 @@ func NewBucketStore(
metrics := newBucketStoreMetrics(reg)
s := &BucketStore{
logger: logger,
metrics: metrics,
bucket: bucket,
dir: dir,
indexCache: indexCache,
Expand All @@ -256,8 +266,8 @@ func NewBucketStore(
),
samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
partitioner: gapBasedPartitioner{maxGapSize: maxGapSize},
filterConfig: filterConf,
}
s.metrics = metrics

if err := os.MkdirAll(dir, 0777); err != nil {
return nil, errors.Wrap(err, "create dir")
Expand Down Expand Up @@ -309,6 +319,17 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
if err != nil {
return nil
}

inRange, err := s.isBlockInMinMaxRange(ctx, id)
povilasv marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
return nil
}

if !inRange {
return nil
}

allIDs[id] = struct{}{}

if b := s.getBlock(id); b != nil {
Expand Down Expand Up @@ -377,6 +398,31 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}

func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
dir := filepath.Join(s.dir, id.String())

b := &bucketBlock{
logger: s.logger,
bucket: s.bucket,
id: id,
dir: dir,
}
if err := b.loadMeta(ctx, id); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we load meta once? I think we do it everythere in small functions ):

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should cache those on disk, not sure

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b.loadMeta already does this:

	if _, err := os.Stat(b.dir); os.IsNotExist(err) {
		if err := os.MkdirAll(b.dir, 0777); err != nil {
			return errors.Wrap(err, "create dir")
		}
		src := path.Join(id.String(), block.MetaFilename)

		level.Debug(b.logger).Log("msg", "download file", "bucket", b.bucket, "src", src, "dir", b.dir)
		if err := objstore.DownloadFile(ctx, b.logger, b.bucket, src, b.dir); err != nil {
			return errors.Wrap(err, "download meta.json")
		}
	} else if err != nil {
		return err
	}
	meta, err := metadata.Read(b.dir)
	if err != nil {
		return errors.Wrap(err, "read meta.json")
	}

return false, err
}

// We check for blocks in configured minTime, maxTime range
povilasv marked this conversation as resolved.
Show resolved Hide resolved
switch {
case b.meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
return false, nil

case b.meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
return false, nil
}

return true, nil
}

func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
s.mtx.RLock()
defer s.mtx.RUnlock()
Expand Down Expand Up @@ -409,6 +455,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
if err != nil {
return errors.Wrap(err, "new bucket block")
}

s.mtx.Lock()
defer s.mtx.Unlock()

Expand Down Expand Up @@ -468,9 +515,33 @@ func (s *BucketStore) TimeRange() (mint, maxt int64) {
maxt = b.meta.MaxTime
}
}

mint = s.normalizeMinTime(mint)
maxt = s.normalizeMaxTime(maxt)

return mint, maxt
}

// normalizeMinTime clamps mint to the lower bound of the configured time
// filter: it returns the larger of mint and the filter's MinTime timestamp.
// When the store has no filter configuration, mint is returned unchanged.
func (s *BucketStore) normalizeMinTime(mint int64) int64 {
	// A nil filterConfig means no filtering; guard against dereferencing it.
	if s.filterConfig == nil {
		return mint
	}

	// Resolved on every call because a duration-based bound is relative to now.
	filterMinTime := s.filterConfig.MinTime.PrometheusTimestamp()
	if mint < filterMinTime {
		return filterMinTime
	}

	return mint
}

// normalizeMaxTime clamps maxt to the upper bound of the configured time
// filter: it returns the smaller of maxt and the filter's MaxTime timestamp.
// When the store has no filter configuration, maxt is returned unchanged.
func (s *BucketStore) normalizeMaxTime(maxt int64) int64 {
	// A nil filterConfig means no filtering; guard against dereferencing it.
	if s.filterConfig == nil {
		return maxt
	}

	// Resolved on every call because a duration-based bound is relative to now.
	filterMaxTime := s.filterConfig.MaxTime.PrometheusTimestamp()
	if maxt > filterMaxTime {
		return filterMaxTime
	}

	return maxt
}

// Info implements the storepb.StoreServer interface.
func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
mint, maxt := s.TimeRange()
Expand Down Expand Up @@ -730,11 +801,16 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
)
s.mtx.RLock()

// Adjust Request MinTime based on filters.
req.MinTime = s.normalizeMinTime(req.MinTime)
req.MaxTime = s.normalizeMaxTime(req.MaxTime)

for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
if !ok {
continue
}

blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow)

if s.debugLogging {
Expand Down
Loading