Allow using different listing strategies
Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski committed Feb 13, 2024
1 parent e78d867 commit eaa68b7
Showing 7 changed files with 96 additions and 19 deletions.
1 change: 1 addition & 0 deletions cmd/thanos/main_test.go
@@ -21,6 +21,7 @@ import (
     "github.com/thanos-io/objstore"
 
     "github.com/efficientgo/core/testutil"
+
     "github.com/thanos-io/thanos/pkg/block"
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/compact/downsample"
109 changes: 90 additions & 19 deletions pkg/block/fetcher.go
@@ -170,26 +170,27 @@ func DefaultModifiedLabelValues() [][]string {
     }
 }
 
-// Fetcher interface to retieve blockId information from a bucket.
+// BlockIDsFetcher lists block IDs from a bucket.
 type BlockIDsFetcher interface {
-    // GetActiveBlocksIDs returning it via channel (streaming) and response.
+    // GetActiveAndPartialBlockIDs GetActiveBlocksIDs returning it via channel (streaming) and response.
     // Active blocks are blocks which contain meta.json, while partial blocks are blocks without meta.json
     GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error)
 }
 
-type BaseBlockIDsFetcher struct {
+// RecursiveBlockIDsFetcher lists block IDs by iterating the object storage bucket recursively.
+type RecursiveBlockIDsFetcher struct {
     logger log.Logger
     bkt    objstore.InstrumentedBucketReader
 }
 
-func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
-    return &BaseBlockIDsFetcher{
+func NewRecursiveLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockIDsFetcher {
+    return &RecursiveBlockIDsFetcher{
         logger: logger,
         bkt:    bkt,
     }
 }
 
-func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+func (f *RecursiveBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
     partialBlocks = make(map[ulid.ULID]bool)
     err = f.bkt.Iter(ctx, "", func(name string) error {
         parts := strings.Split(name, "/")
@@ -216,6 +217,76 @@ func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, c
     return partialBlocks, err
 }
 
+// BaseBlockIDsFetcher Default lists block IDs by doing a top level iteration of the bucket and using an Exists call to detect partial blocks.
+type BaseBlockIDsFetcher struct {
+    logger log.Logger
+    bkt    objstore.InstrumentedBucketReader
+}
+
+func NewBaseBlockIDsFetcher(logger log.Logger, bkt objstore.InstrumentedBucketReader) *BaseBlockIDsFetcher {
+    return &BaseBlockIDsFetcher{
+        logger: logger,
+        bkt:    bkt,
+    }
+}
+
+func (f *BaseBlockIDsFetcher) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
+    const concurrency = 64
+
+    var metaChan = make(chan ulid.ULID, concurrency)
+    if err = f.bkt.Iter(ctx, "", func(name string) error {
+        parts := strings.Split(name, "/")
+        dir, file := parts[0], parts[len(parts)-1]
+        id, ok := IsBlockDir(dir)
+        if !ok {
+            return nil
+        }
+        if !IsBlockMetaFile(file) {
+            return nil
+        }
+        select {
+        case <-ctx.Done():
+            return ctx.Err()
+        case metaChan <- id:
+        }
+        return nil
+    }); err != nil {
+        return nil, err
+    }
+
+    partialBlocks = make(map[ulid.ULID]bool)
+    var (
+        eg errgroup.Group
+        mu sync.Mutex
+    )
+    for i := 0; i < concurrency; i++ {
+        eg.Go(func() error {
+            for uid := range metaChan {
+                // TODO(bwplotka): If that causes problems (obj store rate limits), add longer ttl to cached items.
+                // For 1y and 100 block sources this generates ~1.5-3k HEAD RPM. AWS handles 330k RPM per prefix.
+                // TODO(bwplotka): Consider filtering by consistency delay here (can't do until compactor healthyOverride work).
+                metaFile := path.Join(uid.String(), MetaFilename)
+                ok, err := f.bkt.Exists(ctx, metaFile)
+                if err != nil {
+                    return errors.Wrapf(err, "meta.json file exists: %v", uid)
+                }
+                if !ok {
+                    mu.Lock()
+                    partialBlocks[uid] = true
+                    mu.Unlock()
+                    return ErrorSyncMetaNotFound
+                }
+                ch <- uid
+            }
+            return nil
+        })
+    }
+    if err := eg.Wait(); err != nil {
+        return nil, err
+    }
+    return partialBlocks, nil
+}
+
 type MetadataFetcher interface {
     Fetch(ctx context.Context) (metas map[ulid.ULID]*metadata.Meta, partial map[ulid.ULID]error, err error)
     UpdateOnChange(func([]metadata.Meta, error))
@@ -234,10 +305,10 @@ type MetadataFilter interface {
 // BaseFetcher is a struct that synchronizes filtered metadata of all block in the object storage with the local state.
 // Go-routine safe.
 type BaseFetcher struct {
-    logger          log.Logger
-    concurrency     int
-    bkt             objstore.InstrumentedBucketReader
-    blockIDsFetcher BlockIDsFetcher
+    logger         log.Logger
+    concurrency    int
+    bkt            objstore.InstrumentedBucketReader
+    blockIDsLister BlockIDsFetcher
 
     // Optional local directory to cache meta.json files.
     cacheDir string
@@ -254,7 +325,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente
 }
 
 // NewBaseFetcherWithMetrics constructs BaseFetcher.
-func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsFetcher BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
+func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, blockIDsLister BlockIDsFetcher, dir string, metrics *BaseFetcherMetrics) (*BaseFetcher, error) {
     if logger == nil {
         logger = log.NewNopLogger()
     }
@@ -268,13 +339,13 @@ func NewBaseFetcherWithMetrics(logger log.Logger, concurrency int, bkt objstore.
     }
 
     return &BaseFetcher{
-        logger:          log.With(logger, "component", "block.BaseFetcher"),
-        concurrency:     concurrency,
-        bkt:             bkt,
-        blockIDsFetcher: blockIDsFetcher,
-        cacheDir:        cacheDir,
-        cached:          map[ulid.ULID]*metadata.Meta{},
-        syncs:           metrics.Syncs,
+        logger:         log.With(logger, "component", "block.BaseFetcher"),
+        concurrency:    concurrency,
+        bkt:            bkt,
+        blockIDsLister: blockIDsLister,
+        cacheDir:       cacheDir,
+        cached:         map[ulid.ULID]*metadata.Meta{},
+        syncs:          metrics.Syncs,
     }, nil
 }
 
@@ -445,7 +516,7 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
     // Workers scheduled, distribute blocks.
     eg.Go(func() error {
         defer close(ch)
-        partialBlocks, err = f.blockIDsFetcher.GetActiveAndPartialBlockIDs(ctx, ch)
+        partialBlocks, err = f.blockIDsLister.GetActiveAndPartialBlockIDs(ctx, ch)
        return err
     })
 
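Not part of the commit: a minimal Go sketch of how a caller could pick one of the two listing strategies introduced above and drain it. The constructor and interface names come from the diff; the helpers `chooseLister` and `listBlocks`, and the assumption that you already hold a go-kit logger and an `objstore.InstrumentedBucketReader`, are illustrative only.

```go
package example

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/thanos-io/objstore"

	"github.com/thanos-io/thanos/pkg/block"
)

// chooseLister picks a listing strategy. Both constructors come from the diff
// above and satisfy the same block.BlockIDsFetcher interface.
func chooseLister(logger log.Logger, bkt objstore.InstrumentedBucketReader, recursive bool) block.BlockIDsFetcher {
	if recursive {
		return block.NewRecursiveLister(logger, bkt)
	}
	return block.NewBaseBlockIDsFetcher(logger, bkt)
}

// listBlocks runs the lister, printing active block IDs as they are streamed
// and partial blocks (directories without meta.json) once listing finishes.
func listBlocks(ctx context.Context, lister block.BlockIDsFetcher) error {
	ch := make(chan ulid.ULID)
	errc := make(chan error, 1)

	var partial map[ulid.ULID]bool
	go func() {
		var err error
		// The lister sends active IDs into ch and returns the partial set.
		partial, err = lister.GetActiveAndPartialBlockIDs(ctx, ch)
		close(ch) // the caller owns the channel, as in BaseFetcher.fetchMetadata
		errc <- err
	}()

	for id := range ch {
		fmt.Println("active block:", id.String())
	}
	if err := <-errc; err != nil {
		return err
	}
	for id := range partial {
		fmt.Println("partial block (no meta.json):", id.String())
	}
	return nil
}
```

This mirrors how `BaseFetcher.fetchMetadata` consumes the lister in the last hunk above: the caller owns the channel, runs the lister in a goroutine, and closes the channel once the call returns.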
1 change: 1 addition & 0 deletions pkg/block/fetcher_test.go
@@ -27,6 +27,7 @@ import (
     "github.com/thanos-io/objstore/objtesting"
 
     "github.com/efficientgo/core/testutil"
+
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/extprom"
     "github.com/thanos-io/thanos/pkg/model"
1 change: 1 addition & 0 deletions pkg/compact/clean_test.go
@@ -19,6 +19,7 @@ import (
     "github.com/thanos-io/objstore"
 
     "github.com/efficientgo/core/testutil"
+
     "github.com/thanos-io/thanos/pkg/block"
     "github.com/thanos-io/thanos/pkg/block/metadata"
 )
1 change: 1 addition & 0 deletions pkg/compact/compact_e2e_test.go
@@ -27,6 +27,7 @@ import (
     "github.com/thanos-io/objstore/objtesting"
 
     "github.com/efficientgo/core/testutil"
+
     "github.com/thanos-io/thanos/pkg/block"
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/dedup"
1 change: 1 addition & 0 deletions pkg/compact/retention_test.go
@@ -21,6 +21,7 @@ import (
     "github.com/thanos-io/objstore"
 
     "github.com/efficientgo/core/testutil"
+
     "github.com/thanos-io/thanos/pkg/block"
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/compact"
1 change: 1 addition & 0 deletions pkg/store/acceptance_test.go
@@ -26,6 +26,7 @@ import (
 
     "github.com/thanos-io/objstore"
     "github.com/thanos-io/objstore/providers/filesystem"
+
     "github.com/thanos-io/thanos/pkg/block"
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/component"
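Not part of the commit: since `BaseFetcher` now depends only on the `BlockIDsFetcher` interface, a third strategy can be plugged in without touching it. Below is a hypothetical sketch of a lister that streams a precomputed set of block IDs (for example, in tests); the type `staticLister` and its fields are invented for illustration.

```go
package example

import (
	"context"

	"github.com/oklog/ulid"

	"github.com/thanos-io/thanos/pkg/block"
)

// staticLister is a hypothetical extra strategy: it streams a fixed set of
// active block IDs and reports a fixed partial set, instead of listing the
// bucket at all.
type staticLister struct {
	active  []ulid.ULID
	partial map[ulid.ULID]bool
}

// Compile-time check that the interface from the diff is satisfied.
var _ block.BlockIDsFetcher = (*staticLister)(nil)

func (s *staticLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (map[ulid.ULID]bool, error) {
	for _, id := range s.active {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case ch <- id: // stream active blocks, as the built-in listers do
		}
	}
	return s.partial, nil
}
```

A value like this could be passed wherever a `BlockIDsFetcher` is accepted, for example as the `blockIDsLister` argument of `NewBaseFetcherWithMetrics` in the diff above.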
