Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using different listing strategies #7134

Merged
merged 5 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached.
- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
- [#7132](https://github.com/thanos-io/thanos/pull/7132) Documentation: fix broken helm installation instruction
- [#7134](https://github.com/thanos-io/thanos/pull/7134) Store, Compact: Revert the recursive block listing mechanism introduced in https://github.com/thanos-io/thanos/pull/6474 and use the same strategy as in 0.31. Introduce a `--block-discovery-strategy` flag to control the listing strategy so that a recursive lister can still be used if the tradeoff of slower but cheaper discovery is preferred.

### Added

Expand Down
16 changes: 14 additions & 2 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,16 @@ func runCompact(
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand Down Expand Up @@ -693,6 +701,7 @@ type compactConfig struct {
wait bool
waitInterval time.Duration
disableDownsampling bool
blockListStrategy string
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
Expand Down Expand Up @@ -754,6 +763,9 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We revert the current behavior and change back to the old one? Is it worth a changelog?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am no authority on anything, but I would personally like to have a changelog entry on this. Seeing how it was mentioned in different threads how this is a blocker for upgrading Thanos itself for some, I think it is important enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I added a changelog entry, let me know if that's sufficient or we should elaborate further.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. Good enough for me.

Default(string(concurrentDiscovery)).StringVar(&cc.blockListStrategy)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func RunDownsample(
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

// While fetching blocks, filter out blocks that were marked for no downsample.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
Expand Down
5 changes: 3 additions & 2 deletions cmd/thanos/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact/downsample"
Expand Down Expand Up @@ -157,7 +158,7 @@ func TestRegression4960_Deadlock(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down Expand Up @@ -197,7 +198,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {

metrics := newDownsampleMetrics(prometheus.NewRegistry())
testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(meta.Thanos.GroupKey())))
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, bkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, baseBlockIDsFetcher, "", nil, nil)
testutil.Ok(t, err)

Expand Down
25 changes: 23 additions & 2 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/alecthomas/units"
Expand Down Expand Up @@ -56,6 +57,13 @@ const (
retryIntervalDuration = 10
)

type syncStrategy string

const (
concurrentDiscovery syncStrategy = "concurrent"
recursiveDiscovery syncStrategy = "recursive"
)

type storeConfig struct {
indexCacheConfigs extflag.PathOrContent
objStoreConfig extflag.PathOrContent
Expand All @@ -74,6 +82,7 @@ type storeConfig struct {
component component.StoreAPI
debugLogging bool
syncInterval time.Duration
blockListStrategy string
blockSyncConcurrency int
blockMetaFetchConcurrency int
filterConf *store.FilterConfig
Expand Down Expand Up @@ -137,6 +146,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("sync-block-duration", "Repeat interval for syncing the blocks between local and remote view.").
Default("15m").DurationVar(&sc.syncInterval)

strategies := strings.Join([]string{string(concurrentDiscovery), string(recursiveDiscovery)}, ", ")
cmd.Flag("block-discovery-strategy", "One of "+strategies+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations.").
Default(string(concurrentDiscovery)).StringVar(&sc.blockListStrategy)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when constructing index-cache.json blocks from object storage. Must be equal or greater than 1.").
Default("20").IntVar(&sc.blockSyncConcurrency)

Expand Down Expand Up @@ -345,9 +358,17 @@ func runStore(
return errors.Wrap(err, "create index cache")
}

var blockLister block.Lister
switch syncStrategy(conf.blockListStrategy) {
case concurrentDiscovery:
blockLister = block.NewConcurrentLister(logger, insBkt)
case recursiveDiscovery:
blockLister = block.NewRecursiveLister(logger, insBkt)
default:
return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy)
}
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, time.Duration(conf.ignoreDeletionMarksDelay), conf.blockMetaFetchConcurrency)
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, baseBlockIDsFetcher, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
metaFetcher, err := block.NewMetaFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime),
block.NewLabelShardedMetaFilter(relabelConfig),
Expand Down
12 changes: 6 additions & 6 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func registerBucketVerify(app extkingpin.AppClause, objStoreConfig *extflag.Path

// We ignore any block that has the deletion marker file.
filters := []block.MetadataFilter{block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -423,7 +423,7 @@ func registerBucketLs(app extkingpin.AppClause, objStoreConfig *extflag.PathOrCo
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, 0, block.FetcherConcurrency)
filters = append(filters, ignoreDeletionMarkFilter)
}
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), filters)
if err != nil {
return err
Expand Down Expand Up @@ -525,7 +525,7 @@ func registerBucketInspect(app extkingpin.AppClause, objStoreConfig *extflag.Pat
}
insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name()))

baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg), nil)
if err != nil {
return err
Expand Down Expand Up @@ -669,7 +669,7 @@ func registerBucketWeb(app extkingpin.AppClause, objStoreConfig *extflag.PathOrC
return err
}
// TODO(bwplotka): Allow Bucket UI to visualize the state of block as well.
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
fetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg),
[]block.MetadataFilter{
block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime),
Expand Down Expand Up @@ -848,7 +848,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down Expand Up @@ -1391,7 +1391,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P

var sy *compact.Syncer
{
baseBlockIDsFetcher := block.NewBaseBlockIDsFetcher(logger, insBkt)
baseBlockIDsFetcher := block.NewConcurrentLister(logger, insBkt)
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, insBkt, baseBlockIDsFetcher, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
9 changes: 9 additions & 0 deletions docs/components/compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,15 @@ usage: thanos compact [<flags>]
Continuously compacts blocks in an object store bucket.

Flags:
--block-discovery-strategy="concurrent"
One of concurrent, recursive. When set to
concurrent, stores will concurrently issue
one call per directory to discover active
blocks in the bucket. The recursive strategy
iterates through all objects in the bucket,
recursively traversing into each directory.
This avoids N+1 calls at the expense of having
slower bucket iterations.
--block-files-concurrency=1
Number of goroutines to use when
fetching/uploading block files from object
Expand Down
9 changes: 9 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ Store node giving access to blocks in a bucket provider. Now supported GCS, S3,
Azure, Swift, Tencent COS and Aliyun OSS.

Flags:
--block-discovery-strategy="concurrent"
One of concurrent, recursive. When set to
concurrent, stores will concurrently issue
one call per directory to discover active
blocks in the bucket. The recursive strategy
iterates through all objects in the bucket,
recursively traversing into each directory.
This avoids N+1 calls at the expense of having
slower bucket iterations.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
Expand Down
Loading
Loading