diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ee8f3a290..6a90d23e84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ * [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772 * [CHANGE] Main: Mark `mem-ballast-size-bytes` flag as deprecated. #5816 * [CHANGE] Querier: Mark `-querier.ingester-streaming` flag as deprecated. Now query ingester streaming is always enabled. #5817 -* [CHANGE] AlertManager API: Removal of all api/v1/ endpoints following [2970](https://github.com/prometheus/alertmanager/pull/2970). [5841] +* [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index cc9b5f0479..bdb418cea1 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1314,6 +1314,17 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # One of concurrent, recursive, bucket_index. When set to concurrent, stores + # will concurrently issue one call per directory to discover active blocks + # in the bucket. The recursive strategy iterates through all objects in the + # bucket, recursively traversing into each directory. This avoids N+1 calls + # at the expense of having slower bucket iterations. bucket_index strategy + # can be used in Compactor only and utilizes the existing bucket index to + # fetch block IDs to sync. This avoids iterating the bucket but can be + # impacted by delays of cleaner creating bucket index. + # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy + [block_discovery_strategy: | default = "concurrent"] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 51bc6a5933..a33b129bb9 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1423,6 +1423,17 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # One of concurrent, recursive, bucket_index. When set to concurrent, stores + # will concurrently issue one call per directory to discover active blocks + # in the bucket. The recursive strategy iterates through all objects in the + # bucket, recursively traversing into each directory. This avoids N+1 calls + # at the expense of having slower bucket iterations. bucket_index strategy + # can be used in Compactor only and utilizes the existing bucket index to + # fetch block IDs to sync. This avoids iterating the bucket but can be + # impacted by delays of cleaner creating bucket index. + # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy + [block_discovery_strategy: | default = "concurrent"] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 68aa08dd67..f28da2b8cf 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1850,6 +1850,17 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.bucket-index.max-stale-period [max_stale_period: | default = 1h] + # One of concurrent, recursive, bucket_index. When set to concurrent, stores + # will concurrently issue one call per directory to discover active blocks in + # the bucket. The recursive strategy iterates through all objects in the + # bucket, recursively traversing into each directory. This avoids N+1 calls at + # the expense of having slower bucket iterations. bucket_index strategy can be + # used in Compactor only and utilizes the existing bucket index to fetch block + # IDs to sync. This avoids iterating the bucket but can be impacted by delays + # of cleaner creating bucket index. + # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy + [block_discovery_strategy: | default = "concurrent"] + # Max size - in bytes - of a chunks pool, used to reduce memory allocations. # The pool is shared across all tenants. 0 to disable the limit. # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 3e461ee71d..d366bda557 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -827,22 +827,26 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { // out of order chunks or index file too big. noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(ulogger, bucket, c.compactorCfg.MetaSyncConcurrency) - var blockIDsFetcher block.Lister - var fetcherULogger log.Logger - if c.storageCfg.BucketStore.BucketIndex.Enabled { - fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BucketIndexBlockIDsFetcher") - blockIDsFetcher = bucketindex.NewBlockIDsFetcher(fetcherULogger, c.bucketClient, userID, c.limits) - - } else { - fetcherULogger = log.With(ulogger, "blockIdsFetcher", "BaseBlockIDsFetcher") - blockIDsFetcher = block.NewRecursiveLister(fetcherULogger, bucket) + var blockLister block.Lister + switch cortex_tsdb.BlockDiscoveryStrategy(c.storageCfg.BucketStore.BlockDiscoveryStrategy) { + case cortex_tsdb.ConcurrentDiscovery: + blockLister = block.NewConcurrentLister(ulogger, bucket) + case cortex_tsdb.RecursiveDiscovery: + blockLister = block.NewRecursiveLister(ulogger, bucket) + case cortex_tsdb.BucketIndexDiscovery: + if !c.storageCfg.BucketStore.BucketIndex.Enabled { + return cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy + } + blockLister = bucketindex.NewBlockLister(ulogger, c.bucketClient, userID, c.limits) + default: + return cortex_tsdb.ErrBlockDiscoveryStrategy } fetcher, err := block.NewMetaFetcher( - fetcherULogger, + ulogger, c.compactorCfg.MetaSyncConcurrency, bucket, - blockIDsFetcher, + blockLister, c.metaSyncDirForUser(userID), reg, // List of filters to apply (order matters). diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 891c66c596..726a167bee 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1673,6 +1673,7 @@ func prepareConfig() Config { func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) + storageCfg.BucketStore.BlockDiscoveryStrategy = string(cortex_tsdb.RecursiveDiscovery) // Create a temporary directory for compactor data. compactorCfg.DataDir = t.TempDir() diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 26f95fd04a..15e53dd66d 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -43,6 +43,8 @@ type BucketScanBlocksFinderConfig struct { ConsistencyDelay time.Duration IgnoreDeletionMarksDelay time.Duration IgnoreBlocksWithin time.Duration + + BlockDiscoveryStrategy string } // BucketScanBlocksFinder is a BlocksFinder implementation periodically scanning the bucket to discover blocks. @@ -384,12 +386,25 @@ func (d *BucketScanBlocksFinder) createMetaFetcher(userID string) (block.Metadat filters = append(filters, storegateway.NewIgnoreNonQueryableBlocksFilter(d.logger, d.cfg.IgnoreBlocksWithin)) } - blockIdsFetcher := block.NewRecursiveLister(userLogger, userBucket) + var ( + err error + blockLister block.Lister + ) + switch cortex_tsdb.BlockDiscoveryStrategy(d.cfg.BlockDiscoveryStrategy) { + case cortex_tsdb.ConcurrentDiscovery: + blockLister = block.NewConcurrentLister(userLogger, userBucket) + case cortex_tsdb.RecursiveDiscovery: + blockLister = block.NewRecursiveLister(userLogger, userBucket) + case cortex_tsdb.BucketIndexDiscovery: + return nil, nil, nil, cortex_tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy + default: + return nil, nil, nil, cortex_tsdb.ErrBlockDiscoveryStrategy + } f, err := block.NewMetaFetcher( userLogger, d.cfg.MetasConcurrency, userBucket, - blockIdsFetcher, + blockLister, // The fetcher stores cached metas in the "meta-syncer/" sub directory. filepath.Join(d.cfg.CacheDir, userID), userReg, diff --git a/pkg/querier/blocks_finder_bucket_scan_test.go b/pkg/querier/blocks_finder_bucket_scan_test.go index 73932cb633..6dab2ca866 100644 --- a/pkg/querier/blocks_finder_bucket_scan_test.go +++ b/pkg/querier/blocks_finder_bucket_scan_test.go @@ -521,5 +521,6 @@ func prepareBucketScanBlocksFinderConfig() BucketScanBlocksFinderConfig { MetasConcurrency: 10, IgnoreDeletionMarksDelay: time.Hour, IgnoreBlocksWithin: 10 * time.Hour, // All blocks created in the last 10 hour shouldn't be scanned. + BlockDiscoveryStrategy: string(cortex_tsdb.RecursiveDiscovery), } } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 7d80d0f605..66769f4f62 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -219,6 +219,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa CacheDir: storageCfg.BucketStore.SyncDir, IgnoreDeletionMarksDelay: storageCfg.BucketStore.IgnoreDeletionMarksDelay, IgnoreBlocksWithin: storageCfg.BucketStore.IgnoreBlocksWithin, + BlockDiscoveryStrategy: storageCfg.BucketStore.BlockDiscoveryStrategy, }, bucketClient, limits, logger, reg) } diff --git a/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go b/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go index 5914322eee..5a9d3470f9 100644 --- a/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go +++ b/pkg/storage/tsdb/bucketindex/block_ids_fetcher.go @@ -13,40 +13,40 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket" ) -type BlockIDsFetcher struct { - logger log.Logger - bkt objstore.Bucket - userID string - cfgProvider bucket.TenantConfigProvider - baseBlockIDsFetcher block.Lister +type BlockLister struct { + logger log.Logger + bkt objstore.Bucket + userID string + cfgProvider bucket.TenantConfigProvider + baseLister block.Lister } -func NewBlockIDsFetcher(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockIDsFetcher { +func NewBlockLister(logger log.Logger, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider) *BlockLister { userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider) - baseBlockIDsFetcher := block.NewRecursiveLister(logger, userBkt) - return &BlockIDsFetcher{ - logger: logger, - bkt: bkt, - userID: userID, - cfgProvider: cfgProvider, - baseBlockIDsFetcher: baseBlockIDsFetcher, + baseLister := block.NewConcurrentLister(logger, userBkt) + return &BlockLister{ + logger: logger, + bkt: bkt, + userID: userID, + cfgProvider: cfgProvider, + baseLister: baseLister, } } -func (f *BlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { +func (f *BlockLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) { // Fetch the bucket index. idx, err := ReadIndex(ctx, f.bkt, f.userID, f.cfgProvider, f.logger) if errors.Is(err, ErrIndexNotFound) { // This is a legit case happening when the first blocks of a tenant have recently been uploaded by ingesters // and their bucket index has not been created yet. // Fallback to BaseBlockIDsFetcher. - return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch) + return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch) } if errors.Is(err, ErrIndexCorrupted) { // In case a single tenant bucket index is corrupted, we want to return empty active blocks and parital blocks, so skipping this compaction cycle level.Error(f.logger).Log("msg", "corrupted bucket index found", "user", f.userID, "err", err) // Fallback to BaseBlockIDsFetcher. - return f.baseBlockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch) + return f.baseLister.GetActiveAndPartialBlockIDs(ctx, ch) } if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { diff --git a/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go b/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go index bd25f15dcf..5e64875ad1 100644 --- a/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go +++ b/pkg/storage/tsdb/bucketindex/block_ids_fetcher_test.go @@ -42,7 +42,7 @@ func TestBlockIDsFetcher_Fetch(t *testing.T) { UpdatedAt: now.Unix(), })) - blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil) + blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil) ch := make(chan ulid.ULID) var wg sync.WaitGroup var blockIds []ulid.ULID @@ -94,7 +94,7 @@ func TestBlockIDsFetcherFetcher_Fetch_NoBucketIndex(t *testing.T) { require.NoError(t, json.NewEncoder(&buf).Encode(mark)) require.NoError(t, bkt.Upload(ctx, path.Join(userID, mark.ID.String(), metadata.DeletionMarkFilename), &buf)) } - blockIdsFetcher := NewBlockIDsFetcher(logger, bkt, userID, nil) + blockIdsFetcher := NewBlockLister(logger, bkt, userID, nil) ch := make(chan ulid.ULID) var wg sync.WaitGroup var blockIds []ulid.ULID diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index cfeb58c1c6..8cb5f36a47 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/thanos/pkg/store" "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util" ) const ( @@ -48,6 +49,9 @@ var ( errInvalidStripeSize = errors.New("invalid TSDB stripe size") errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)") errEmptyBlockranges = errors.New("empty block ranges for TSDB") + + ErrInvalidBucketIndexBlockDiscoveryStrategy = errors.New("bucket index block discovery strategy can only be enabled when bucket index is enabled") + ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -252,6 +256,7 @@ type BucketStoreConfig struct { IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` BucketIndex BucketIndexConfig `yaml:"bucket_index"` + BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -315,6 +320,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Uint64Var(&cfg.EstimatedMaxChunkSizeBytes, "blocks-storage.bucket-store.estimated-max-chunk-size-bytes", store.EstimatedMaxChunkSize, "Estimated max chunk size in bytes. Setting a large value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.") f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.") f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.") + f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.") } // Validate the config. @@ -331,6 +337,9 @@ func (cfg *BucketStoreConfig) Validate() error { if err != nil { return errors.Wrap(err, "metadata-cache configuration") } + if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) { + return ErrInvalidBucketIndexBlockDiscoveryStrategy + } return nil } @@ -347,3 +356,18 @@ func (cfg *BucketIndexConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st f.DurationVar(&cfg.IdleTimeout, prefix+"idle-timeout", time.Hour, "How long a unused bucket index should be cached. Once this timeout expires, the unused bucket index is removed from the in-memory cache. This option is used only by querier.") f.DurationVar(&cfg.MaxStalePeriod, prefix+"max-stale-period", time.Hour, "The maximum allowed age of a bucket index (last updated) before queries start failing because the bucket index is too old. The bucket index is periodically updated by the compactor, while this check is enforced in the querier (at query time).") } + +// BlockDiscoveryStrategy configures how to list block IDs from object storage. +type BlockDiscoveryStrategy string + +const ( + ConcurrentDiscovery BlockDiscoveryStrategy = "concurrent" + RecursiveDiscovery BlockDiscoveryStrategy = "recursive" + BucketIndexDiscovery BlockDiscoveryStrategy = "bucket_index" +) + +var supportedBlockDiscoveryStrategies = []string{ + string(ConcurrentDiscovery), + string(RecursiveDiscovery), + string(BucketIndexDiscovery), +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index e0b38c4d9b..d07149dd38 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -552,13 +552,25 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro // BucketStore metrics are correctly updated. fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt) - var err error - blockIdsFetcher := block.NewRecursiveLister(userLogger, fetcherBkt) + var ( + err error + blockLister block.Lister + ) + switch tsdb.BlockDiscoveryStrategy(u.cfg.BucketStore.BlockDiscoveryStrategy) { + case tsdb.ConcurrentDiscovery: + blockLister = block.NewConcurrentLister(userLogger, userBkt) + case tsdb.RecursiveDiscovery: + blockLister = block.NewRecursiveLister(userLogger, userBkt) + case tsdb.BucketIndexDiscovery: + return nil, tsdb.ErrInvalidBucketIndexBlockDiscoveryStrategy + default: + return nil, tsdb.ErrBlockDiscoveryStrategy + } fetcher, err = block.NewMetaFetcher( userLogger, u.cfg.BucketStore.MetaSyncConcurrency, fetcherBkt, - blockIdsFetcher, + blockLister, u.syncDirForUser(userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory fetcherReg, filters,