From 30f7b878de0e2c64445b84484596a86021a30174 Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Fri, 18 Oct 2019 18:43:46 +0100 Subject: [PATCH] store: Filter blocks before loading it. Sort advertise labels; Added sharding e2e test. Fixes: https://github.com/thanos-io/thanos/issues/1664 Signed-off-by: Bartek Plotka --- CHANGELOG.md | 4 + pkg/objstore/objstore.go | 2 +- pkg/store/bucket.go | 123 +++++++------ pkg/store/bucket_test.go | 371 ++++++++++++++++++++++++++------------- 4 files changed, 316 insertions(+), 184 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7c45a703b..2700eb0c42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1660](https://github.com/thanos-io/thanos/pull/1660) Add a new `--prometheus.ready_timeout` CLI option to the sidecar to set how long to wait until Prometheus starts up. +### Fixed + +- [#1669](https://github.com/thanos-io/thanos/pull/1669) Fixed store sharding. Now it does not load excluded meta.jsons and load/fetch index-cache.json files. + ## [v0.8.1](https://github.com/thanos-io/thanos/releases/tag/v0.8.1) - 2019.10.14 ### Fixed diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 67a0ce2a12..a52ba12fc1 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -109,7 +109,7 @@ func DownloadFile(ctx context.Context, logger log.Logger, bkt BucketReader, src, rc, err := bkt.Get(ctx, src) if err != nil { - return errors.Wrap(err, "get file") + return errors.Wrapf(err, "get file %s", src) } defer runutil.CloseWithLogOnErr(logger, rc, "download block's file reader") diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d5cdf655de..d5ae031316 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -236,7 +236,7 @@ type BucketStore struct { filterConfig *FilterConfig relabelConfig []*relabel.Config - labelSets map[uint64]labels.Labels + advLabelSets []storepb.LabelSet enableCompatibilityLabel bool } @@ -322,14 +322,14 @@ func (s *BucketStore) Close() (err error) { // It will reuse disk space as persistent cache based on s.dir param. func (s *BucketStore) SyncBlocks(ctx context.Context) error { var wg sync.WaitGroup - blockc := make(chan ulid.ULID) + blockc := make(chan *metadata.Meta) for i := 0; i < s.blockSyncConcurrency; i++ { wg.Add(1) go func() { - for id := range blockc { - if err := s.addBlock(ctx, id); err != nil { - level.Warn(s.logger).Log("msg", "loading block failed", "id", id, "err", err) + for meta := range blockc { + if err := s.addBlock(ctx, meta); err != nil { + level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err) continue } } @@ -346,14 +346,27 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { return nil } - inRange, err := s.isBlockInMinMaxRange(ctx, id) + bdir := path.Join(s.dir, id.String()) + meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id) + if err != nil { + return errors.Wrap(err, "load meta") + } + + inRange, err := s.isBlockInMinMaxRange(ctx, meta) if err != nil { level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err) - return nil + return os.RemoveAll(bdir) } if !inRange { - return nil + return os.RemoveAll(bdir) + } + + // Check for block labels by relabeling. + // If output is empty, the block will be dropped. + if processedLabels := relabel.Process(promlabels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil { + level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id) + return os.RemoveAll(bdir) } allIDs[id] = struct{}{} @@ -363,7 +376,7 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { } select { case <-ctx.Done(): - case blockc <- id: + case blockc <- meta: } return nil }) @@ -387,11 +400,19 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { } // Sync advertise labels. + var storeLabels []storepb.Label s.mtx.Lock() - s.labelSets = make(map[uint64]labels.Labels, len(s.blocks)) - for _, bs := range s.blocks { - s.labelSets[bs.labels.Hash()] = append(labels.Labels(nil), bs.labels...) + s.advLabelSets = s.advLabelSets[:0] + for _, bs := range s.blockSets { + storeLabels := storeLabels[:0] + for _, l := range bs.labels { + storeLabels = append(storeLabels, storepb.Label{Name: l.Name, Value: l.Value}) + } + s.advLabelSets = append(s.advLabelSets, storepb.LabelSet{Labels: storeLabels}) } + sort.Slice(s.advLabelSets, func(i, j int) bool { + return strings.Compare(s.advLabelSets[i].String(), s.advLabelSets[j].String()) < 0 + }) s.mtx.Unlock() return nil @@ -432,14 +453,7 @@ func (s *BucketStore) numBlocks() int { return len(s.blocks) } -func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, id ulid.ULID) (bool, error) { - dir := filepath.Join(s.dir, id.String()) - - err, meta := loadMeta(ctx, s.logger, s.bucket, dir, id) - if err != nil { - return false, err - } - +func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) { // We check for blocks in configured minTime, maxTime range. switch { case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp(): @@ -458,8 +472,8 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock { return s.blocks[id] } -func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { - dir := filepath.Join(s.dir, id.String()) +func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) { + dir := filepath.Join(s.dir, meta.ULID.String()) defer func() { if err != nil { @@ -471,11 +485,14 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { }() s.metrics.blockLoads.Inc() + lset := labels.FromMap(meta.Thanos.Labels) + h := lset.Hash() + b, err := newBucketBlock( ctx, - log.With(s.logger, "block", id), + log.With(s.logger, "block", meta.ULID), + meta, s.bucket, - id, dir, s.indexCache, s.chunkPool, @@ -487,17 +504,7 @@ func (s *BucketStore) addBlock(ctx context.Context, id ulid.ULID) (err error) { s.mtx.Lock() defer s.mtx.Unlock() - lset := labels.FromMap(b.meta.Thanos.Labels) - h := lset.Hash() - - // Check for block labels by relabeling. - // If output is empty, the block will be dropped. - if processedLabels := relabel.Process(promlabels.FromMap(lset.Map()), s.relabelConfig...); processedLabels == nil { - level.Debug(s.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) - return os.RemoveAll(dir) - } - b.labels = lset - sort.Sort(b.labels) + sort.Sort(lset) set, ok := s.blockSets[h] if !ok { @@ -569,14 +576,8 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info } s.mtx.RLock() - res.LabelSets = make([]storepb.LabelSet, 0, len(s.labelSets)) - for _, ls := range s.labelSets { - lset := make([]storepb.Label, 0, len(ls)) - for _, l := range ls { - lset = append(lset, storepb.Label{Name: l.Name, Value: l.Value}) - } - res.LabelSets = append(res.LabelSets, storepb.LabelSet{Labels: lset}) - } + // Should we clone? + res.LabelSets = s.advLabelSets s.mtx.RUnlock() if s.enableCompatibilityLabel && len(res.LabelSets) > 0 { @@ -1192,21 +1193,18 @@ type bucketBlock struct { lvals map[string][]string postings map[labels.Label]index.Range - id ulid.ULID chunkObjs []string pendingReaders sync.WaitGroup partitioner partitioner - - labels labels.Labels } func newBucketBlock( ctx context.Context, logger log.Logger, + meta *metadata.Meta, bkt objstore.BucketReader, - id ulid.ULID, dir string, indexCache indexCache, chunkPool *pool.BytesPool, @@ -1215,23 +1213,17 @@ func newBucketBlock( b = &bucketBlock{ logger: logger, bucket: bkt, - id: id, indexCache: indexCache, chunkPool: chunkPool, dir: dir, partitioner: p, + meta: meta, } - err, meta := loadMeta(ctx, logger, bkt, dir, id) - if err != nil { - return nil, errors.Wrap(err, "load meta") - } - b.meta = meta - if err = b.loadIndexCacheFile(ctx); err != nil { return nil, errors.Wrap(err, "load index cache") } // Get object handles for all chunk files. - err = bkt.Iter(ctx, path.Join(id.String(), block.ChunksDirname), func(n string) error { + err = bkt.Iter(ctx, path.Join(meta.ULID.String(), block.ChunksDirname), func(n string) error { b.chunkObjs = append(b.chunkObjs, n) return nil }) @@ -1242,33 +1234,36 @@ func newBucketBlock( } func (b *bucketBlock) indexFilename() string { - return path.Join(b.id.String(), block.IndexFilename) + return path.Join(b.meta.ULID.String(), block.IndexFilename) } func (b *bucketBlock) indexCacheFilename() string { - return path.Join(b.id.String(), block.IndexCacheFilename) + return path.Join(b.meta.ULID.String(), block.IndexCacheFilename) } -func loadMeta(ctx context.Context, logger log.Logger, bucket objstore.BucketReader, dir string, id ulid.ULID) (error, *metadata.Meta) { +func loadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*metadata.Meta, error) { // If we haven't seen the block before or it is missing the meta.json, download it. if _, err := os.Stat(path.Join(dir, block.MetaFilename)); os.IsNotExist(err) { if err := os.MkdirAll(dir, 0777); err != nil { - return errors.Wrap(err, "create dir"), nil + return nil, errors.Wrap(err, "create dir") } src := path.Join(id.String(), block.MetaFilename) - if err := objstore.DownloadFile(ctx, logger, bucket, src, dir); err != nil { - return errors.Wrap(err, "download meta.json"), nil + if err := objstore.DownloadFile(ctx, logger, bkt, src, dir); err != nil { + if bkt.IsObjNotFoundErr(errors.Cause(err)) { + level.Debug(logger).Log("msg", "meta file wasn't found. Block not ready or being deleted.", "block", id.String()) + } + return nil, errors.Wrap(err, "download meta.json") } } else if err != nil { - return err, nil + return nil, err } meta, err := metadata.Read(dir) if err != nil { - return errors.Wrap(err, "read meta.json"), nil + return nil, errors.Wrap(err, "read meta.json") } - return nil, meta + return meta, err } func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) { diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index c285df4ed4..54dc7fde4c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2,11 +2,14 @@ package store import ( "context" + "io" "io/ioutil" "math" + "os" "path" "path/filepath" "sort" + "sync" "testing" "time" @@ -486,14 +489,12 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { extLset, 0) testutil.Ok(t, err) - dir1, dir2 := filepath.Join(dir, id1.String()), filepath.Join(dir, id2.String()) - meta1, err := metadata.Read(dir1) + meta1, err := metadata.Read(path.Join(dir, id1.String())) testutil.Ok(t, err) - testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir1, meta1)) - - meta2, err := metadata.Read(dir2) + meta2, err := metadata.Read(path.Join(dir, id2.String())) + testutil.Ok(t, err) + meta3, err := metadata.Read(path.Join(dir, id3.String())) testutil.Ok(t, err) - testutil.Ok(t, metadata.Write(log.NewNopLogger(), dir2, meta2)) // Run actual test. hourBeforeDur := prommodel.Duration(-1 * time.Hour) @@ -507,166 +508,298 @@ func TestBucketStore_isBlockInMinMaxRange(t *testing.T) { }, emptyRelabelConfig, true) testutil.Ok(t, err) - inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), id1) + inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), meta1) testutil.Ok(t, err) testutil.Equals(t, true, inRange) - inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id2) + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta2) testutil.Ok(t, err) testutil.Equals(t, true, inRange) - inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), id3) + inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta3) testutil.Ok(t, err) testutil.Equals(t, false, inRange) } -func TestBucketStore_selectorBlocks(t *testing.T) { - ctx := context.TODO() +type recorder struct { + mtx sync.Mutex + objstore.Bucket + + touched []string +} + +func (r *recorder) Get(ctx context.Context, name string) (io.ReadCloser, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + + r.touched = append(r.touched, name) + return r.Bucket.Get(ctx, name) +} + +func TestBucketStore_Sharding(t *testing.T) { + ctx := context.Background() logger := log.NewNopLogger() - dir, err := ioutil.TempDir("", "selector-blocks") + + dir, err := ioutil.TempDir("", "test-sharding-prepare") testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + bkt := inmem.NewBucket() series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} - id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) + testutil.Ok(t, err) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()))) + + id2, err := testutil.CreateBlock(ctx, dir, series, 10, 1000, 2000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r1"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.MetaFilename), path.Join(id1.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id2.String()))) - id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) + id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "b"}, {Name: "region", Value: "r1"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.MetaFilename), path.Join(id2.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id3.String()))) - id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) + id4, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "a"}, {Name: "region", Value: "r2"}}, 0) testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.MetaFilename), path.Join(id3.String(), block.MetaFilename))) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) + testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id4.String()))) + if ok := t.Run("new_runs", func(t *testing.T) { + testSharding(t, "", bkt, id1, id2, id3, id4) + }); !ok { + return + } + + dir2, err := ioutil.TempDir("", "test-sharding2") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir2)) }() + + if ok := t.Run("reuse_disk", func(t *testing.T) { + testSharding(t, dir2, bkt, id1, id2, id3, id4) + }); !ok { + return + } + +} + +func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ulid.ULID) { + var cached []ulid.ULID + + logger := log.NewLogfmtLogger(os.Stderr) for _, sc := range []struct { - relabelContentYaml string - exceptedLength int - exceptedIds []ulid.ULID + name string + relabel string + expectedIDs []ulid.ULID + expectedAdvLabels []storepb.LabelSet }{ { - relabelContentYaml: ` + name: "no sharding", + expectedIDs: all, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r2"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "b"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "drop cluster=a sources", + relabel: ` - action: drop - regex: "A" + regex: "a" source_labels: - cluster `, - exceptedLength: 1, - exceptedIds: []ulid.ULID{id2}, + expectedIDs: []ulid.ULID{all[2]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "b"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, }, { - relabelContentYaml: ` + name: "keep only cluster=a sources", + relabel: ` - action: keep - regex: "A" + regex: "a" source_labels: - cluster `, - exceptedLength: 2, - exceptedIds: []ulid.ULID{id1, id3}, + expectedIDs: []ulid.ULID{all[0], all[1], all[3]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r2"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "keep only cluster=a without .*2 region sources", + relabel: ` + - action: keep + regex: "a" + source_labels: + - cluster + - action: drop + regex: ".*2" + source_labels: + - region + `, + expectedIDs: []ulid.ULID{all[0], all[1]}, + expectedAdvLabels: []storepb.LabelSet{ + { + Labels: []storepb.Label{ + {Name: "cluster", Value: "a"}, + {Name: "region", Value: "r1"}, + }, + }, + { + Labels: []storepb.Label{ + {Name: CompatibilityTypeLabelName, Value: "store"}, + }, + }, + }, + }, + { + name: "drop all", + relabel: ` + - action: drop + regex: "a" + source_labels: + - cluster + - action: drop + regex: "r1" + source_labels: + - region + `, + expectedIDs: []ulid.ULID{}, + expectedAdvLabels: []storepb.LabelSet(nil), }, } { - var relabelConf []*relabel.Config - err = yaml.Unmarshal([]byte(sc.relabelContentYaml), &relabelConf) - testutil.Ok(t, err) + t.Run(sc.name, func(t *testing.T) { + dir := reuseDisk + + if dir == "" { + var err error + dir, err = ioutil.TempDir("", "test-sharding") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + } - bucketStore, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 0, 0, 20, false, 20, - filterConf, relabelConf, true) - testutil.Ok(t, err) + var relabelConf []*relabel.Config + testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf)) - for _, id := range []ulid.ULID{id1, id2, id3} { - testutil.Ok(t, bucketStore.addBlock(ctx, id)) - } - testutil.Equals(t, sc.exceptedLength, len(bucketStore.blocks)) + rec := &recorder{Bucket: bkt} + bucketStore, err := NewBucketStore(logger, nil, rec, dir, noopCache{}, 0, 0, 99, false, 20, + filterConf, relabelConf, true) + testutil.Ok(t, err) - ids := make([]ulid.ULID, 0, len(bucketStore.blocks)) - for id := range bucketStore.blocks { - ids = append(ids, id) - } - sort.Slice(sc.exceptedIds, func(i, j int) bool { - return sc.exceptedIds[i].Compare(sc.exceptedIds[j]) > 0 - }) - sort.Slice(ids, func(i, j int) bool { - return ids[i].Compare(ids[j]) > 0 - }) - testutil.Equals(t, sc.exceptedIds, ids) - } -} + testutil.Ok(t, bucketStore.SyncBlocks(context.Background())) -func TestBucketStore_InfoWithLabels(t *testing.T) { - defer leaktest.CheckTimeout(t, 10*time.Second)() + // Check "stored" blocks. + ids := make([]ulid.ULID, 0, len(bucketStore.blocks)) + for id := range bucketStore.blocks { + ids = append(ids, id) + } + sort.Slice(ids, func(i, j int) bool { + return ids[i].Compare(ids[j]) < 0 + }) + testutil.Equals(t, sc.expectedIDs, ids) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + // Check Info endpoint. + resp, err := bucketStore.Info(context.Background(), &storepb.InfoRequest{}) + testutil.Ok(t, err) - dir, err := ioutil.TempDir("", "bucketstore-test") - testutil.Ok(t, err) + testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) + testutil.Equals(t, []storepb.Label(nil), resp.Labels) + testutil.Equals(t, sc.expectedAdvLabels, resp.LabelSets) - bkt := inmem.NewBucket() - series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")} + // Make sure we don't download files we did not expect to. + // Regression test: https://github.com/thanos-io/thanos/issues/1664 - logger := log.NewNopLogger() - id1, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "A"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id1.String(), block.IndexFilename), path.Join(id1.String(), block.IndexFilename))) + // Sort records. We load blocks concurrently so operations might be not ordered. + sort.Strings(rec.touched) - id2, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id2.String(), block.IndexFilename), path.Join(id2.String(), block.IndexFilename))) + if reuseDisk != "" { + testutil.Equals(t, expectedTouchedBlockOps(all, sc.expectedIDs, cached), rec.touched) + cached = sc.expectedIDs + return + } - id3, err := testutil.CreateBlock(ctx, dir, series, 10, 0, 1000, labels.Labels{{Name: "cluster", Value: "B"}}, 0) - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadFile(ctx, logger, bkt, filepath.Join(dir, id3.String(), block.IndexFilename), path.Join(id3.String(), block.IndexFilename))) - - relabelContentYaml := ` - - action: drop - regex: "A" - source_labels: - - cluster - ` - var relabelConfig []*relabel.Config - err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) - testutil.Ok(t, err) - bucketStore, err := NewBucketStore( - nil, - nil, - bkt, - dir, - noopCache{}, - 2e5, - 0, - 0, - false, - 20, - filterConf, - relabelConfig, - true, - ) - testutil.Ok(t, err) + testutil.Equals(t, expectedTouchedBlockOps(all, sc.expectedIDs, nil), rec.touched) + }) + } +} - err = bucketStore.SyncBlocks(ctx) - testutil.Ok(t, err) +func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []ulid.ULID) []string { + var ops []string + for _, id := range all { + blockCached := false + for _, fid := range cached { + if id.Compare(fid) == 0 { + blockCached = true + break + } + } + if blockCached { + continue + } - resp, err := bucketStore.Info(ctx, &storepb.InfoRequest{}) - testutil.Ok(t, err) + found := false + for _, fid := range expected { + if id.Compare(fid) == 0 { + found = true + break + } + } - testutil.Equals(t, storepb.StoreType_STORE, resp.StoreType) - testutil.Equals(t, int64(0), resp.MinTime) - testutil.Equals(t, int64(1000), resp.MaxTime) - testutil.Equals(t, []storepb.Label(nil), resp.Labels) - testutil.Equals(t, []storepb.LabelSet{ - { - Labels: []storepb.Label{ - {Name: "cluster", Value: "B"}, - }, - }, - { - Labels: []storepb.Label{ - {Name: CompatibilityTypeLabelName, Value: "store"}, - }, - }, - }, resp.LabelSets) + ops = append(ops, path.Join(id.String(), block.MetaFilename)) + if found { + ops = append(ops, + path.Join(id.String(), block.IndexCacheFilename), + path.Join(id.String(), block.IndexFilename), + ) + } + } + sort.Strings(ops) + return ops }