Fetcher: Add a BlockIDsFetcher Interface to BaseFetcher #6902

Merged · 8 commits · Nov 27, 2023
Changes from all commits
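Summary of the change: BaseFetcher no longer lists block IDs by iterating the bucket itself; it delegates to a new BlockIDsFetcher interface, and the fetcher constructors (NewBaseFetcher, NewMetaFetcher, NewRawMetaFetcher, and their WithMetrics variants) gain a blockIDsFetcher argument. Below is a minimal wiring sketch based only on the signatures in this diff; the in-memory bucket and the nil registerer/filters are illustrative assumptions, not part of the PR.

package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block"
)

func main() {
	logger := log.NewNopLogger()
	// Illustrative bucket; any objstore.InstrumentedBucketReader works here.
	insBkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

	// New in this PR: the block-ID lister is constructed explicitly...
	baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)

	// ...and passed to the fetcher constructors as an extra argument
	// (shown here with no cache dir, registerer, or filters).
	metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", nil, nil)
	if err != nil {
		panic(err)
	}

	metas, partial, err := metaFetcher.Fetch(context.Background())
	fmt.Println(len(metas), len(partial), err)
}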
3 changes: 2 additions & 1 deletion cmd/thanos/compact.go
@@ -239,7 +239,8 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

-baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
3 changes: 2 additions & 1 deletion cmd/thanos/downsample.go
@@ -90,7 +90,8 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
-metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
})
6 changes: 4 additions & 2 deletions cmd/thanos/main_test.go
@@ -157,7 +157,8 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
-metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
@@ -196,7 +197,8 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
-metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

metas, _, err := metaFetcher.Fetch(ctx)
3 changes: 2 additions & 1 deletion cmd/thanos/store.go
@@ -339,7 +339,8 @@ func runStore(
}

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
-metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
18 changes: 12 additions & 6 deletions cmd/thanos/tools_bucket.go
@@ -350,7 +350,8 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
-fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
@@ -407,7 +408,8 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
-fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
}
@@ -508,7 +510,8 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

-fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
}
@@ -651,7 +654,8 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
-fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
@@ -829,7 +833,8 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
-baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
@@ -1371,7 +1376,8 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
-baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
112 changes: 70 additions & 42 deletions pkg/block/fetcher.go
@@ -170,6 +170,52 @@ func DefaultModifiedLabelValues() [][]string {
}
}

+// BlockIDsFetcher is the interface to retrieve block IDs from a bucket.
+type BlockIDsFetcher interface {
+// GetActiveAndPartialBlockIDs streams the IDs of active blocks to ch and returns the set of partial blocks.
+// Active blocks are blocks which contain a meta.json, while partial blocks are blocks without a meta.json.
+GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
+}
+
+type BaseBlockIDsFetcher struct {
+logger log.Logger
+bkt objstore.InstrumentedBucketReader
+}
+
+func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
+return &BaseBlockIDsFetcher{
+logger: logger,
+bkt: bkt,
+}
+}
+
+func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+partialBlocks = make(map[ulid.ULID]bool)
+err = f.bkt.Iter(ctx, "", func(name string) error {
+parts := strings.Split(name, "/")
+dir, file := parts[0], parts[len(parts)-1]
+id, ok := IsBlockDir(dir)
+if !ok {
+return nil
+}
+if _, ok := partialBlocks[id]; !ok {
+partialBlocks[id] = true
+}
+if !IsBlockMetaFile(file) {
+return nil
+}
+partialBlocks[id] = false
+
+select {
+case <-ctx.Done():
+return ctx.Err()
+case ch <- id:
+}
+return nil
+}, objstore.WithRecursiveIter)
+return partialBlocks, err
+}

type MetadataFetcher interface {
Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
UpdateOnChange(func([]metadata.Meta, error))
@@ -188,9 +234,10 @@ type MetadataFilter interface {
// BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
// Go-routine safe.
type BaseFetcher struct {
-logger log.Logger
-concurrency int
-bkt objstore.InstrumentedBucketReader
+logger log.Logger
+concurrency int
+bkt objstore.InstrumentedBucketReader
+blockIDsFetcher BlockIDsFetcher

// Optional local directory to cache meta.json files.
cacheDir string
@@ -202,12 +249,12 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
-func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
-return NewBaseFetcherWithMetrics(logger, concurrency, bkt, dir, NewBaseFetcherMetrics(reg))
+func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
+return NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, NewBaseFetcherMetrics(reg))
}

// NewBaseFetcherWithMetrics constructs BaseFetcher.
-func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
+func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
@@ -221,33 +268,34 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
}

return &BaseFetcher{
-logger: log.With(logger, "component", "block.BaseFetcher"),
-concurrency: concurrency,
-bkt: bkt,
-cacheDir: cacheDir,
-cached: map[ulid.ULID]*metadata.Meta{},
-syncs: metrics.Syncs,
+logger: log.With(logger, "component", "block.BaseFetcher"),
+concurrency: concurrency,
+bkt: bkt,
+blockIDsFetcher: blockIDsFetcher,
+cacheDir: cacheDir,
+cached: map[ulid.ULID]*metadata.Meta{},
+syncs: metrics.Syncs,
}, nil
}

// NewRawMetaFetcher returns basic meta fetcher without proper handling for eventual consistent backends or partial uploads.
// NOTE: Not suitable to use in production.
-func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) (*MetaFetcher, error) {
-return NewMetaFetcher(logger, 1, bkt, "", nil, nil)
+func NewRawMetaFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher) (*MetaFetcher, error) {
+return NewMetaFetcher(logger, 1, bkt, blockIDsFetcher, "", nil, nil)
}

// NewMetaFetcher returns meta fetcher.
-func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
-b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
+func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, reg prometheus.Registerer, filters []MetadataFilter) (*MetaFetcher, error) {
+b, err := NewBaseFetcher(logger, concurrency, bkt, blockIDsFetcher, dir, reg)
if err != nil {
return nil, err
}
return b.NewMetaFetcher(reg, filters), nil
}

// NewMetaFetcherWithMetrics returns meta fetcher.
-func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
-b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, dir, baseFetcherMetrics)
+func NewMetaFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, baseFetcherMetrics *BaseFetcherMetrics, fetcherMetrics *FetcherMetrics, filters []MetadataFilter) (*MetaFetcher, error) {
+b, err := NewBaseFetcherWithMetrics(logger, concurrency, bkt, blockIDsFetcher, dir, baseFetcherMetrics)
if err != nil {
return nil, err
}
@@ -392,33 +440,13 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
})
}

-partialBlocks := make(map[ulid.ULID]bool)
+var partialBlocks map[ulid.ULID]bool
+var err error
// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)
-return f.bkt.Iter(ctx, "", func(name string) error {
-parts := strings.Split(name, "/")
-dir, file := parts[0], parts[len(parts)-1]
-id, ok := IsBlockDir(dir)
-if !ok {
-return nil
-}
-if _, ok := partialBlocks[id]; !ok {
-partialBlocks[id] = true
-}
-if !IsBlockMetaFile(file) {
-return nil
-}
-partialBlocks[id] = false
-
-select {
-case <-ctx.Done():
-return ctx.Err()
-case ch <- id:
-}
-
-return nil
-}, objstore.WithRecursiveIter)
+partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+return err
})

if err := eg.Wait(); err != nil {
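The point of the new interface is that the bucket-iterating implementation above becomes swappable. A hypothetical alternative (not part of this PR) could serve block IDs from a fixed in-memory set, for example in tests or when IDs come from an external index; it only has to honor the same contract: stream active block IDs to ch and return a map marking which blocks are partial.

// Hypothetical BlockIDsFetcher backed by a static set of IDs (illustrative only).
type staticBlockIDsFetcher struct {
	// ids maps each block ID to whether the block is partial (i.e. has no meta.json).
	ids map[ulid.ULID]bool
}

func (f *staticBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (map[ulid.ULID]bool, error) {
	partialBlocks := make(map[ulid.ULID]bool)
	for id, isPartial := range f.ids {
		partialBlocks[id] = isPartial
		if isPartial {
			// Partial blocks are reported in the map but never streamed.
			continue
		}
		select {
		case <-ctx.Done():
			return partialBlocks, ctx.Err()
		case ch <- id: // Active blocks are streamed to the consumer.
		}
	}
	return partialBlocks, nil
}

Such a value could be passed anywhere this diff constructs a BaseBlockIDsFetcher, since the constructors now accept the interface rather than a concrete type.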
5 changes: 4 additions & 1 deletion pkg/block/fetcher_test.go
@@ -71,7 +71,10 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
-baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.WithNoopInstr(bkt), dir, r)
+noopLogger := log.NewNopLogger()
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := NewBaseBlockIDsFetcher(noopLogger, insBkt)
+baseFetcher, err := NewBaseFetcher(noopLogger, 20, insBkt, baseBlockIDsFetcher, dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
3 changes: 2 additions & 1 deletion pkg/compact/clean_test.go
@@ -30,7 +30,8 @@ func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
logger := log.NewNopLogger()

-metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

// 1. No meta, old block, should be removed.
12 changes: 9 additions & 3 deletions pkg/compact/compact_e2e_test.go
@@ -94,7 +94,9 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
-metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(nil, insBkt)
+metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
})
testutil.Ok(t, err)
@@ -194,7 +196,9 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
noCompactMarkerFilter := NewGatherNoCompactionMarkFilter(logger, objstore.WithNoopInstr(bkt), 2)
-metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
noCompactMarkerFilter,
@@ -504,7 +508,9 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, objstore.WithNoopInstr(bkt), 48*time.Hour, fetcherConcurrency)

duplicateBlocksFilter := block.NewDeduplicateFilter(fetcherConcurrency)
-metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(nil, 32, insBkt, baseBlockIDsFetcher, "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
})
3 changes: 2 additions & 1 deletion pkg/compact/retention_test.go
@@ -245,7 +245,8 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}

-metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil, nil)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
+metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
2 changes: 2 additions & 0 deletions pkg/replicate/replicator.go
@@ -244,10 +244,12 @@ func newMetaFetcher(
if ignoreMarkedForDeletion {
filters = append(filters, thanosblock.NewIgnoreDeletionMarkFilter(logger, fromBkt, 0, concurrency))
}
+baseBlockIDsFetcher := thanosblock.NewBaseBlockIDsFetcher(logger, fromBkt)
return thanosblock.NewMetaFetcher(
logger,
concurrency,
fromBkt,
+baseBlockIDsFetcher,
"",
reg,
filters,
4 changes: 3 additions & 1 deletion pkg/store/acceptance_test.go
@@ -805,7 +805,9 @@ func TestBucketStore_Acceptance(t *testing.T) {
chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)

-metaFetcher, err := block.NewMetaFetcher(logger, 20, objstore.WithNoopInstr(bkt), metaDir, nil, []block.MetadataFilter{
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(logger, 20, insBkt, baseBlockIDsFetcher, metaDir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(tt, err)
4 changes: 3 additions & 1 deletion pkg/store/bucket_e2e_test.go
@@ -153,7 +153,9 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m
maxTime: maxTime,
}

-metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), dir, nil, []block.MetadataFilter{
+insBkt := objstore.WithNoopInstr(bkt)
+baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(s.logger, insBkt)
+metaFetcher, err := block.NewMetaFetcher(s.logger, 20, insBkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
})