diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5c3ca393cc..f1f2732a799 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 
 ### Fixed
 
+- []() Store: Improved synchronization of meta JSON files. Store now properly handles a corrupted disk cache. Added meta.json sync metrics.
 - [#1856](https://github.com/thanos-io/thanos/pull/1856) Receive: close DBReadOnly after flushing to fix a memory leak.
 - [#1882](https://github.com/thanos-io/thanos/pull/1882) Receive: upload to object storage as 'receive' rather than 'sidecar'.
 - [#1907](https://github.com/thanos-io/thanos/pull/1907) Store: Fixed the duration unit for the metric `thanos_bucket_store_series_gate_duration_seconds`.
diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go
index 1ec81d87b6c..6937b9f4dfa 100644
--- a/cmd/thanos/store.go
+++ b/cmd/thanos/store.go
@@ -11,8 +11,10 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/pkg/relabel"
+	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/extflag"
+	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/model"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
 	"github.com/thanos-io/thanos/pkg/prober"
@@ -26,6 +28,8 @@ import (
 	yaml "gopkg.in/yaml.v2"
 )
 
+const fetcherConcurrency = 32
+
 // registerStore registers a store command.
 func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 	cmd := app.Command(component.Store.String(), "store node giving access to blocks in a bucket provider. Now supported GCS, S3, Azure, Swift and Tencent COS.")
@@ -47,7 +51,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 		Default("2GB").Bytes()
 
 	maxSampleCount := cmd.Flag("store.grpc.series-sample-limit",
-		"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: for efficiency we take 120 as the number of samples in chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
+		"Maximum amount of samples returned via a single Series call. 0 means no limit. NOTE: For efficiency we take 120 as the number of samples in a chunk (it cannot be bigger than that), so the actual number of samples might be lower, even though the maximum could be hit.").
 		Default("0").Uint()
 
 	maxConcurrent := cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").Int()
@@ -57,7 +61,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) {
 	syncInterval := cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
 		Default("3m").Duration()
 
-	blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing blocks from object storage.").
+	blockSyncConcurrency := cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage.").
 		Default("20").Int()
 
 	minTime := model.TimeOrDuration(cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
@@ -128,7 +132,7 @@ func runStore(
 	indexCacheSizeBytes uint64,
 	chunkPoolSizeBytes uint64,
 	maxSampleCount uint64,
-	maxConcurrent int,
+	maxConcurrency int,
 	component component.Component,
 	verbose bool,
 	syncInterval time.Duration,
@@ -202,19 +206,27 @@ func runStore(
 		return errors.Wrap(err, "create index cache")
 	}
 
+	metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
+		block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
+		block.NewLabelShardedMetaFilter(relabelConfig).Filter,
+	)
+	if err != nil {
+		return errors.Wrap(err, "meta fetcher")
+	}
+
 	bs, err := store.NewBucketStore(
 		logger,
 		reg,
 		bkt,
+		metaFetcher,
 		dataDir,
 		indexCache,
 		chunkPoolSizeBytes,
 		maxSampleCount,
-		maxConcurrent,
+		maxConcurrency,
 		verbose,
 		blockSyncConcurrency,
 		filterConf,
-		relabelConfig,
 		advertiseCompatibilityLabel,
 	)
 	if err != nil {
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index ddb9edb114e..53b1306bc2c 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -22,7 +22,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/fileutil"
@@ -190,7 +189,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
 	return &m
 }
 
-// FilterConfig is a configuration, which Store uses for filtering metrics.
+// FilterConfig is a configuration which the Store uses for filtering metrics based on time.
 type FilterConfig struct {
 	MinTime, MaxTime model.TimeOrDurationValue
 }
@@ -201,6 +200,7 @@ type BucketStore struct {
 	logger     log.Logger
 	metrics    *bucketStoreMetrics
 	bucket     objstore.BucketReader
+	fetcher    block.MetadataFetcher
 	dir        string
 	indexCache storecache.IndexCache
 	chunkPool  *pool.BytesPool
@@ -222,9 +222,7 @@ type BucketStore struct {
 	samplesLimiter *Limiter
 	partitioner    partitioner
 
-	filterConfig  *FilterConfig
-	relabelConfig []*relabel.Config
-
+	filterConfig             *FilterConfig
 	advLabelSets             []storepb.LabelSet
 	enableCompatibilityLabel bool
 }
@@ -235,6 +233,7 @@ func NewBucketStore(
 	logger log.Logger,
 	reg prometheus.Registerer,
 	bucket objstore.BucketReader,
+	fetcher block.MetadataFetcher,
 	dir string,
 	indexCache storecache.IndexCache,
 	maxChunkPoolBytes uint64,
@@ -242,8 +241,7 @@ func NewBucketStore(
 	maxConcurrent int,
 	debugLogging bool,
 	blockSyncConcurrency int,
-	filterConf *FilterConfig,
-	relabelConfig []*relabel.Config,
+	filterConfig *FilterConfig,
 	enableCompatibilityLabel bool,
 ) (*BucketStore, error) {
 	if logger == nil {
@@ -265,6 +263,7 @@ func NewBucketStore(
 	s := &BucketStore{
 		logger:               logger,
 		bucket:               bucket,
+		fetcher:              fetcher,
 		dir:                  dir,
 		indexCache:           indexCache,
 		chunkPool:            chunkPool,
@@ -272,14 +271,13 @@ func NewBucketStore(
 		blockSets:            map[uint64]*bucketBlockSet{},
 		debugLogging:         debugLogging,
 		blockSyncConcurrency: blockSyncConcurrency,
+		filterConfig:         filterConfig,
 		queryGate: gate.NewGate(
 			maxConcurrent,
 			extprom.WrapRegistererWithPrefix("thanos_bucket_store_series_", reg),
 		),
 		samplesLimiter: NewLimiter(maxSampleCount, metrics.queriesDropped),
 		partitioner:    gapBasedPartitioner{maxGapSize: maxGapSize},
-		filterConfig:   filterConf,
-		relabelConfig:  relabelConfig,
 		enableCompatibilityLabel: enableCompatibilityLabel,
 	}
 	s.metrics = metrics
@@ -310,6 +308,12 @@ func (s *BucketStore) Close() (err error) {
 
 // SyncBlocks synchronizes the stores state with the Bucket bucket.
 // It will reuse disk space as persistent cache based on s.dir param.
 func (s *BucketStore) SyncBlocks(ctx context.Context) error {
+	metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
+	// For a partial view, allow adding new blocks at least.
+	if metaFetchErr != nil && metas == nil {
+		return metaFetchErr
+	}
+
 	var wg sync.WaitGroup
 	blockc := make(chan *metadata.Meta)
 
@@ -318,7 +322,6 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
 		go func() {
 			for meta := range blockc {
 				if err := s.addBlock(ctx, meta); err != nil {
-					level.Warn(s.logger).Log("msg", "loading block failed", "id", meta.ULID, "err", err)
 					continue
 				}
 			}
@@ -326,65 +329,33 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error {
 		}()
 	}
 
-	allIDs := map[ulid.ULID]struct{}{}
-
-	err := s.bucket.Iter(ctx, "", func(name string) error {
-		// Strip trailing slash indicating a directory.
-		id, err := ulid.Parse(name[:len(name)-1])
-		if err != nil {
-			return nil
-		}
-
-		bdir := path.Join(s.dir, id.String())
-		meta, err := loadMeta(ctx, s.logger, s.bucket, bdir, id)
-		if err != nil {
-			return errors.Wrap(err, "load meta")
-		}
-
-		inRange, err := s.isBlockInMinMaxRange(ctx, meta)
-		if err != nil {
-			level.Warn(s.logger).Log("msg", "error parsing block range", "block", id, "err", err)
-			return os.RemoveAll(bdir)
-		}
-
-		if !inRange {
-			return os.RemoveAll(bdir)
-		}
-
-		// Check for block labels by relabeling.
-		// If output is empty, the block will be dropped.
-		if processedLabels := relabel.Process(labels.FromMap(meta.Thanos.Labels), s.relabelConfig...); processedLabels == nil {
-			level.Debug(s.logger).Log("msg", "ignoring block (drop in relabeling)", "block", id)
-			return os.RemoveAll(bdir)
-		}
-
-		allIDs[id] = struct{}{}
-
+	for id, meta := range metas {
 		if b := s.getBlock(id); b != nil {
-			return nil
+			continue
 		}
 		select {
 		case <-ctx.Done():
 		case blockc <- meta:
 		}
-		return nil
-	})
+	}
 
 	close(blockc)
 	wg.Wait()
 
-	if err != nil {
-		return errors.Wrap(err, "iter")
+	if metaFetchErr != nil {
+		return metaFetchErr
 	}
 
+	// Drop all blocks that are no longer present in the bucket.
 	for id := range s.blocks {
-		if _, ok := allIDs[id]; ok {
+		if _, ok := metas[id]; ok {
 			continue
 		}
 		if err := s.removeBlock(id); err != nil {
-			level.Warn(s.logger).Log("msg", "drop outdated block", "block", id, "err", err)
+			level.Warn(s.logger).Log("msg", "drop outdated block failed", "block", id, "err", err)
 			s.metrics.blockDropFailures.Inc()
 		}
+		level.Debug(s.logger).Log("msg", "dropped outdated block", "block", id)
 		s.metrics.blockDrops.Inc()
 	}
@@ -442,19 +413,6 @@ func (s *BucketStore) numBlocks() int {
 	return len(s.blocks)
 }
 
-func (s *BucketStore) isBlockInMinMaxRange(ctx context.Context, meta *metadata.Meta) (bool, error) {
-	// We check for blocks in configured minTime, maxTime range.
-	switch {
-	case meta.MaxTime <= s.filterConfig.MinTime.PrometheusTimestamp():
-		return false, nil
-
-	case meta.MinTime >= s.filterConfig.MaxTime.PrometheusTimestamp():
-		return false, nil
-	}
-
-	return true, nil
-}
-
 func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
 	s.mtx.RLock()
 	defer s.mtx.RUnlock()
@@ -463,13 +421,22 @@ func (s *BucketStore) getBlock(id ulid.ULID) *bucketBlock {
 
 func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err error) {
 	dir := filepath.Join(s.dir, meta.ULID.String())
+	start := time.Now()
+
+	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
+		return errors.Wrap(err, "create dir")
+	}
+	level.Debug(s.logger).Log("msg", "loading new block", "id", meta.ULID)
 
 	defer func() {
 		if err != nil {
 			s.metrics.blockLoadFailures.Inc()
 			if err2 := os.RemoveAll(dir); err2 != nil {
 				level.Warn(s.logger).Log("msg", "failed to remove block we cannot load", "err", err2)
 			}
+			level.Warn(s.logger).Log("msg", "loading block failed", "elapsed", time.Since(start), "id", meta.ULID, "err", err)
+		} else {
+			level.Debug(s.logger).Log("msg", "loaded block", "elapsed", time.Since(start), "id", meta.ULID)
 		}
 	}()
 	s.metrics.blockLoads.Inc()
@@ -1226,31 +1193,6 @@ func (b *bucketBlock) indexCacheFilename() string {
 	return path.Join(b.meta.ULID.String(), block.IndexCacheFilename)
 }
 
-func loadMeta(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*metadata.Meta, error) {
-	// If we haven't seen the block before or it is missing the meta.json, download it.
-	if _, err := os.Stat(path.Join(dir, block.MetaFilename)); os.IsNotExist(err) {
-		if err := os.MkdirAll(dir, 0777); err != nil {
-			return nil, errors.Wrap(err, "create dir")
-		}
-		src := path.Join(id.String(), block.MetaFilename)
-
-		if err := objstore.DownloadFile(ctx, logger, bkt, src, dir); err != nil {
-			if bkt.IsObjNotFoundErr(errors.Cause(err)) {
-				level.Debug(logger).Log("msg", "meta file wasn't found. Block not ready or being deleted.", "block", id.String())
-			}
-			return nil, errors.Wrap(err, "download meta.json")
-		}
-	} else if err != nil {
-		return nil, err
-	}
-	meta, err := metadata.Read(dir)
-	if err != nil {
-		return nil, errors.Wrap(err, "read meta.json")
-	}
-
-	return meta, err
-}
-
 func (b *bucketBlock) loadIndexCacheFile(ctx context.Context) (err error) {
 	cachefn := filepath.Join(b.dir, block.IndexCacheFilename)
 	if err = b.loadIndexCacheFileFromFile(ctx, cachefn); err == nil {
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 01cf814eda0..83548a2ba44 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -2,6 +2,7 @@ package store
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -25,11 +26,11 @@ import (
 )
 
 var (
-	minTime         = time.Unix(0, 0)
-	maxTime, _      = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
-	minTimeDuration = model.TimeOrDurationValue{Time: &minTime}
-	maxTimeDuration = model.TimeOrDurationValue{Time: &maxTime}
-	filterConf      = &FilterConfig{
+	minTime            = time.Unix(0, 0)
+	maxTime, _         = time.Parse(time.RFC3339, "9999-12-31T23:59:59Z")
+	minTimeDuration    = model.TimeOrDurationValue{Time: &minTime}
+	maxTimeDuration    = model.TimeOrDurationValue{Time: &maxTime}
+	allowAllFilterConf = &FilterConfig{
 		MinTime: minTimeDuration,
 		MaxTime: maxTimeDuration,
 	}
@@ -80,7 +81,7 @@ type storeSuite struct {
 }
 
 func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt objstore.Bucket,
-	series []labels.Labels, extLset labels.Labels) (blocks int, minTime, maxTime int64) {
+	series []labels.Labels, extLset labels.Labels) (minTime, maxTime int64) {
 	ctx := context.Background()
 	logger := log.NewNopLogger()
 
@@ -111,7 +112,6 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
 
 		testutil.Ok(t, block.Upload(ctx, logger, bkt, dir1))
 		testutil.Ok(t, block.Upload(ctx, logger, bkt, dir2))
-		blocks += 2
 
 		testutil.Ok(t, os.RemoveAll(dir1))
 		testutil.Ok(t, os.RemoveAll(dir2))
@@ -120,7 +120,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o
 	return
 }
 
-func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config) *storeSuite {
+func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, maxSampleCount uint64, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite {
 	series := []labels.Labels{
 		labels.FromStrings("a", "1", "b", "1"),
 		labels.FromStrings("a", "1", "b", "2"),
@@ -133,7 +133,7 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	}
 	extLset := labels.FromStrings("ext1", "value1")
 
-	blocks, minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt,
+	minTime, maxTime := prepareTestBlocks(t, time.Now(), 3, dir, bkt,
 		series, extLset)
 
 	s := &storeSuite{
@@ -143,7 +143,27 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 		maxTime: maxTime,
 	}
 
-	store, err := NewBucketStore(s.logger, nil, bkt, dir, s.cache, 0, maxSampleCount, 20, false, 20, filterConf, relabelConfig, true)
+	metaFetcher, err := block.NewMetaFetcher(s.logger, 20, bkt, dir, nil,
+		block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter,
+		block.NewLabelShardedMetaFilter(relabelConfig).Filter,
+	)
+	testutil.Ok(t, err)
+
+	store, err := NewBucketStore(
+		s.logger,
+		nil,
+		bkt,
+		metaFetcher,
+		dir,
+		s.cache,
+		0,
+		maxSampleCount,
+		20,
+		false,
+		20,
+		filterConf,
+		true,
+	)
 	testutil.Ok(t, err)
 	s.store = store
 
@@ -155,10 +175,6 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
 	defer cancel()
 
 	testutil.Ok(t, store.SyncBlocks(ctx))
-
-	if store.numBlocks() < blocks {
-		t.Fatalf("not all blocks loaded got %v, expected %v", store.numBlocks(), blocks)
-	}
 	return s
 }
 
@@ -371,7 +387,7 @@ func TestBucketStore_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 0, emptyRelabelConfig, allowAllFilterConf)
 
 	t.Log("Test with no index cache")
 	s.cache.SwapWith(noopCache{})
@@ -419,7 +435,7 @@ func TestBucketStore_ManyParts_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, true, 0, emptyRelabelConfig, allowAllFilterConf)
 
 	indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(s.logger, nil, storecache.InMemoryIndexCacheConfig{
 		MaxItemSize: 1e5,
@@ -441,69 +457,46 @@ func TestBucketStore_TimePartitioning_e2e(t *testing.T) {
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
 
-	series := []labels.Labels{
-		labels.FromStrings("a", "1", "b", "1"),
-		labels.FromStrings("a", "1", "b", "1"),
-		labels.FromStrings("a", "1", "b", "1"),
-		labels.FromStrings("a", "1", "b", "1"),
-		labels.FromStrings("a", "1", "b", "2"),
-		labels.FromStrings("a", "1", "b", "2"),
-		labels.FromStrings("a", "1", "b", "2"),
-		labels.FromStrings("a", "1", "b", "2"),
-	}
-	extLset := labels.FromStrings("ext1", "value1")
-
-	_, minTime, _ := prepareTestBlocks(t, time.Now(), 3, dir, bkt, series, extLset)
-
 	hourAfter := time.Now().Add(1 * time.Hour)
 	filterMaxTime := model.TimeOrDurationValue{Time: &hourAfter}
 
-	store, err := NewBucketStore(nil, nil, bkt, dir, noopCache{}, 0, 0, 20, false, 20,
-		&FilterConfig{
-			MinTime: minTimeDuration,
-			MaxTime: filterMaxTime,
-		}, emptyRelabelConfig, true)
-	testutil.Ok(t, err)
-
-	err = store.SyncBlocks(ctx)
-	testutil.Ok(t, err)
+	s := prepareStoreWithTestBlocks(t, dir, bkt, false, 241, emptyRelabelConfig, &FilterConfig{
+		MinTime: minTimeDuration,
+		MaxTime: filterMaxTime,
+	})
+	testutil.Ok(t, s.store.SyncBlocks(ctx))
 
-	mint, maxt := store.TimeRange()
-	testutil.Equals(t, minTime, mint)
+	mint, maxt := s.store.TimeRange()
+	testutil.Equals(t, s.minTime, mint)
 	testutil.Equals(t, filterMaxTime.PrometheusTimestamp(), maxt)
 
-	for i, tcase := range []struct {
-		req            *storepb.SeriesRequest
-		expectedLabels [][]storepb.Label
-		expectedChunks int
-	}{
-		{
-			req: &storepb.SeriesRequest{
-				Matchers: []storepb.LabelMatcher{
-					{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
-				},
-				MinTime: mint,
-				MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
-			},
-			expectedLabels: [][]storepb.Label{
-				{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
-				{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext2", Value: "value2"}},
-			},
-			// prepareTestBlocks makes 3 chunks containing 2 hour data,
-			// we should only get 1, as we are filtering.
-			expectedChunks: 1,
-		},
-	} {
-		t.Log("Run", i)
-
-		srv := newStoreSeriesServer(ctx)
-
-		testutil.Ok(t, store.Series(tcase.req, srv))
-		testutil.Equals(t, len(tcase.expectedLabels), len(srv.SeriesSet))
-
-		for i, s := range srv.SeriesSet {
-			testutil.Equals(t, tcase.expectedLabels[i], s.Labels)
-			testutil.Equals(t, tcase.expectedChunks, len(s.Chunks))
-		}
+	req := &storepb.SeriesRequest{
+		Matchers: []storepb.LabelMatcher{
+			{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"},
+		},
+		MinTime: mint,
+		MaxTime: timestamp.FromTime(time.Now().AddDate(0, 0, 1)),
+	}
+
+	expectedLabels := [][]storepb.Label{
+		{{Name: "a", Value: "1"}, {Name: "b", Value: "1"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "1"}, {Name: "b", Value: "2"}, {Name: "ext1", Value: "value1"}},
+		{{Name: "a", Value: "1"}, {Name: "c", Value: "1"}, {Name: "ext2", Value: "value2"}},
+		{{Name: "a", Value: "1"}, {Name: "c", Value: "2"}, {Name: "ext2", Value: "value2"}},
+	}
+
+	s.cache.SwapWith(noopCache{})
+	srv := newStoreSeriesServer(ctx)
+
+	testutil.Ok(t, s.store.Series(req, srv))
+	testutil.Equals(t, len(expectedLabels), len(srv.SeriesSet))
+
+	for i, s := range srv.SeriesSet {
+		fmt.Println(s.Labels)
+		testutil.Equals(t, expectedLabels[i], s.Labels)
+
+		// prepareTestBlocks makes 3 chunks containing 2 hours of data each,
+		// we should only get 1, as we are filtering by time.
+		testutil.Equals(t, 1, len(s.Chunks))
 	}
 }
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 8586036f6bf..913714f6555 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -2,6 +2,7 @@ package store
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"math"
@@ -19,14 +20,11 @@ import (
 	"github.com/leanovate/gopter/gen"
 	"github.com/leanovate/gopter/prop"
 	"github.com/oklog/ulid"
-	prommodel "github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/relabel"
-	"github.com/prometheus/prometheus/pkg/timestamp"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
-	"github.com/thanos-io/thanos/pkg/model"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/inmem"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -439,6 +437,7 @@ func TestBucketStore_Info(t *testing.T) {
 		nil,
 		nil,
 		nil,
+		nil,
 		dir,
 		noopCache{},
 		2e5,
@@ -446,8 +445,7 @@ func TestBucketStore_Info(t *testing.T) {
 		0,
 		false,
 		20,
-		filterConf,
-		emptyRelabelConfig,
+		allowAllFilterConf,
 		true,
 	)
 	testutil.Ok(t, err)
@@ -462,67 +460,6 @@ func TestBucketStore_Info(t *testing.T) {
 	testutil.Equals(t, []storepb.Label(nil), resp.Labels)
 }
 
-func TestBucketStore_isBlockInMinMaxRange(t *testing.T) {
-	ctx := context.TODO()
-	dir, err := ioutil.TempDir("", "block-min-max-test")
-	testutil.Ok(t, err)
-
-	series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}
-	extLset := labels.FromStrings("ext1", "value1")
-
-	// Create a block in range [-2w, -1w].
-	id1, err := testutil.CreateBlock(ctx, dir, series, 10,
-		timestamp.FromTime(time.Now().Add(-14*24*time.Hour)),
-		timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
-		extLset, 0)
-	testutil.Ok(t, err)
-
-	// Create a block in range [-1w, 0w].
-	id2, err := testutil.CreateBlock(ctx, dir, series, 10,
-		timestamp.FromTime(time.Now().Add(-7*24*time.Hour)),
-		timestamp.FromTime(time.Now().Add(-0*24*time.Hour)),
-		extLset, 0)
-	testutil.Ok(t, err)
-
-	// Create a block in range [+1w, +2w].
-	id3, err := testutil.CreateBlock(ctx, dir, series, 10,
-		timestamp.FromTime(time.Now().Add(7*24*time.Hour)),
-		timestamp.FromTime(time.Now().Add(14*24*time.Hour)),
-		extLset, 0)
-	testutil.Ok(t, err)
-
-	meta1, err := metadata.Read(path.Join(dir, id1.String()))
-	testutil.Ok(t, err)
-	meta2, err := metadata.Read(path.Join(dir, id2.String()))
-	testutil.Ok(t, err)
-	meta3, err := metadata.Read(path.Join(dir, id3.String()))
-	testutil.Ok(t, err)
-
-	// Run actual test.
-	hourBeforeDur := prommodel.Duration(-1 * time.Hour)
-	hourBefore := model.TimeOrDurationValue{Dur: &hourBeforeDur}
-
-	// bucketStore accepts blocks in range [0, now-1h].
-	bucketStore, err := NewBucketStore(nil, nil, inmem.NewBucket(), dir, noopCache{}, 0, 0, 20, false, 20,
-		&FilterConfig{
-			MinTime: minTimeDuration,
-			MaxTime: hourBefore,
-		}, emptyRelabelConfig, true)
-	testutil.Ok(t, err)
-
-	inRange, err := bucketStore.isBlockInMinMaxRange(context.TODO(), meta1)
-	testutil.Ok(t, err)
-	testutil.Equals(t, true, inRange)
-
-	inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta2)
-	testutil.Ok(t, err)
-	testutil.Equals(t, true, inRange)
-
-	inRange, err = bucketStore.isBlockInMinMaxRange(context.TODO(), meta3)
-	testutil.Ok(t, err)
-	testutil.Equals(t, false, inRange)
-}
-
 type recorder struct {
 	mtx sync.Mutex
 	objstore.Bucket
@@ -731,11 +668,29 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 		testutil.Ok(t, yaml.Unmarshal([]byte(sc.relabel), &relabelConf))
 
 		rec := &recorder{Bucket: bkt}
-		bucketStore, err := NewBucketStore(logger, nil, rec, dir, noopCache{}, 0, 0, 99, false, 20,
-			filterConf, relabelConf, true)
+		metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, dir, nil,
+			block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime).Filter,
+			block.NewLabelShardedMetaFilter(relabelConf).Filter,
+		)
+		testutil.Ok(t, err)
+
+		bucketStore, err := NewBucketStore(
+			logger,
+			nil,
+			rec,
+			metaFetcher,
+			dir,
+			noopCache{},
+			0,
+			0,
+			99,
+			false,
+			20,
+			allowAllFilterConf,
+			true)
 		testutil.Ok(t, err)
 
-		testutil.Ok(t, bucketStore.SyncBlocks(context.Background()))
+		testutil.Ok(t, bucketStore.InitialSync(context.Background()))
 
 		// Check "stored" blocks.
 		ids := make([]ulid.ULID, 0, len(bucketStore.blocks))
@@ -761,6 +716,7 @@ func testSharding(t *testing.T, reuseDisk string, bkt objstore.Bucket, all ...ul
 		// Sort records. We load blocks concurrently so operations might be not ordered.
 		sort.Strings(rec.touched)
 
+		fmt.Println(cached, sc.expectedIDs, all, rec.touched)
 		if reuseDisk != "" {
 			testutil.Equals(t, expectedTouchedBlockOps(all, sc.expectedIDs, cached), rec.touched)
 			cached = sc.expectedIDs
@@ -794,7 +750,6 @@ func expectedTouchedBlockOps(all []ulid.ULID, expected []ulid.ULID, cached []uli
 			}
 		}
 
-		ops = append(ops, path.Join(id.String(), block.MetaFilename))
 		if found {
 			ops = append(ops,
 				path.Join(id.String(), block.IndexCacheFilename),
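
Reviewer note (not part of the patch): the sketch below pieces together how the new fetcher-based wiring is meant to be used, based only on signatures visible in this diff — `block.NewMetaFetcher(logger, concurrency, bkt, dir, reg, filters...)`, the two filter constructors, and the three-value `Fetch` return seen in `SyncBlocks`. The variadic filter parameter, the meaning of the ignored second return value, and the use of the in-memory bucket are assumptions for illustration, not code from the PR.

```go
package main

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/objstore/inmem"
)

func main() {
	logger := log.NewNopLogger()
	bkt := inmem.NewBucket() // Stand-in for a real object storage bucket.

	// Mirrors the runStore wiring above: the fetcher owns meta.json syncing and
	// its on-disk cache (the dir argument), while filters decide which blocks
	// the store loads at all. No filters are passed here (assumed allowed), so
	// every block in the bucket is kept.
	fetcher, err := block.NewMetaFetcher(logger, 32, bkt, "/tmp/thanos-store", nil)
	if err != nil {
		panic(err)
	}

	// Fetch may succeed partially. SyncBlocks above gives up only when metas is
	// nil; otherwise it loads the partial view and surfaces the error after the
	// sync loop.
	metas, _, err := fetcher.Fetch(context.Background())
	if metas == nil && err != nil {
		panic(err)
	}
	for id, meta := range metas {
		_, _ = id, meta // Each entry maps a block ULID to its parsed meta.json.
	}
}
```

One design consequence visible in the diff itself: the corrupted-disk-cache fix from the CHANGELOG entry now lives inside the fetcher rather than the store — the removed `loadMeta` helper was the old owner of the per-block meta.json cache, and `NewMetaFetcher` now receives `dataDir` for that purpose, so `BucketStore` no longer touches meta.json directly.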