From eaa68b7828d494ff026ecc7ad7da8098aa82a82b Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Tue, 13 Feb 2024 09:33:39 +0100 Subject: [PATCH] Allow using different listing strategies Signed-off-by: Filip Petkovski --- cmd/thanos/main_test.go | 1 + pkg/block/fetcher.go | 109 ++++++++++++++++++++++++++------ pkg/block/fetcher_test.go | 1 + pkg/compact/clean_test.go | 1 + pkg/compact/compact_e2e_test.go | 1 + pkg/compact/retention_test.go | 1 + pkg/store/acceptance_test.go | 1 + 7 files changed, 96 insertions(+), 19 deletions(-) diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index d8d1fffef00..c36183cebcc 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 828503e91b3..ba6c04540c2 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string { } } -// Fetcher interface to retieve blockId information from a bucket. +// BlockIDsFetcher lists block IDs from a bucket. type BlockIDsFetcher interface { - // GetActiveBlocksIDs returning it via channel (streaming) and response. + // GetActiveAndPartialBlockIDs GetActiveBlocksIDs returning it via channel (streaming) and response. // Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) } -type BaseBlockIDsFetcher struct { +// RecursiveBlockIDsFetcher lists block IDs by iterating the object storage bucket recursively. +type RecursiveBlockIDsFetcher struct { logger log.Logger bkt objstore.InstrumentedBucketReader } -func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher { - return &BaseBlockIDsFetcher{ +func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockIDsFetcher { + return &RecursiveBlockIDsFetcher{ logger: logger, bkt: bkt, } } -func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { +func (f *RecursiveBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { partialBlocks = make(map[ulid.ULID]bool) err = f.bkt.Iter(ctx, "", func(name string) error { parts := strings.Split(name, "/") @@ -216,6 +217,76 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c return partialBlocks, err } +// BaseBlockIDsFetcher Default lists block IDs by doing a top level iteration of the bucket and using an Exists call to detect partial blocks. +type BaseBlockIDsFetcher struct { + logger log.Logger + bkt objstore.InstrumentedBucketReader +} + +func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher { + return &BaseBlockIDsFetcher{ + logger: logger, + bkt: bkt, + } +} + +func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { + const concurrency = 64 + + var metaChan = make(chan ulid.ULID, concurrency) + if err = f.bkt.Iter(ctx, "", func(name string) error { + parts := strings.Split(name, "/") + dir, file := parts[0], parts[len(parts)-1] + id, ok := IsBlockDir(dir) + if !ok { + return nil + } + if !IsBlockMetaFile(file) { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case metaChan <- id: + } + return nil + }); err != nil { + return nil, err + } + + partialBlocks = make(map[ulid.ULID]bool) + var ( + eg errgroup.Group + mu sync.Mutex + ) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for uid := range metaChan { + // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items. + // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix. + // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work). + metaFile := path.Join(uid.String(), MetaFilename) + ok, err := f.bkt.Exists(ctx, metaFile) + if err != nil { + return errors.Wrapf(err, "meta.json file exists: %v", uid) + } + if !ok { + mu.Lock() + partialBlocks[uid] = true + mu.Unlock() + return ErrorSyncMetaNotFound + } + ch <- uid + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return nil, err + } + return partialBlocks, nil +} + type MetadataFetcher interface { Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error) UpdateOnChange(func([]metadata.Meta, error)) @@ -234,10 +305,10 @@ type MetadataFilter interface { // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state. // Go-routine safe. type BaseFetcher struct { - logger log.Logger - concurrency int - bkt objstore.InstrumentedBucketReader - blockIDsFetcher BlockIDsFetcher + logger log.Logger + concurrency int + bkt objstore.InstrumentedBucketReader + blockIDsLister BlockIDsFetcher // Optional local directory to cache meta.json files. cacheDir string @@ -254,7 +325,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente } // NewBaseFetcherWithMetrics constructs BaseFetcher. -func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { +func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) { if logger == nil { logger = log.NewNopLogger() } @@ -268,13 +339,13 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore. } return &BaseFetcher{ - logger: log.With(logger, "component", "block.BaseFetcher"), - concurrency: concurrency, - bkt: bkt, - blockIDsFetcher: blockIDsFetcher, - cacheDir: cacheDir, - cached: map[ulid.ULID]*metadata.Meta{}, - syncs: metrics.Syncs, + logger: log.With(logger, "component", "block.BaseFetcher"), + concurrency: concurrency, + bkt: bkt, + blockIDsLister: blockIDsLister, + cacheDir: cacheDir, + cached: map[ulid.ULID]*metadata.Meta{}, + syncs: metrics.Syncs, }, nil } @@ -445,7 +516,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { // Workers scheduled, distribute blocks. eg.Go(func() error { defer close(ch) - partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch) + partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch) return err }) diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 11384e88fac..130c9830dc0 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/model" diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go index b66ca9f018a..ea3d1fa79d3 100644 --- a/pkg/compact/clean_test.go +++ b/pkg/compact/clean_test.go @@ -19,6 +19,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" ) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index f1e01ec4f49..13e0cae6f61 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/objstore/objtesting" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/dedup" diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index c1936f095a9..b7b4464b364 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/objstore" "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index be1a1179f1f..bcd164047d1 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -26,6 +26,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/providers/filesystem" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component"