store: Filter blocks before loading them. Sort advertised labels; Added sharding e2e test.

Fixes: #1664

Signed-off-by: Bartek Plotka <[email protected]>
bwplotka committed Oct 18, 2019
1 parent f0d3b14 commit 30f7b87
Showing 4 changed files with 316 additions and 184 deletions.
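
The substance of the change: SyncBlocks now downloads and inspects a block's meta.json first, and discards any block that falls outside the configured min/max time range or whose external labels are dropped by the relabel config, before the block (and its index-cache.json) is ever loaded. Below is a minimal sketch of that filtering decision, assuming the Prometheus relabel and labels packages this diff already uses (import paths as vendored at the time); blockMeta and keepBlock are illustrative stand-ins, not Thanos types.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	promlabels "github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/relabel"
)

// blockMeta carries only the fields the filter needs; the real code works on *metadata.Meta.
type blockMeta struct {
	MinTime, MaxTime int64             // block time range (Unix ms)
	Labels           map[string]string // Thanos external labels from meta.json
}

// keepBlock mirrors the new SyncBlocks logic: first the configured time window,
// then relabeling. relabel.Process returns nil when the rules drop the label set.
func keepBlock(meta blockMeta, minTime, maxTime int64, cfg []*relabel.Config) bool {
	if meta.MaxTime <= minTime || meta.MinTime >= maxTime {
		return false // entirely outside the store's time window
	}
	return relabel.Process(promlabels.FromMap(meta.Labels), cfg...) != nil
}

func main() {
	// Keep only blocks labelled shard="a" (illustrative sharding rule).
	cfg := []*relabel.Config{{
		SourceLabels: model.LabelNames{"shard"},
		Regex:        relabel.MustNewRegexp("a"),
		Action:       relabel.Keep,
	}}
	fmt.Println(keepBlock(blockMeta{MinTime: 0, MaxTime: 1000, Labels: map[string]string{"shard": "a"}}, 0, 2000, cfg))    // true
	fmt.Println(keepBlock(blockMeta{MinTime: 0, MaxTime: 1000, Labels: map[string]string{"shard": "b"}}, 0, 2000, cfg))    // false: dropped by relabeling
	fmt.Println(keepBlock(blockMeta{MinTime: 3000, MaxTime: 4000, Labels: map[string]string{"shard": "a"}}, 0, 2000, cfg)) // false: outside time range
}
```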
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up.

### Fixed

- [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. It no longer loads excluded meta.json files, and no longer loads or fetches index-cache.json files for excluded blocks.

## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14

### Fixed
2 changes: 1 addition & 1 deletion pkg/objstore/objstore.go
@@ -109,7 +109,7 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src,

rc, err := bkt.Get(ctx, src)
if err != nil {
return errors.Wrap(err, "get file")
return errors.Wrapf(err, "get file %s", src)
}
defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader")

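The single change to pkg/objstore/objstore.go swaps errors.Wrap for errors.Wrapf (both from github.com/pkg/errors, which this file already uses) so a failed download names the object it was fetching. A small sketch of the difference; the fetch helper and the path are hypothetical.

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// fetch is a hypothetical stand-in for a bkt.Get call that fails.
func fetch(src string) error {
	return errors.New("object not found")
}

func download(src string) error {
	if err := fetch(src); err != nil {
		// Before: errors.Wrap(err, "get file") lost the failing path.
		// After: the wrapped message carries the object name.
		return errors.Wrapf(err, "get file %s", src)
	}
	return nil
}

func main() {
	fmt.Println(download("example-block/meta.json"))
	// Output: get file example-block/meta.json: object not found
}
```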
123 changes: 59 additions & 64 deletions pkg/store/bucket.go
@@ -236,7 +236,7 @@ type BucketStore struct {
filterConfig *FilterConfig
relabelConfig []*relabel.Config

labelSets map[uint64]labels.Labels
advLabelSets []storepb.LabelSet
enableCompatibilityLabel bool
}

@@ -322,14 +322,14 @@ func (s *BucketStore) Close() (err error) {
// It will reuse disk space as persistent cache based on s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
var wg sync.WaitGroup
blockc := make(chan ulid.ULID)
blockc := make(chan *metadata.Meta)

for i := 0; i < s.blockSyncConcurrency; i++ {
wg.Add(1)
go func() {
for id := range blockc {
if err := s.addBlock(ctx, id); err != nil {
level.Warn(s.logger).Log("msg", "loading block failed", "id", id, "err", err)
for meta := range blockc {
if err := s.addBlock(ctx, meta); err != nil {
level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err)
continue
}
}
@@ -346,14 +346,27 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
return nil
}

inRange, err := s.isBlockInMinMaxRange(ctx, id)
bdir := path.Join(s.dir, id.String())
meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id)
if err != nil {
return errors.Wrap(err, "load meta")
}

inRange, err := s.isBlockInMinMaxRange(ctx, meta)
if err != nil {
level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
return nil
return os.RemoveAll(bdir)
}

if !inRange {
return nil
return os.RemoveAll(bdir)
}

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
if processedLabels := relabel.Process(promlabels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil {
level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id)
return os.RemoveAll(bdir)
}

allIDs[id] = struct{}{}
@@ -363,7 +376,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
}
select {
case <-ctx.Done():
case blockc <- id:
case blockc <- meta:
}
return nil
})
@@ -387,11 +400,19 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
}

// Sync advertise labels.
var storeLabels []storepb.Label
s.mtx.Lock()
s.labelSets = make(map[uint64]labels.Labels, len(s.blocks))
for _, bs := range s.blocks {
s.labelSets[bs.labels.Hash()] = append(labels.Labels(nil), bs.labels...)
s.advLabelSets = s.advLabelSets[:0]
for _, bs := range s.blockSets {
storeLabels := storeLabels[:0]
for _, l := range bs.labels {
storeLabels = append(storeLabels, storepb.Label{Name: l.Name, Value: l.Value})
}
s.advLabelSets = append(s.advLabelSets, storepb.LabelSet{Labels: storeLabels})
}
sort.Slice(s.advLabelSets, func(i, j int) bool {
return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0
})
s.mtx.Unlock()

return nil
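
With this change the advertised label sets are computed once per sync, converted to []storepb.LabelSet, and sorted by their string form, so Info reports them in a stable order instead of iterating a map. A minimal sketch of that conversion and sort, assuming the labels and storepb packages used in this file:

```go
package main

import (
	"fmt"
	"sort"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// toAdvertised converts external label sets into the protobuf form advertised by
// Info and sorts them by string representation for a deterministic order,
// mirroring the new code in SyncBlocks.
func toAdvertised(sets []labels.Labels) []storepb.LabelSet {
	out := make([]storepb.LabelSet, 0, len(sets))
	for _, ls := range sets {
		lset := make([]storepb.Label, 0, len(ls))
		for _, l := range ls {
			lset = append(lset, storepb.Label{Name: l.Name, Value: l.Value})
		}
		out = append(out, storepb.LabelSet{Labels: lset})
	}
	sort.Slice(out, func(i, j int) bool {
		return out[i].String() < out[j].String()
	})
	return out
}

func main() {
	fmt.Println(toAdvertised([]labels.Labels{
		labels.FromStrings("replica", "b"),
		labels.FromStrings("replica", "a"),
	}))
}
```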
@@ -432,14 +453,7 @@ func (s *BucketStore) numBlocks() int {
return len(s.blocks)
}

func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) {
dir := filepath.Join(s.dir, id.String())

err, meta := loadMeta(ctx, s.logger, s.bucket, dir, id)
if err != nil {
return false, err
}

func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) {
// We check for blocks in configured minTime, maxTime range.
switch {
case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
@@ -458,8 +472,8 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
return s.blocks[id]
}

func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
dir := filepath.Join(s.dir, id.String())
func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
dir := filepath.Join(s.dir, meta.ULID.String())

defer func() {
if err != nil {
@@ -471,11 +485,14 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
}()
s.metrics.blockLoads.Inc()

lset := labels.FromMap(meta.Thanos.Labels)
h := lset.Hash()

b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", id),
log.With(s.logger, "block", meta.ULID),
meta,
s.bucket,
id,
dir,
s.indexCache,
s.chunkPool,
@@ -487,17 +504,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) {
s.mtx.Lock()
defer s.mtx.Unlock()

lset := labels.FromMap(b.meta.Thanos.Labels)
h := lset.Hash()

// Check for block labels by relabeling.
// If output is empty, the block will be dropped.
if processedLabels := relabel.Process(promlabels.FromMap(lset.Map()), s.relabelConfig...); processedLabels == nil {
level.Debug(s.logger).Log("msg", "dropping block(drop in relabeling)", "block", id)
return os.RemoveAll(dir)
}
b.labels = lset
sort.Sort(b.labels)
sort.Sort(lset)

set, ok := s.blockSets[h]
if !ok {
@@ -569,14 +576,8 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
}

s.mtx.RLock()
res.LabelSets = make([]storepb.LabelSet, 0, len(s.labelSets))
for _, ls := range s.labelSets {
lset := make([]storepb.Label, 0, len(ls))
for _, l := range ls {
lset = append(lset, storepb.Label{Name: l.Name, Value: l.Value})
}
res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: lset})
}
// Should we clone?
res.LabelSets = s.advLabelSets
s.mtx.RUnlock()

if s.enableCompatibilityLabel && len(res.LabelSets) > 0 {
@@ -1192,21 +1193,18 @@ type bucketBlock struct {
lvals map[string][]string
postings map[labels.Label]index.Range

id ulid.ULID
chunkObjs []string

pendingReaders sync.WaitGroup

partitioner partitioner

labels labels.Labels
}

func newBucketBlock(
ctx context.Context,
logger log.Logger,
meta *metadata.Meta,
bkt objstore.BucketReader,
id ulid.ULID,
dir string,
indexCache indexCache,
chunkPool *pool.BytesPool,
@@ -1215,23 +1213,17 @@
b = &bucketBlock{
logger: logger,
bucket: bkt,
id: id,
indexCache: indexCache,
chunkPool: chunkPool,
dir: dir,
partitioner: p,
meta: meta,
}
err, meta := loadMeta(ctx, logger, bkt, dir, id)
if err != nil {
return nil, errors.Wrap(err, "load meta")
}
b.meta = meta

if err = b.loadIndexCacheFile(ctx); err != nil {
return nil, errors.Wrap(err, "load index cache")
}
// Get object handles for all chunk files.
err = bkt.Iter(ctx, path.Join(id.String(), block.ChunksDirname), func(n string) error {
err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error {
b.chunkObjs = append(b.chunkObjs, n)
return nil
})
@@ -1242,33 +1234,36 @@
}

func (b *bucketBlock) indexFilename() string {
return path.Join(b.id.String(), block.IndexFilename)
return path.Join(b.meta.ULID.String(), block.IndexFilename)
}

func (b *bucketBlock) indexCacheFilename() string {
return path.Join(b.id.String(), block.IndexCacheFilename)
return path.Join(b.meta.ULID.String(), block.IndexCacheFilename)
}

func loadMeta(ctx context.Context, logger log.Logger, bucket objstore.BucketReader, dir string, id ulid.ULID) (error, *metadata.Meta) {
func loadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*metadata.Meta, error) {
// If we haven't seen the block before or it is missing the meta.json, download it.
if _, err := os.Stat(path.Join(dir, block.MetaFilename)); os.IsNotExist(err) {
if err := os.MkdirAll(dir, 0777); err != nil {
return errors.Wrap(err, "create dir"), nil
return nil, errors.Wrap(err, "create dir")
}
src := path.Join(id.String(), block.MetaFilename)

if err := objstore.DownloadFile(ctx, logger, bucket, src, dir); err != nil {
return errors.Wrap(err, "download meta.json"), nil
if err := objstore.DownloadFile(ctx, logger, bkt, src, dir); err != nil {
if bkt.IsObjNotFoundErr(errors.Cause(err)) {
level.Debug(logger).Log("msg", "meta file wasn't found. Block not ready or being deleted.", "block", id.String())
}
return nil, errors.Wrap(err, "download meta.json")
}
} else if err != nil {
return err, nil
return nil, err
}
meta, err := metadata.Read(dir)
if err != nil {
return errors.Wrap(err, "read meta.json"), nil
return nil, errors.Wrap(err, "read meta.json")
}

return nil, meta
return meta, err
}
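
loadMeta now returns the error last, in the usual Go order, and logs a debug line when the meta.json simply is not in the bucket yet. That not-found detection works even though DownloadFile wraps the error, because errors.Cause from github.com/pkg/errors walks the Wrap/Wrapf chain back to the root error. A self-contained sketch of that unwrapping; errNotFound and the path are hypothetical stand-ins for whatever the bucket provider returns.

```go
package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// errNotFound stands in for the provider's "object does not exist" error,
// which bkt.IsObjNotFoundErr would recognise.
var errNotFound = errors.New("object not found")

func main() {
	// DownloadFile wraps the provider error with the file path,
	// and loadMeta wraps that again with "download meta.json".
	err := errors.Wrap(
		errors.Wrapf(errNotFound, "get file %s", "example-block/meta.json"),
		"download meta.json")

	// errors.Cause walks back through both wrappers to the root cause,
	// so the not-found case is still detectable behind two layers of wrapping.
	fmt.Println(errors.Cause(err) == errNotFound) // true
	fmt.Println(err)                              // download meta.json: get file example-block/meta.json: object not found
}
```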

func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {
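The fourth changed file, the new sharding e2e test, did not render on this page. As a rough, purely illustrative sketch of what store sharding exercises (not the actual test), a Prometheus-style relabel configuration can be parsed from YAML and applied to each block's external labels to decide which blocks a given store instance keeps; the YAML and label values below are made up.

```go
package main

import (
	"fmt"

	promlabels "github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/relabel"
	"gopkg.in/yaml.v2"
)

// An illustrative shard selector: keep only blocks whose external label shard="a".
const shardConfig = `
- action: keep
  source_labels: ["shard"]
  regex: "a"
`

func main() {
	var cfg []*relabel.Config
	if err := yaml.Unmarshal([]byte(shardConfig), &cfg); err != nil {
		panic(err)
	}

	// External label sets of two hypothetical blocks.
	for _, lbls := range []map[string]string{
		{"shard": "a", "replica": "0"},
		{"shard": "b", "replica": "0"},
	} {
		kept := relabel.Process(promlabels.FromMap(lbls), cfg...) != nil
		fmt.Println(lbls, "kept:", kept) // shard=a is kept, shard=b is dropped
	}
}
```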