diff --git a/pkg/compactor/background_chunks_series_set.go b/pkg/compactor/background_chunks_series_set.go new file mode 100644 index 0000000000..7e300fed2a --- /dev/null +++ b/pkg/compactor/background_chunks_series_set.go @@ -0,0 +1,59 @@ +package compactor + +import ( + "context" + + "github.com/prometheus/prometheus/storage" +) + +type backgroundChunkSeriesSet struct { + nextSet chan storage.ChunkSeries + actual storage.ChunkSeries + cs storage.ChunkSeriesSet +} + +func (b *backgroundChunkSeriesSet) Next() bool { + s, ok := <-b.nextSet + b.actual = s + return ok +} + +func (b *backgroundChunkSeriesSet) At() storage.ChunkSeries { + return b.actual +} + +func (b *backgroundChunkSeriesSet) Err() error { + return b.cs.Err() +} + +func (b *backgroundChunkSeriesSet) Warnings() storage.Warnings { + return b.cs.Warnings() +} + +func (b *backgroundChunkSeriesSet) run(ctx context.Context) { + for { + if !b.cs.Next() { + close(b.nextSet) + return + } + + select { + case b.nextSet <- b.cs.At(): + case <-ctx.Done(): + return + } + } +} + +func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet { + r := &backgroundChunkSeriesSet{ + cs: cs, + nextSet: make(chan storage.ChunkSeries, 1000), + } + + go func() { + r.run(ctx) + }() + + return r +} diff --git a/pkg/compactor/block_visit_marker.go b/pkg/compactor/block_visit_marker.go index 2e43dca6c0..cd5553c99a 100644 --- a/pkg/compactor/block_visit_marker.go +++ b/pkg/compactor/block_visit_marker.go @@ -21,8 +21,10 @@ import ( ) const ( - // BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit. - BlockVisitMarkerFile = "visit-mark.json" + // BlockVisitMarkerFileSuffix is the known suffix of the json filename representing the most recent compactor visit. + BlockVisitMarkerFileSuffix = "-visit-mark.json" + // BlockVisitMarkerFilePrefix is the known prefix of the json filename representing the most recent compactor visit. + BlockVisitMarkerFilePrefix = "partition-" // VisitMarkerVersion1 is the current supported version of visit-mark file. VisitMarkerVersion1 = 1 ) @@ -30,26 +32,45 @@ const ( var ( ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found") ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON") + ErrorNotBlockVisitMarker = errors.New("file is not block visit marker") +) + +type VisitStatus string + +const ( + Pending VisitStatus = "pending" + Completed VisitStatus = "completed" ) type BlockVisitMarker struct { - CompactorID string `json:"compactorID"` + CompactorID string `json:"compactorID"` + Status VisitStatus `json:"status"` + PartitionedGroupID uint32 `json:"partitionedGroupID"` + PartitionID int `json:"partitionID"` // VisitTime is a unix timestamp of when the block was visited (mark updated). VisitTime int64 `json:"visitTime"` // Version of the file.
Version int `json:"version"` } -func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool { - return time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout)) +func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration, partitionID int) bool { + return b.isCompleted() || partitionID == b.PartitionID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout)) +} + +func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, partitionID int, compactorID string) bool { + return b.CompactorID == compactorID && b.isVisited(blockVisitMarkerTimeout, partitionID) +} + +func (b *BlockVisitMarker) isCompleted() bool { + return b.Status == Completed } -func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool { - return b.CompactorID == compactorID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout)) +func GetBlockVisitMarkerFile(blockID string, partitionID int) string { + return path.Join(blockID, fmt.Sprintf("%s%d%s", BlockVisitMarkerFilePrefix, partitionID, BlockVisitMarkerFileSuffix)) } -func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) { - visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile) +func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, partitionID int, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) { + visitMarkerFile := GetBlockVisitMarkerFile(blockID, partitionID) visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile) if err != nil { if bkt.IsObjNotFoundErr(err) { @@ -75,8 +96,8 @@ func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketRe return &blockVisitMarker, nil } -func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error { - blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile) +func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, partitionID int, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error { + blockVisitMarkerFilePath := GetBlockVisitMarkerFile(blockID, partitionID) if err := bkt.Upload(ctx, blockVisitMarkerFilePath, reader); err != nil { blockVisitMarkerWriteFailed.Inc() return err @@ -84,6 +105,14 @@ func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID st return nil } +func generateBlocksInfo(blocks []*metadata.Meta) string { + var blockIds []string + for _, block := range blocks { + blockIds = append(blockIds, block.ULID.String()) + } + return strings.Join(blockIds, ",") +} + func markBlocksVisited( ctx context.Context, bkt objstore.Bucket, @@ -99,39 +128,112 @@ func markBlocksVisited( } reader := bytes.NewReader(visitMarkerFileContent) for _, block := range blocks { + select { + // Exit early if possible. 
+ case <-ctx.Done(): + return + default: + } + blockID := block.ULID.String() - if err := UpdateBlockVisitMarker(ctx, bkt, blockID, reader, blockVisitMarkerWriteFailed); err != nil { - level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err) + if err := UpdateBlockVisitMarker(ctx, bkt, blockID, marker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil { + level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "partition_id", marker.PartitionID, "block_id", blockID, "err", err) } reader.Reset(visitMarkerFileContent) } + level.Debug(logger).Log("msg", "marked block visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks)) } -func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) { - var blockIds []string - for _, block := range blocks { - blockIds = append(blockIds, block.ULID.String()) - } - blocksInfo := strings.Join(blockIds, ",") - level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo)) +func markBlocksVisitedHeartBeat( + ctx context.Context, + bkt objstore.Bucket, + logger log.Logger, + blocks []*metadata.Meta, + partitionedGroupID uint32, + partitionID int, + compactorID string, + blockVisitMarkerFileUpdateInterval time.Duration, + blockVisitMarkerWriteFailed prometheus.Counter, + errChan chan error, +) { + blocksInfo := generateBlocksInfo(blocks) + level.Info(logger).Log("msg", "start visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo) ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval) defer ticker.Stop() + isComplete := false heartBeat: for { level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo)) blockVisitMarker := BlockVisitMarker{ - VisitTime: time.Now().Unix(), - CompactorID: compactorID, - Version: VisitMarkerVersion1, + VisitTime: time.Now().Unix(), + CompactorID: compactorID, + Status: Pending, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + Version: VisitMarkerVersion1, } markBlocksVisited(ctx, bkt, logger, blocks, blockVisitMarker, blockVisitMarkerWriteFailed) select { case <-ctx.Done(): + level.Warn(logger).Log("msg", "visit marker heart beat got cancelled", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo) break heartBeat case <-ticker.C: continue + case err := <-errChan: + isComplete = err == nil + if err != nil { + level.Warn(logger).Log("msg", "stop visit marker heart beat due to error", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo, "err", err) + } + break heartBeat + } + } + if isComplete { + level.Info(logger).Log("msg", "update visit marker to completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo) + markBlocksVisitMarkerCompleted(context.Background(), bkt, logger, blocks, partitionedGroupID, partitionID, compactorID, blockVisitMarkerWriteFailed) + } + level.Info(logger).Log("msg", "stop visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo) +} + +func markBlocksVisitMarkerCompleted( + ctx context.Context, + bkt objstore.Bucket, + logger log.Logger, + blocks []*metadata.Meta, + 
partitionedGroupID uint32, + partitionID int, + compactorID string, + blockVisitMarkerWriteFailed prometheus.Counter, +) { + blockVisitMarker := BlockVisitMarker{ + VisitTime: time.Now().Unix(), + CompactorID: compactorID, + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + Version: VisitMarkerVersion1, + } + visitMarkerFileContent, err := json.Marshal(blockVisitMarker) + if err != nil { + blockVisitMarkerWriteFailed.Inc() + return + } + reader := bytes.NewReader(visitMarkerFileContent) + for _, block := range blocks { + blockID := block.ULID.String() + if err := UpdateBlockVisitMarker(ctx, bkt, blockID, blockVisitMarker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil { + level.Error(logger).Log("msg", "unable to upsert completed visit marker file content for block", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID, "err", err) + } else { + level.Info(logger).Log("msg", "block partition is completed", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID) } + reader.Reset(visitMarkerFileContent) } - level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo)) +} + +func IsBlockVisitMarker(path string) bool { + return strings.HasSuffix(path, BlockVisitMarkerFileSuffix) +} + +func IsNotBlockVisitMarkerError(err error) bool { + return errors.Is(err, ErrorNotBlockVisitMarker) } diff --git a/pkg/compactor/block_visit_marker_test.go b/pkg/compactor/block_visit_marker_test.go index 18b7c8e1b4..6f99bb2005 100644 --- a/pkg/compactor/block_visit_marker_test.go +++ b/pkg/compactor/block_visit_marker_test.go @@ -2,6 +2,7 @@ package compactor import ( "context" + "fmt" "testing" "time" @@ -85,10 +86,77 @@ func TestMarkBlocksVisited(t *testing.T) { logger := log.NewNopLogger() markBlocksVisited(ctx, bkt, logger, tcase.blocks, tcase.visitMarker, dummyCounter) for _, meta := range tcase.blocks { - res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), dummyCounter) + res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), tcase.visitMarker.PartitionID, dummyCounter) require.NoError(t, err) require.Equal(t, tcase.visitMarker, *res) } }) } } + +func TestMarkBlockVisitedHeartBeat(t *testing.T) { + partitionedGroupID := uint32(12345) + partitionID := 0 + compactorID := "test-compactor" + for _, tcase := range []struct { + name string + isCancelled bool + compactionErr error + expectedStatus VisitStatus + }{ + { + name: "heart beat got cancelled", + isCancelled: true, + compactionErr: nil, + expectedStatus: Pending, + }, + { + name: "heart beat complete without error", + isCancelled: false, + compactionErr: nil, + expectedStatus: Completed, + }, + { + name: "heart beat stopped due to compaction error", + isCancelled: false, + compactionErr: fmt.Errorf("some compaction failure"), + expectedStatus: Pending, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + ulid0 := ulid.MustNew(uint64(time.Now().UnixMilli()+0), nil) + ulid1 := ulid.MustNew(uint64(time.Now().UnixMilli()+1), nil) + blocks := []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: ulid0, + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: ulid1, + }, + }, + } + ctx, cancel := context.WithCancel(context.Background()) + dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) 
+ logger := log.NewNopLogger() + errChan := make(chan error, 1) + go markBlocksVisitedHeartBeat(ctx, objstore.WithNoopInstr(bkt), logger, blocks, partitionedGroupID, partitionID, compactorID, time.Second, dummyCounter, errChan) + time.Sleep(2 * time.Second) + if tcase.isCancelled { + cancel() + } else { + errChan <- tcase.compactionErr + defer cancel() + } + time.Sleep(2 * time.Second) + for _, meta := range blocks { + res, err := ReadBlockVisitMarker(context.Background(), objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), partitionID, dummyCounter) + require.NoError(t, err) + require.Equal(t, tcase.expectedStatus, res.Status) + } + }) + } +} diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 59e7fd1c40..4c0ec7c2ad 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -2,7 +2,10 @@ package compactor import ( "context" + "encoding/json" "fmt" + "io" + "path" "sync" "time" @@ -22,6 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/runutil" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -62,9 +66,11 @@ type BlocksCleaner struct { tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec tenantPartialBlocks *prometheus.GaugeVec tenantBucketIndexLastUpdate *prometheus.GaugeVec + compactorPartitionError *prometheus.CounterVec + partitionedGroupInfoReadFailed prometheus.Counter } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer, partitionedGroupInfoReadFailed prometheus.Counter) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, @@ -124,6 +130,12 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use Name: "cortex_bucket_index_last_successful_update_timestamp_seconds", Help: "Timestamp of the last successful update of a tenant's bucket index.", }, []string{"user"}), + compactorPartitionError: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: compactorPartitionErrorCountName, + Help: compactorPartitionErrorCountHelp, + ConstLabels: prometheus.Labels{"reason": "parent-block-mismatch"}, + }, []string{"user"}), + partitionedGroupInfoReadFailed: partitionedGroupInfoReadFailed, } c.Service = services.NewTimerService(cfg.CleanupInterval, c.starting, c.ticker, nil) @@ -288,6 +300,13 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted) } + // Clean up partitioned group info files + if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil { + return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory) + } else if deleted > 0 { + level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted) + } + // Tenant deletion mark file is inside Markers as well. 
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil { return errors.Wrap(err, "failed to delete marker files") @@ -389,6 +408,8 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b return err } + c.cleanPartitionedGroupInfo(ctx, userBucket, userLogger, userID, idx) + c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks))) c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks))) c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction)) @@ -398,6 +419,132 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b return nil } +func (c *BlocksCleaner) findResultBlocksForPartitionedGroup(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, index *bucketindex.Index, partitionedGroupInfo *PartitionedGroupInfo) map[int]ulid.ULID { + partitionedGroupID := partitionedGroupInfo.PartitionedGroupID + deletionMarkMap := index.BlockDeletionMarks.GetULIDSet() + var possibleResultBlocks []ulid.ULID + for _, b := range index.Blocks { + if b.MinTime >= partitionedGroupInfo.RangeStart && b.MaxTime <= partitionedGroupInfo.RangeEnd { + if _, ok := deletionMarkMap[b.ID]; !ok { + level.Info(userLogger).Log("msg", "found possible result block", "partitioned_group_id", partitionedGroupID, "block", b.ID.String()) + possibleResultBlocks = append(possibleResultBlocks, b.ID) + } + } + } + + resultBlocks := make(map[int]ulid.ULID) + for _, b := range possibleResultBlocks { + meta, err := block.DownloadMeta(ctx, userLogger, userBucket, b) + if err != nil { + level.Info(userLogger).Log("msg", "unable to get meta for block", "partitioned_group_id", partitionedGroupID, "block", b.String()) + continue + } + partitionInfo, err := GetPartitionInfo(meta) + if err != nil { + level.Warn(userLogger).Log("msg", "failed to get partition info for block", "partitioned_group_id", partitionedGroupID, "block", b.String(), "err", err) + continue + } + if partitionInfo == nil { + level.Info(userLogger).Log("msg", "unable to get partition info for block", "partitioned_group_id", partitionedGroupID, "block", b.String()) + continue + } + if partitionInfo.PartitionedGroupID == partitionedGroupID { + level.Info(userLogger).Log("msg", "found result block", "partitioned_group_id", partitionedGroupID, "partition_id", partitionInfo.PartitionID, "block", b.String()) + resultBlocks[partitionInfo.PartitionID] = b + continue + } + level.Info(userLogger).Log("msg", fmt.Sprintf("block does not belong to this partitioned group: %d", partitionedGroupID), "partitioned_group_id", partitionInfo.PartitionedGroupID, "partition_id", partitionInfo.PartitionID, "block", b.String()) + } + + return resultBlocks +} + +func (c *BlocksCleaner) validatePartitionedResultBlock(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string, resultBlock ulid.ULID, partition Partition, partitionedGroupID uint32) error { + meta, err := readMeta(ctx, userBucket, userLogger, resultBlock) + if err != nil { + level.Warn(userLogger).Log("msg", "unable to read meta of result block", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID, "block", resultBlock.String()) + return err + } + expectedSourceBlocks := partition.getBlocksSet() + if len(expectedSourceBlocks) != len(meta.Compaction.Parents) { + c.compactorPartitionError.WithLabelValues(userID).Inc() +
level.Warn(userLogger).Log("msg", "result block has different number of parent blocks as partitioned group info", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID, "block", resultBlock.String()) + return fmt.Errorf("result block %s has different number of parent blocks as partitioned group info with partitioned group id %d, partition id %d", resultBlock.String(), partitionedGroupID, partition.PartitionID) + } + for _, parentBlock := range meta.Compaction.Parents { + if _, ok := expectedSourceBlocks[parentBlock.ULID]; !ok { + c.compactorPartitionError.WithLabelValues(userID).Inc() + level.Warn(userLogger).Log("msg", "parent blocks of result block does not match partitioned group info", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID, "block", resultBlock.String()) + return fmt.Errorf("parent blocks of result block %s does not match partitioned group info with partitioned group id %d, partition id %d", resultBlock.String(), partitionedGroupID, partition.PartitionID) + } + } + return nil +} + +func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string, index *bucketindex.Index) { + var deletePartitionedGroupInfo []string + err := userBucket.Iter(ctx, PartitionedGroupDirectory, func(file string) error { + partitionedGroupInfo, err := ReadPartitionedGroupInfoFile(ctx, userBucket, userLogger, file, c.partitionedGroupInfoReadFailed) + if err != nil { + level.Warn(userLogger).Log("msg", "failed to read partitioned group info", "partitioned_group_info", file) + return nil + } + resultBlocks := c.findResultBlocksForPartitionedGroup(ctx, userBucket, userLogger, index, partitionedGroupInfo) + partitionedGroupID := partitionedGroupInfo.PartitionedGroupID + for _, partition := range partitionedGroupInfo.Partitions { + if _, ok := resultBlocks[partition.PartitionID]; !ok { + level.Info(userLogger).Log("msg", "unable to find result block for partition in partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID) + return nil + } + resultBlock := resultBlocks[partition.PartitionID] + err := c.validatePartitionedResultBlock(ctx, userBucket, userLogger, userID, resultBlock, partition, partitionedGroupID) + if err != nil { + level.Warn(userLogger).Log("msg", "validate result block failed", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID, "block", resultBlock.String(), "err", err) + return nil + } + level.Info(userLogger).Log("msg", "result block has expected parent blocks", "partitioned_group_id", partitionedGroupID, "partition_id", partition.PartitionID, "block", resultBlock.String()) + } + + // since the partitioned group were all complete, we can make sure + // all source blocks would be deleted. 
+ blocks := partitionedGroupInfo.getAllBlocks() + for _, blockID := range blocks { + metaExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.MetaFilename)) + if err != nil { + level.Warn(userLogger).Log("msg", "unable to check block meta existence, assuming block is already deleted", "partitioned_group_id", partitionedGroupID, "block", blockID.String(), "err", err) + continue + } + if metaExists { + deletionMarkerExists, err := userBucket.Exists(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename)) + if err == nil && deletionMarkerExists { + level.Info(userLogger).Log("msg", "block already marked for deletion", "partitioned_group_id", partitionedGroupID, "block", blockID.String()) + continue + } + if err := block.MarkForDeletion(ctx, userLogger, userBucket, blockID, "delete block during partitioned group completion check", c.blocksMarkedForDeletion); err != nil { + level.Warn(userLogger).Log("msg", "unable to mark block for deletion", "partitioned_group_id", partitionedGroupID, "block", blockID.String()) + // if one block cannot be marked for deletion, we should + // skip deleting this partitioned group. the next iteration + // will try it again. + return nil + } + level.Info(userLogger).Log("msg", "marked block for deletion during partitioned group info clean up", "partitioned_group_id", partitionedGroupID, "block", blockID.String()) + } + } + level.Info(userLogger).Log("msg", "partitioned group info can be cleaned up", "partitioned_group_id", partitionedGroupID) + deletePartitionedGroupInfo = append(deletePartitionedGroupInfo, file) + return nil + }) + if err != nil { + level.Warn(userLogger).Log("msg", "error returned while iterating partitioned group directory", "err", err) + } + for _, partitionedGroupInfoFile := range deletePartitionedGroupInfo { + if err := userBucket.Delete(ctx, partitionedGroupInfoFile); err != nil { + level.Warn(userLogger).Log("msg", "failed to delete partitioned group info", "partitioned_group_info", partitionedGroupInfoFile, "err", err) + } else { + level.Info(userLogger).Log("msg", "deleted partitioned group info", "partitioned_group_info", partitionedGroupInfoFile) + } + } +} + // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map // and index are updated accordingly.
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) { @@ -488,3 +635,20 @@ func listBlocksOutsideRetentionPeriod(idx *bucketindex.Index, threshold time.Tim return } + +func readMeta(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, blockID ulid.ULID) (*metadata.Meta, error) { + metaReader, err := userBucket.Get(ctx, path.Join(blockID.String(), block.MetaFilename)) + if err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(userLogger, metaReader, "close meta reader") + b, err := io.ReadAll(metaReader) + if err != nil { + return nil, err + } + meta := metadata.Meta{} + if err = json.Unmarshal(b, &meta); err != nil { + return nil, err + } + return &meta, nil +} diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 8220b9286d..719308ce68 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -79,11 +79,13 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions block6 := createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil) block7 := createTSDBBlock(t, bucketClient, "user-2", 10, 20, nil) block8 := createTSDBBlock(t, bucketClient, "user-2", 40, 50, nil) + block11 := ulid.MustNew(11, rand.Reader) createDeletionMark(t, bucketClient, "user-1", block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet. createDeletionMark(t, bucketClient, "user-1", block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold. createDeletionMark(t, bucketClient, "user-1", block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet. createDeletionMark(t, bucketClient, "user-1", block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold. require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark. + createBlockVisitMarker(t, bucketClient, "user-1", block11) // Partial block only has visit marker. createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold. // Blocks for user-3, marked for deletion. @@ -122,7 +124,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -147,6 +149,8 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions {path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block5)), expectedExists: false}, // Should not delete a partial block without deletion mark. {path: path.Join("user-1", block6.String(), "index"), expectedExists: true}, + // Should delete a partial block with only visit marker. 
+ {path: path.Join("user-1", GetBlockVisitMarkerFile(block11.String(), 0)), expectedExists: false}, // Should completely delete blocks for user-3, marked for deletion {path: path.Join("user-3", block9.String(), metadata.MetaFilename), expectedExists: false}, {path: path.Join("user-3", block9.String(), "index"), expectedExists: false}, @@ -166,7 +170,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsStarted)) assert.Equal(t, float64(1), testutil.ToFloat64(cleaner.runsCompleted)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.runsFailed)) - assert.Equal(t, float64(6), testutil.ToFloat64(cleaner.blocksCleanedTotal)) + assert.Equal(t, float64(7), testutil.ToFloat64(cleaner.blocksCleanedTotal)) assert.Equal(t, float64(0), testutil.ToFloat64(cleaner.blocksFailedTotal)) // Check the updated bucket index. @@ -179,7 +183,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions { userID: "user-1", expectedIndex: true, - expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */}, + expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, block11, partial: block6 */}, expectedMarks: []ulid.ULID{block2}, }, { userID: "user-2", @@ -265,7 +269,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -325,7 +329,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -376,7 +380,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, prometheus.NewCounter(prometheus.CounterOpts{})) require.NoError(t, cleaner.cleanUsers(ctx, true)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -507,7 +511,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, prometheus.NewCounter(prometheus.CounterOpts{})) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, 
block.String(), metadata.MetaFilename)) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index e665d3ec0d..11b3f0b7a5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -40,8 +41,10 @@ const ( // ringKey is the key under which we store the compactors ring in the KVStore. ringKey = "compactor" - blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total" - blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor." + blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total" + blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor." + compactorPartitionErrorCountName = "cortex_compactor_partition_error" + compactorPartitionErrorCountHelp = "Count of errors happened during partitioning compaction." ) var ( @@ -52,12 +55,12 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, - false, // Do not accept malformed indexes - true, // Enable vertical compaction + cfg.AcceptMalformedIndex, + true, // Enable vertical compaction reg, blocksMarkedForDeletion, garbageCollectedBlocks, @@ -67,13 +70,13 @@ var ( cfg.BlocksFetchConcurrency) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, partitionedGroupInfoReadFailed prometheus.Counter, partitionedGroupInfoWriteFailed prometheus.Counter, ring *ring.Ring, 
ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper { return NewShuffleShardingGrouper( ctx, logger, bkt, - false, // Do not accept malformed indexes - true, // Enable vertical compaction + cfg.AcceptMalformedIndex, + true, // Enable vertical compaction reg, blocksMarkedForDeletion, blocksMarkedForNoCompaction, @@ -92,7 +95,10 @@ var ( cfg.BlockVisitMarkerTimeout, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, - noCompactionMarkFilter.NoCompactMarkedBlocks) + partitionedGroupInfoReadFailed, + partitionedGroupInfoWriteFailed, + noCompactionMarkFilter.NoCompactMarkedBlocks, + ) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -120,6 +126,14 @@ var ( } return compactor, plannerFactory, nil } + + DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ prometheus.Counter, _ prometheus.Counter) compact.BlockDeletableChecker { + return compact.DefaultBlockDeletableChecker{} + } + + PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, blockVisitMarkerReadFailed prometheus.Counter, partitionedGroupInfoWriteFailed prometheus.Counter) compact.BlockDeletableChecker { + return NewPartitionCompactionBlockDeletableChecker(ctx, bkt, logger, blockVisitMarkerReadFailed, partitionedGroupInfoWriteFailed) + } ) // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks. @@ -135,6 +149,8 @@ type BlocksGrouperFactory func( remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, + partitionedGroupInfoReadFailed prometheus.Counter, + partitionedGroupInfoWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycler *ring.Lifecycler, limit Limits, @@ -161,6 +177,14 @@ type PlannerFactory func( blockVisitMarkerWriteFailed prometheus.Counter, ) compact.Planner +type BlockDeletableCheckerFactory func( + ctx context.Context, + bkt objstore.InstrumentedBucket, + logger log.Logger, + blockVisitMarkerReadFailed prometheus.Counter, + partitionedGroupInfoReadFailed prometheus.Counter, +) compact.BlockDeletableChecker + // Limits defines limits used by the Compactor. type Limits interface { CompactorTenantShardSize(userID string) int @@ -208,6 +232,12 @@ type Config struct { // Block visit marker file config BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` + + AcceptMalformedIndex bool `yaml:"accept_malformed_index"` + + // Partitioning config + PartitionIndexSizeLimitInBytes int64 `yaml:"partition_index_size_limit_in_bytes"` + PartitionSeriesCountLimit int64 `yaml:"partition_series_count_limit"` } // RegisterFlags registers the Compactor flags. 
@@ -244,6 +274,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.") f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.") + + f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") + + f.Int64Var(&cfg.PartitionIndexSizeLimitInBytes, "compactor.partition-index-size-limit-in-bytes", 0, "Index size limit in bytes for each compaction partition. 0 means no limit") + f.Int64Var(&cfg.PartitionSeriesCountLimit, "compactor.partition-series-count-limit", 0, "Time series count limit for each compaction partition. 0 means no limit") } func (cfg *Config) Validate(limits validation.Limits) error { @@ -304,6 +339,8 @@ type Compactor struct { blocksPlannerFactory PlannerFactory + blockDeletableCheckerFactory BlockDeletableCheckerFactory + // Client used to run operations on the bucket storing blocks. bucketClient objstore.Bucket @@ -314,22 +351,24 @@ type Compactor struct { ringSubservicesWatcher *services.FailureWatcher // Metrics. - compactionRunsStarted prometheus.Counter - compactionRunsInterrupted prometheus.Counter - compactionRunsCompleted prometheus.Counter - compactionRunsFailed prometheus.Counter - compactionRunsLastSuccess prometheus.Gauge - compactionRunDiscoveredTenants prometheus.Gauge - compactionRunSkippedTenants prometheus.Gauge - compactionRunSucceededTenants prometheus.Gauge - compactionRunFailedTenants prometheus.Gauge - compactionRunInterval prometheus.Gauge - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompaction prometheus.Counter - garbageCollectedBlocks prometheus.Counter - remainingPlannedCompactions prometheus.Gauge - blockVisitMarkerReadFailed prometheus.Counter - blockVisitMarkerWriteFailed prometheus.Counter + compactionRunsStarted prometheus.Counter + compactionRunsInterrupted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionRunsFailed prometheus.Counter + compactionRunsLastSuccess prometheus.Gauge + compactionRunDiscoveredTenants prometheus.Gauge + compactionRunSkippedTenants prometheus.Gauge + compactionRunSucceededTenants prometheus.Gauge + compactionRunFailedTenants prometheus.Gauge + compactionRunInterval prometheus.Gauge + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompaction prometheus.Counter + garbageCollectedBlocks prometheus.Counter + remainingPlannedCompactions prometheus.Gauge + blockVisitMarkerReadFailed prometheus.Counter + blockVisitMarkerWriteFailed prometheus.Counter + partitionedGroupInfoReadFailed prometheus.Counter + partitionedGroupInfoWriteFailed prometheus.Counter // TSDB syncer metrics syncerMetrics *syncerMetrics @@ -359,7 +398,14 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi } } - cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits) + var blockDeletableCheckerFactory BlockDeletableCheckerFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blockDeletableCheckerFactory = PartitionCompactionBlockDeletableCheckerFactory + } else { + 
blockDeletableCheckerFactory = DefaultBlockDeletableCheckerFactory + } + + cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, blockDeletableCheckerFactory, limits) if err != nil { return nil, errors.Wrap(err, "failed to create Cortex blocks compactor") } @@ -376,6 +422,7 @@ func newCompactor( bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, + blockDeletableCheckerFactory BlockDeletableCheckerFactory, limits Limits, ) (*Compactor, error) { var remainingPlannedCompactions prometheus.Gauge @@ -386,17 +433,18 @@ func newCompactor( }) } c := &Compactor{ - compactorCfg: compactorCfg, - storageCfg: storageCfg, - cfgProvider: cfgProvider, - parentLogger: logger, - logger: log.With(logger, "component", "compactor"), - registerer: registerer, - syncerMetrics: newSyncerMetrics(registerer), - bucketClientFactory: bucketClientFactory, - blocksGrouperFactory: blocksGrouperFactory, - blocksCompactorFactory: blocksCompactorFactory, - allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), + compactorCfg: compactorCfg, + storageCfg: storageCfg, + cfgProvider: cfgProvider, + parentLogger: logger, + logger: log.With(logger, "component", "compactor"), + registerer: registerer, + syncerMetrics: newSyncerMetrics(registerer), + bucketClientFactory: bucketClientFactory, + blocksGrouperFactory: blocksGrouperFactory, + blocksCompactorFactory: blocksCompactorFactory, + blockDeletableCheckerFactory: blockDeletableCheckerFactory, + allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", @@ -459,6 +507,14 @@ func newCompactor( Name: "cortex_compactor_block_visit_marker_write_failed", Help: "Number of block visit marker file failed to be written.", }), + partitionedGroupInfoReadFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_partitioned_group_info_read_failed", + Help: "Number of partitioned group info file failed to be read.", + }), + partitionedGroupInfoWriteFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_partitioned_group_info_write_failed", + Help: "Number of partitioned group info file failed to be written.", + }), remainingPlannedCompactions: remainingPlannedCompactions, limits: limits, } @@ -507,7 +563,7 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer) + }, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer, c.partitionedGroupInfoReadFailed) // Initialize the compactors ring if sharding is enabled. 
if c.compactorCfg.ShardingEnabled { @@ -605,23 +661,19 @@ func (c *Compactor) running(ctx context.Context) error { } func (c *Compactor) compactUsers(ctx context.Context) { - failed := false + succeeded := false interrupted := false + compactionErrorCount := 0 c.compactionRunsStarted.Inc() defer func() { - // interruptions and successful runs are considered - // mutually exclusive but we consider a run failed if any - // tenant runs failed even if later runs are interrupted - if !interrupted && !failed { + if succeeded && compactionErrorCount == 0 { c.compactionRunsCompleted.Inc() c.compactionRunsLastSuccess.SetToCurrentTime() - } - if interrupted { + } else if interrupted { c.compactionRunsInterrupted.Inc() - } - if failed { + } else { c.compactionRunsFailed.Inc() } @@ -635,7 +687,6 @@ func (c *Compactor) compactUsers(ctx context.Context) { level.Info(c.logger).Log("msg", "discovering users from bucket") users, err := c.discoverUsersWithRetries(ctx) if err != nil { - failed = true level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err) return } @@ -656,7 +707,7 @@ func (c *Compactor) compactUsers(ctx context.Context) { // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). if ctx.Err() != nil { interrupted = true - level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID) + level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "err", err) return } @@ -686,15 +737,8 @@ func (c *Compactor) compactUsers(ctx context.Context) { level.Info(c.logger).Log("msg", "starting compaction of user blocks", "user", userID) if err = c.compactUserWithRetries(ctx, userID); err != nil { - // TODO: patch thanos error types to support errors.Is(err, context.Canceled) here - if ctx.Err() != nil && ctx.Err() == context.Canceled { - interrupted = true - level.Info(c.logger).Log("msg", "interrupting compaction of user blocks", "user", userID) - return - } - c.compactionRunFailedTenants.Inc() - failed = true + compactionErrorCount++ level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err) continue } @@ -729,6 +773,8 @@ func (c *Compactor) compactUsers(ctx context.Context) { } } } + + succeeded = true } func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error { @@ -762,7 +808,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { // Filters out duplicate blocks that can be formed from two or more overlapping // blocks that fully submatches the source blocks of the older blocks. - deduplicateBlocksFilter := block.NewDeduplicateFilter(c.compactorCfg.BlockSyncConcurrency) + deduplicateBlocksFilter := &DisabledDeduplicateFilter{} // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction. 
@@ -811,14 +857,19 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return errors.Wrap(err, "failed to create syncer") } - currentCtx, cancel := context.WithCancel(ctx) - defer cancel() - compactor, err := compact.NewBucketCompactor( + blockDeletableChecker := c.blockDeletableCheckerFactory(ctx, bucket, ulogger, c.blockVisitMarkerReadFailed, c.partitionedGroupInfoReadFailed) + shardedCompactionLifecycleCallback := ShardedCompactionLifecycleCallback{ + userBucket: bucket, + partitionedGroupInfoReadFailed: c.partitionedGroupInfoReadFailed, + } + compactor, err := compact.NewBucketCompactorWithCheckerAndCallback( ulogger, syncer, - c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), - c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.partitionedGroupInfoReadFailed, c.partitionedGroupInfoWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter), + c.blocksPlannerFactory(ctx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), c.blocksCompactor, + blockDeletableChecker, + shardedCompactionLifecycleCallback, path.Join(c.compactorCfg.DataDir, "compact"), bucket, c.compactorCfg.CompactionConcurrency, @@ -951,3 +1002,15 @@ func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} { return result } + +type DisabledDeduplicateFilter struct { +} + +func (f *DisabledDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error { + // don't do any deduplicate filtering + return nil +} + +func (f *DisabledDeduplicateFilter) DuplicateIDs() []ulid.ULID { + return nil +} diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 3810f7c511..d597c3c174 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1423,6 +1423,13 @@ func createNoCompactionMark(t *testing.T, bkt objstore.Bucket, userID string, bl require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) } +func createBlockVisitMarker(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) { + content := mockBlockVisitMarker() + markPath := path.Join(userID, GetBlockVisitMarkerFile(blockID.String(), 0)) + + require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) +} + func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { var compactor *Compactor var log *concurrency.SyncBuffer @@ -1554,7 +1561,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li blocksGrouperFactory = DefaultBlocksGrouperFactory } - c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, 
overrides) + c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, DefaultBlockDeletableCheckerFactory, overrides) require.NoError(t, err) return c, tsdbCompactor, tsdbPlanner, logs, registry @@ -1579,12 +1586,17 @@ func (m *tsdbCompactorMock) Compact(dest string, dirs []string, open []*tsdb.Blo return args.Get(0).(ulid.ULID), args.Error(1) } +func (m *tsdbCompactorMock) CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error) { + args := m.Called(dest, dirs, open, blockPopulator) + return args.Get(0).(ulid.ULID), args.Error(1) +} + type tsdbPlannerMock struct { mock.Mock noCompactMarkFilters []*compact.GatherNoCompactionMarkFilter } -func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (m *tsdbPlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) { args := m.Called(ctx, metasByMinTime) return args.Get(0).([]*metadata.Meta), args.Error(1) } @@ -1680,6 +1692,21 @@ func mockBlockMetaJSONWithTime(id string, orgID string, minTime int64, maxTime i return string(content) } +func mockBlockVisitMarker() string { + blockVisitMarker := BlockVisitMarker{ + CompactorID: "dummy", + VisitTime: time.Now().Unix(), + Version: 1, + } + + content, err := json.Marshal(blockVisitMarker) + if err != nil { + panic("failed to marshal mocked block visit marker") + } + + return string(content) +} + func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { numUsers := 10 diff --git a/pkg/compactor/meta_extenstions.go b/pkg/compactor/meta_extenstions.go new file mode 100644 index 0000000000..91b9ea8cec --- /dev/null +++ b/pkg/compactor/meta_extenstions.go @@ -0,0 +1,51 @@ +package compactor + +import ( + "fmt" + + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type CortexMetaExtensions struct { + PartitionInfo *PartitionInfo `json:"partition_info,omitempty"` +} + +type PartitionInfo struct { + PartitionedGroupID uint32 `json:"partitioned_group_id"` + PartitionCount int `json:"partition_count"` + PartitionID int `json:"partition_id"` +} + +func ConvertToCortexMetaExtensions(extensions any) (*CortexMetaExtensions, error) { + cortexExtensions, err := metadata.ConvertExtensions(extensions, &CortexMetaExtensions{}) + if err != nil { + return nil, err + } + if cortexExtensions == nil { + return nil, nil + } + converted, ok := cortexExtensions.(*CortexMetaExtensions) + if !ok { + return nil, fmt.Errorf("unable to convert extensions to CortexMetaExtensions") + } + return converted, nil +} + +func ConvertToPartitionInfo(extensions any) (*PartitionInfo, error) { + cortexExtensions, err := ConvertToCortexMetaExtensions(extensions) + if err != nil { + return nil, err + } + if cortexExtensions == nil { + return nil, nil + } + return cortexExtensions.PartitionInfo, nil +} + +func GetCortexMetaExtensionsFromMeta(meta metadata.Meta) (*CortexMetaExtensions, error) { + return ConvertToCortexMetaExtensions(meta.Thanos.Extensions) +} + +func GetPartitionInfo(meta metadata.Meta) (*PartitionInfo, error) { + return ConvertToPartitionInfo(meta.Thanos.Extensions) +} diff --git a/pkg/compactor/partition_compaction_complete_checker.go b/pkg/compactor/partition_compaction_complete_checker.go new file mode 100644 index 0000000000..cfb98a2e66 --- /dev/null +++ b/pkg/compactor/partition_compaction_complete_checker.go @@ -0,0 +1,73 @@ 
+package compactor + +import ( + "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" +) + +type PartitionCompactionBlockDeletableChecker struct { + ctx context.Context + bkt objstore.InstrumentedBucket + logger log.Logger + blockVisitMarkerReadFailed prometheus.Counter + partitionedGroupInfoReadFailed prometheus.Counter +} + +func NewPartitionCompactionBlockDeletableChecker( + ctx context.Context, + bkt objstore.InstrumentedBucket, + logger log.Logger, + blockVisitMarkerReadFailed prometheus.Counter, + partitionedGroupInfoReadFailed prometheus.Counter, +) *PartitionCompactionBlockDeletableChecker { + return &PartitionCompactionBlockDeletableChecker{ + ctx: ctx, + bkt: bkt, + logger: logger, + blockVisitMarkerReadFailed: blockVisitMarkerReadFailed, + partitionedGroupInfoReadFailed: partitionedGroupInfoReadFailed, + } +} + +func (p *PartitionCompactionBlockDeletableChecker) CanDelete(group *compact.Group, blockID ulid.ULID) bool { + partitionInfo, err := ConvertToPartitionInfo(group.Extensions()) + if err != nil { + return false + } + if partitionInfo == nil { + return true + } + partitionedGroupID := partitionInfo.PartitionedGroupID + currentPartitionID := partitionInfo.PartitionID + partitionedGroupInfo, err := ReadPartitionedGroupInfo(p.ctx, p.bkt, p.logger, partitionedGroupID, p.partitionedGroupInfoReadFailed) + if err != nil { + level.Warn(p.logger).Log("msg", "unable to read partitioned group info", "partitioned_group_id", partitionedGroupID, "block_id", blockID, "err", err) + return false + } + return p.IsPartitionedBlockComplete(partitionedGroupInfo, currentPartitionID, blockID) +} + +func (p *PartitionCompactionBlockDeletableChecker) IsPartitionedBlockComplete(partitionedGroupInfo *PartitionedGroupInfo, currentPartitionID int, blockID ulid.ULID) bool { + partitionedGroupID := partitionedGroupInfo.PartitionedGroupID + for _, partitionID := range partitionedGroupInfo.getPartitionIDsByBlock(blockID) { + // Skip current partition ID since current one is completed + if partitionID != currentPartitionID { + blockVisitMarker, err := ReadBlockVisitMarker(p.ctx, p.bkt, p.logger, blockID.String(), partitionID, p.blockVisitMarkerReadFailed) + if err != nil { + level.Warn(p.logger).Log("msg", "unable to read all visit markers for block", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "block_id", blockID, "err", err) + return false + } + if !blockVisitMarker.isCompleted() { + level.Warn(p.logger).Log("msg", "block has incomplete partition", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "block_id", blockID) + return false + } + } + } + level.Info(p.logger).Log("msg", "block has all partitions completed", "partitioned_group_id", partitionedGroupID, "block_id", blockID) + return true +} diff --git a/pkg/compactor/partition_compaction_complete_checker_test.go b/pkg/compactor/partition_compaction_complete_checker_test.go new file mode 100644 index 0000000000..1d90987612 --- /dev/null +++ b/pkg/compactor/partition_compaction_complete_checker_test.go @@ -0,0 +1,387 @@ +package compactor + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/compact" + + 
"github.com/cortexproject/cortex/pkg/storage/bucket" +) + +func TestPartitionCompactionCompleteChecker(t *testing.T) { + ulid0 := ulid.MustNew(0, nil) + ulid1 := ulid.MustNew(1, nil) + ulid2 := ulid.MustNew(2, nil) + + rangeStart := (1 * time.Hour).Milliseconds() + rangeEnd := (2 * time.Hour).Milliseconds() + partitionedGroupID := uint32(12345) + compactorID := "compactor1" + timeBefore1h := time.Now().Add(-1 * time.Hour).Unix() + timeNow := time.Now().Unix() + + for _, tcase := range []struct { + name string + partitionedGroupInfo PartitionedGroupInfo + blocks map[ulid.ULID]struct { + expectComplete bool + visitMarkers []BlockVisitMarker + } + }{ + { + name: "all partitions are complete 1", + partitionedGroupInfo: PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 2, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid0, + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + blocks: map[ulid.ULID]struct { + expectComplete bool + visitMarkers []BlockVisitMarker + }{ + ulid0: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid1: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid2: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + }, + }, + { + name: "all partitions are complete 2", + partitionedGroupInfo: PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 3, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid1, + }, + }, + { + PartitionID: 2, + Blocks: []ulid.ULID{ + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + blocks: map[ulid.ULID]struct { + expectComplete bool + visitMarkers []BlockVisitMarker + }{ + ulid0: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid1: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid2: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 2, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + }, + }, + { + name: "not all partitions are complete 1", + partitionedGroupInfo: PartitionedGroupInfo{ + 
PartitionedGroupID: partitionedGroupID, + PartitionCount: 3, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + }, + }, + { + PartitionID: 2, + Blocks: []ulid.ULID{ + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + blocks: map[ulid.ULID]struct { + expectComplete bool + visitMarkers []BlockVisitMarker + }{ + ulid0: { + expectComplete: false, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + { + Status: Pending, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid1: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid2: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 2, + CompactorID: compactorID, + VisitTime: timeBefore1h, + Version: VisitMarkerVersion1, + }, + }, + }, + }, + }, + { + name: "not all partitions are complete 2", + partitionedGroupInfo: PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 3, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + }, + }, + { + PartitionID: 2, + Blocks: []ulid.ULID{ + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + blocks: map[ulid.ULID]struct { + expectComplete bool + visitMarkers []BlockVisitMarker + }{ + ulid0: { + expectComplete: false, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 0, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + { + Status: Pending, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid1: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 1, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + ulid2: { + expectComplete: true, + visitMarkers: []BlockVisitMarker{ + { + Status: Completed, + PartitionedGroupID: partitionedGroupID, + PartitionID: 2, + CompactorID: compactorID, + VisitTime: timeNow, + Version: VisitMarkerVersion1, + }, + }, + }, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + bkt := &bucket.ClientMock{} + partitionedGroupInfoFileContent, _ := json.Marshal(tcase.partitionedGroupInfo) + bkt.MockGet(GetPartitionedGroupFile(partitionedGroupID), string(partitionedGroupInfoFileContent), nil) + checker := NewPartitionCompactionBlockDeletableChecker( + context.Background(), + objstore.WithNoopInstr(bkt), + log.NewNopLogger(), + prometheus.NewCounter(prometheus.CounterOpts{}), + prometheus.NewCounter(prometheus.CounterOpts{}), + ) + group := compact.Group{} + // set partitionID to -1, so it will go through all partitionIDs when checking 
+ group.SetExtensions(&CortexMetaExtensions{ + PartitionInfo: &PartitionInfo{ + PartitionedGroupID: tcase.partitionedGroupInfo.PartitionedGroupID, + PartitionCount: tcase.partitionedGroupInfo.PartitionCount, + PartitionID: -1, + }, + }) + for blockID, blockTCase := range tcase.blocks { + for _, visitMarker := range blockTCase.visitMarkers { + visitMarkerFileContent, _ := json.Marshal(visitMarker) + bkt.MockGet(GetBlockVisitMarkerFile(blockID.String(), visitMarker.PartitionID), string(visitMarkerFileContent), nil) + } + require.Equal(t, blockTCase.expectComplete, checker.CanDelete(&group, blockID)) + } + }) + } +} diff --git a/pkg/compactor/partitioned_group_info.go b/pkg/compactor/partitioned_group_info.go new file mode 100644 index 0000000000..a4022c192d --- /dev/null +++ b/pkg/compactor/partitioned_group_info.go @@ -0,0 +1,145 @@ +package compactor + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "path" + "strings" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" + + "github.com/cortexproject/cortex/pkg/util/runutil" +) + +const ( + PartitionedGroupDirectory = "partitioned-groups" + PartitionedGroupInfoVersion1 = 1 +) + +var ( + ErrorPartitionedGroupInfoNotFound = errors.New("partitioned group info not found") + ErrorUnmarshalPartitionedGroupInfo = errors.New("unmarshal partitioned group info JSON") +) + +type Partition struct { + PartitionID int `json:"partitionID"` + Blocks []ulid.ULID `json:"blocks"` +} + +func (p *Partition) getBlocksSet() map[ulid.ULID]struct{} { + res := make(map[ulid.ULID]struct{}) + for _, blockID := range p.Blocks { + res[blockID] = struct{}{} + } + return res +} + +type PartitionedGroupInfo struct { + PartitionedGroupID uint32 `json:"partitionedGroupID"` + PartitionCount int `json:"partitionCount"` + Partitions []Partition `json:"partitions"` + RangeStart int64 `json:"rangeStart"` + RangeEnd int64 `json:"rangeEnd"` + // Version of the file. 
+ Version int `json:"version"` +} + +func (p *PartitionedGroupInfo) getPartitionIDsByBlock(blockID ulid.ULID) []int { + var partitionIDs []int +partitionLoop: + for _, partition := range p.Partitions { + for _, block := range partition.Blocks { + if block == blockID { + partitionIDs = append(partitionIDs, partition.PartitionID) + continue partitionLoop + } + } + } + return partitionIDs +} + +func (p *PartitionedGroupInfo) getAllBlocks() []ulid.ULID { + uniqueBlocks := make(map[ulid.ULID]struct{}) + for _, partition := range p.Partitions { + for _, block := range partition.Blocks { + uniqueBlocks[block] = struct{}{} + } + } + blocks := make([]ulid.ULID, len(uniqueBlocks)) + i := 0 + for block := range uniqueBlocks { + blocks[i] = block + i++ + } + return blocks +} + +func (p PartitionedGroupInfo) String() string { + var partitions []string + for _, partition := range p.Partitions { + partitions = append(partitions, fmt.Sprintf("(PartitionID: %d, Blocks: %s)", partition.PartitionID, partition.Blocks)) + } + return fmt.Sprintf("{PartitionedGroupID: %d, PartitionCount: %d, Partitions: %s}", p.PartitionedGroupID, p.PartitionCount, strings.Join(partitions, ", ")) +} + +func GetPartitionedGroupFile(partitionedGroupID uint32) string { + return path.Join(PartitionedGroupDirectory, fmt.Sprintf("%d.json", partitionedGroupID)) +} + +func ReadPartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupID uint32, partitionedGroupInfoReadFailed prometheus.Counter) (*PartitionedGroupInfo, error) { + return ReadPartitionedGroupInfoFile(ctx, bkt, logger, GetPartitionedGroupFile(partitionedGroupID), partitionedGroupInfoReadFailed) +} + +func ReadPartitionedGroupInfoFile(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, partitionedGroupFile string, partitionedGroupInfoReadFailed prometheus.Counter) (*PartitionedGroupInfo, error) { + partitionedGroupReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, partitionedGroupFile) + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return nil, errors.Wrapf(ErrorPartitionedGroupInfoNotFound, "partitioned group file: %s", partitionedGroupReader) + } + partitionedGroupInfoReadFailed.Inc() + return nil, errors.Wrapf(err, "get partitioned group file: %s", partitionedGroupReader) + } + defer runutil.CloseWithLogOnErr(logger, partitionedGroupReader, "close partitioned group reader") + p, err := io.ReadAll(partitionedGroupReader) + if err != nil { + partitionedGroupInfoReadFailed.Inc() + return nil, errors.Wrapf(err, "read partitioned group file: %s", partitionedGroupFile) + } + partitionedGroupInfo := PartitionedGroupInfo{} + if err = json.Unmarshal(p, &partitionedGroupInfo); err != nil { + partitionedGroupInfoReadFailed.Inc() + return nil, errors.Wrapf(ErrorUnmarshalPartitionedGroupInfo, "partitioned group file: %s, error: %v", partitionedGroupFile, err.Error()) + } + if partitionedGroupInfo.Version != VisitMarkerVersion1 { + partitionedGroupInfoReadFailed.Inc() + return nil, errors.Errorf("unexpected partitioned group file version %d, expected %d", partitionedGroupInfo.Version, VisitMarkerVersion1) + } + return &partitionedGroupInfo, nil +} + +func UpdatePartitionedGroupInfo(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger, partitionedGroupInfo PartitionedGroupInfo, partitionedGroupInfoReadFailed prometheus.Counter, partitionedGroupInfoWriteFailed prometheus.Counter) (*PartitionedGroupInfo, error) { + existingPartitionedGroup, _ := 
ReadPartitionedGroupInfo(ctx, bkt, logger, partitionedGroupInfo.PartitionedGroupID, partitionedGroupInfoReadFailed) + if existingPartitionedGroup != nil { + level.Warn(logger).Log("msg", "partitioned group info already exists", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID) + return existingPartitionedGroup, nil + } + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID) + partitionedGroupInfoContent, err := json.Marshal(partitionedGroupInfo) + if err != nil { + partitionedGroupInfoWriteFailed.Inc() + return nil, err + } + reader := bytes.NewReader(partitionedGroupInfoContent) + if err := bkt.Upload(ctx, partitionedGroupFile, reader); err != nil { + return nil, err + } + return &partitionedGroupInfo, nil +} diff --git a/pkg/compactor/partitioned_group_info_test.go b/pkg/compactor/partitioned_group_info_test.go new file mode 100644 index 0000000000..f8bee62d17 --- /dev/null +++ b/pkg/compactor/partitioned_group_info_test.go @@ -0,0 +1,159 @@ +package compactor + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + + cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" +) + +func TestPartitionedGroupInfo(t *testing.T) { + ulid0 := ulid.MustNew(0, nil) + ulid1 := ulid.MustNew(1, nil) + ulid2 := ulid.MustNew(2, nil) + rangeStart := (1 * time.Hour).Milliseconds() + rangeEnd := (2 * time.Hour).Milliseconds() + partitionedGroupID := uint32(12345) + for _, tcase := range []struct { + name string + partitionedGroupInfo PartitionedGroupInfo + }{ + { + name: "write partitioned group info 1", + partitionedGroupInfo: PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 2, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid0, + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + }, + { + name: "write partitioned group info 2", + partitionedGroupInfo: PartitionedGroupInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: 3, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid1, + }, + }, + { + PartitionID: 2, + Blocks: []ulid.ULID{ + ulid2, + }, + }, + }, + RangeStart: rangeStart, + RangeEnd: rangeEnd, + Version: VisitMarkerVersion1, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + ctx := context.Background() + dummyReadCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + dummyWriteCounter := prometheus.NewCounter(prometheus.CounterOpts{}) + testBkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + bkt := objstore.WithNoopInstr(testBkt) + logger := log.NewNopLogger() + writeRes, err := UpdatePartitionedGroupInfo(ctx, bkt, logger, tcase.partitionedGroupInfo, dummyReadCounter, dummyWriteCounter) + require.NoError(t, err) + require.Equal(t, tcase.partitionedGroupInfo, *writeRes) + readRes, err := ReadPartitionedGroupInfo(ctx, bkt, logger, tcase.partitionedGroupInfo.PartitionedGroupID, dummyReadCounter) + require.NoError(t, err) + require.Equal(t, tcase.partitionedGroupInfo, *readRes) + }) + } +} + +func TestGetPartitionIDsByBlock(t *testing.T) { + ulid0 := ulid.MustNew(0, nil) + ulid1 := ulid.MustNew(1, nil) + ulid2 := ulid.MustNew(2, nil) + ulid3 := 
ulid.MustNew(3, nil) + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: uint32(12345), + PartitionCount: 3, + Partitions: []Partition{ + { + PartitionID: 0, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + }, + }, + { + PartitionID: 1, + Blocks: []ulid.ULID{ + ulid0, + ulid2, + }, + }, + { + PartitionID: 2, + Blocks: []ulid.ULID{ + ulid0, + ulid1, + ulid2, + ulid3, + }, + }, + }, + RangeStart: (1 * time.Hour).Milliseconds(), + RangeEnd: (2 * time.Hour).Milliseconds(), + Version: VisitMarkerVersion1, + } + + res0 := partitionedGroupInfo.getPartitionIDsByBlock(ulid0) + require.Equal(t, 3, len(res0)) + require.Contains(t, res0, 0) + require.Contains(t, res0, 1) + require.Contains(t, res0, 2) + + res1 := partitionedGroupInfo.getPartitionIDsByBlock(ulid1) + require.Equal(t, 2, len(res1)) + require.Contains(t, res1, 0) + require.Contains(t, res1, 2) + + res2 := partitionedGroupInfo.getPartitionIDsByBlock(ulid2) + require.Equal(t, 2, len(res2)) + require.Contains(t, res2, 1) + require.Contains(t, res2, 2) + + res3 := partitionedGroupInfo.getPartitionIDsByBlock(ulid3) + require.Equal(t, 1, len(res3)) + require.Contains(t, res3, 2) +} diff --git a/pkg/compactor/sharded_block_populator.go b/pkg/compactor/sharded_block_populator.go new file mode 100644 index 0000000000..e178b59cea --- /dev/null +++ b/pkg/compactor/sharded_block_populator.go @@ -0,0 +1,221 @@ +package compactor + +import ( + "context" + "io" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" +) + +type ShardedBlockPopulator struct { + partitionCount int + partitionID int + logger log.Logger +} + +// PopulateBlock fills the index and chunk writers with new data gathered as the union +// of the provided blocks. It returns meta information for the new block. +// It expects sorted blocks input by mint. 
+// The main logic is copied from tsdb.DefaultPopulateBlockFunc +func (c ShardedBlockPopulator) PopulateBlock(ctx context.Context, metrics *tsdb.CompactorMetrics, _ log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []tsdb.BlockReader, meta *tsdb.BlockMeta, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter) (err error) { + if len(blocks) == 0 { + return errors.New("cannot populate block from no readers") + } + + var ( + sets []storage.ChunkSeriesSet + setsMtx sync.Mutex + symbols index.StringIter + closers []io.Closer + overlapping bool + ) + defer func() { + errs := tsdb_errors.NewMulti(err) + if cerr := tsdb_errors.CloseAll(closers); cerr != nil { + errs.Add(errors.Wrap(cerr, "close")) + } + err = errs.Err() + metrics.PopulatingBlocks.Set(0) + }() + metrics.PopulatingBlocks.Set(1) + + globalMaxt := blocks[0].Meta().MaxTime + g, _ := errgroup.WithContext(ctx) + g.SetLimit(8) + for i, b := range blocks { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if !overlapping { + if i > 0 && b.Meta().MinTime < globalMaxt { + metrics.OverlappingBlocks.Inc() + overlapping = true + level.Info(c.logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID) + } + if b.Meta().MaxTime > globalMaxt { + globalMaxt = b.Meta().MaxTime + } + } + + indexr, err := b.Index() + if err != nil { + return errors.Wrapf(err, "open index reader for block %+v", b.Meta()) + } + closers = append(closers, indexr) + + chunkr, err := b.Chunks() + if err != nil { + return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta()) + } + closers = append(closers, chunkr) + + tombsr, err := b.Tombstones() + if err != nil { + return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta()) + } + closers = append(closers, tombsr) + + k, v := index.AllPostingsKey() + all, err := indexr.Postings(k, v) + if err != nil { + return err + } + all = indexr.SortedPostings(all) + g.Go(func() error { + shardStart := time.Now() + shardedPosting, err := NewShardedPosting(all, uint64(c.partitionCount), uint64(c.partitionID), indexr.Series) + if err != nil { + return err + } + level.Debug(c.logger).Log("msg", "finished sharding", "duration", time.Since(shardStart)) + // Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp. + setsMtx.Lock() + sets = append(sets, tsdb.NewBlockChunkSeriesSet(meta.ULID, indexr, chunkr, tombsr, shardedPosting, meta.MinTime, meta.MaxTime-1, false)) + setsMtx.Unlock() + return nil + }) + syms := indexr.Symbols() + if i == 0 { + symbols = syms + continue + } + symbols = tsdb.NewMergedStringIter(symbols, syms) + } + if err := g.Wait(); err != nil { + return err + } + + for symbols.Next() { + if err := indexw.AddSymbol(symbols.At()); err != nil { + return errors.Wrap(err, "add symbol") + } + } + if symbols.Err() != nil { + return errors.Wrap(symbols.Err(), "next symbol") + } + + var ( + ref = storage.SeriesRef(0) + ch = make(chan func() error, 1000) + ) + + set := sets[0] + if len(sets) > 1 { + iCtx, cancel := context.WithCancel(ctx) + // Merge series using specified chunk series merger. + // The default one is the compacting series merger. + set = NewBackgroundChunkSeriesSet(iCtx, storage.NewMergeChunkSeriesSet(sets, mergeFunc)) + defer cancel() + } + + go func() { + // Iterate over all sorted chunk series. 
+ for set.Next() { + select { + case <-ctx.Done(): + ch <- func() error { return ctx.Err() } + default: + } + s := set.At() + curChksIter := s.Iterator(nil) + + var chks []chunks.Meta + var wg sync.WaitGroup + r := ref + wg.Add(1) + go func() { + for curChksIter.Next() { + // We are not iterating in streaming way over chunk as + // it's more efficient to do bulk write for index and + // chunk file purposes. + chks = append(chks, curChksIter.At()) + } + wg.Done() + }() + + ch <- func() error { + wg.Wait() + if curChksIter.Err() != nil { + return errors.Wrap(curChksIter.Err(), "chunk iter") + } + + // Skip the series with all deleted chunks. + if len(chks) == 0 { + return nil + } + + if err := chunkw.WriteChunks(chks...); err != nil { + return errors.Wrap(err, "write chunks") + } + if err := indexw.AddSeries(r, s.Labels(), chks...); err != nil { + return errors.Wrap(err, "add series") + } + + meta.Stats.NumChunks += uint64(len(chks)) + meta.Stats.NumSeries++ + for _, chk := range chks { + meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) + } + + for _, chk := range chks { + if err := chunkPool.Put(chk.Chunk); err != nil { + return errors.Wrap(err, "put chunk") + } + } + + return nil + } + + ref++ + } + close(ch) + }() + + for callback := range ch { + err := callback() + if err != nil { + return err + } + } + + if set.Err() != nil { + return errors.Wrap(set.Err(), "iterate compaction set") + } + + return nil +} diff --git a/pkg/compactor/sharded_compaction_lifecycle_callback.go b/pkg/compactor/sharded_compaction_lifecycle_callback.go new file mode 100644 index 0000000000..fbc1e93c92 --- /dev/null +++ b/pkg/compactor/sharded_compaction_lifecycle_callback.go @@ -0,0 +1,74 @@ +package compactor + +import ( + "context" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" +) + +type ShardedCompactionLifecycleCallback struct { + userBucket objstore.InstrumentedBucket + partitionedGroupInfoReadFailed prometheus.Counter +} + +func (c ShardedCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *compact.Group, _ []*metadata.Meta) error { + return nil +} + +func (c ShardedCompactionLifecycleCallback) PostCompactionCallback(ctx context.Context, logger log.Logger, cg *compact.Group, blockID ulid.ULID) error { + partitionInfo, err := ConvertToPartitionInfo(cg.Extensions()) + if err != nil { + return err + } + if partitionInfo == nil { + return nil + } + partitionedGroupID := partitionInfo.PartitionedGroupID + partitionedGroupInfo, err := ReadPartitionedGroupInfo(ctx, c.userBucket, logger, partitionedGroupID, c.partitionedGroupInfoReadFailed) + if err != nil { + return err + } + // Only try to delete PartitionedGroupFile if there is only one partition. + // For partition count greater than one, cleaner should handle the deletion. 
+ if partitionedGroupInfo.PartitionCount == 1 { + partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupID) + if err := c.userBucket.Delete(ctx, partitionedGroupFile); err != nil { + level.Warn(logger).Log("msg", "failed to delete partitioned group info", "partitioned_group_id", partitionedGroupID, "partitioned_group_info", partitionedGroupFile, "err", err) + } else { + level.Info(logger).Log("msg", "deleted partitioned group info", "partitioned_group_id", partitionedGroupID, "partitioned_group_info", partitionedGroupFile) + } + } + return nil +} + +func (c ShardedCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, logger log.Logger, cg *compact.Group) (tsdb.BlockPopulator, error) { + partitionInfo, err := ConvertToPartitionInfo(cg.Extensions()) + if err != nil { + return nil, err + } + if partitionInfo == nil { + return tsdb.DefaultBlockPopulator{}, nil + } + if partitionInfo.PartitionCount <= 0 { + partitionInfo = &PartitionInfo{ + PartitionCount: 1, + PartitionID: partitionInfo.PartitionID, + PartitionedGroupID: partitionInfo.PartitionedGroupID, + } + cg.SetExtensions(&CortexMetaExtensions{ + PartitionInfo: partitionInfo, + }) + } + populateBlockFunc := ShardedBlockPopulator{ + partitionCount: partitionInfo.PartitionCount, + partitionID: partitionInfo.PartitionID, + logger: logger, + } + return populateBlockFunc, nil +} diff --git a/pkg/compactor/sharded_posting.go b/pkg/compactor/sharded_posting.go new file mode 100644 index 0000000000..7d03d137ed --- /dev/null +++ b/pkg/compactor/sharded_posting.go @@ -0,0 +1,25 @@ +package compactor + +import ( + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" +) + +func NewShardedPosting(postings index.Postings, partitionCount uint64, partitionID uint64, labelsFn func(ref storage.SeriesRef, builder *labels.ScratchBuilder, chks *[]chunks.Meta) error) (index.Postings, error) { + bufChks := make([]chunks.Meta, 0) + series := make([]storage.SeriesRef, 0) + var builder labels.ScratchBuilder + for postings.Next() { + err := labelsFn(postings.At(), &builder, &bufChks) + if err != nil { + return nil, err + } + if builder.Labels().Hash()%partitionCount == partitionID { + posting := postings.At() + series = append(series, posting) + } + } + return index.NewListPostings(series), nil +} diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 291dd655f8..3c0a55c587 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "hash/fnv" + "math" "sort" "strings" "time" @@ -16,6 +17,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/objstore" + thanosblock "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" @@ -50,11 +52,12 @@ type ShuffleShardingGrouper struct { ringLifecyclerAddr string ringLifecyclerID string - blockVisitMarkerTimeout time.Duration - blockVisitMarkerReadFailed prometheus.Counter - blockVisitMarkerWriteFailed prometheus.Counter - - noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark + blockVisitMarkerTimeout time.Duration + blockVisitMarkerReadFailed prometheus.Counter + blockVisitMarkerWriteFailed 
prometheus.Counter + partitionedGroupInfoReadFailed prometheus.Counter + partitionedGroupInfoWriteFailed prometheus.Counter } func NewShuffleShardingGrouper( @@ -81,6 +84,8 @@ func NewShuffleShardingGrouper( blockVisitMarkerTimeout time.Duration, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, + partitionedGroupInfoReadFailed prometheus.Counter, + partitionedGroupInfoWriteFailed prometheus.Counter, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, ) *ShuffleShardingGrouper { if logger == nil { @@ -120,19 +125,21 @@ func NewShuffleShardingGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - compactorCfg: compactorCfg, - ring: ring, - ringLifecyclerAddr: ringLifecyclerAddr, - ringLifecyclerID: ringLifecyclerID, - limits: limits, - userID: userID, - blockFilesConcurrency: blockFilesConcurrency, - blocksFetchConcurrency: blocksFetchConcurrency, - compactionConcurrency: compactionConcurrency, - blockVisitMarkerTimeout: blockVisitMarkerTimeout, - blockVisitMarkerReadFailed: blockVisitMarkerReadFailed, - blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed, - noCompBlocksFunc: noCompBlocksFunc, + compactorCfg: compactorCfg, + ring: ring, + ringLifecyclerAddr: ringLifecyclerAddr, + ringLifecyclerID: ringLifecyclerID, + limits: limits, + userID: userID, + blockFilesConcurrency: blockFilesConcurrency, + blocksFetchConcurrency: blocksFetchConcurrency, + compactionConcurrency: compactionConcurrency, + blockVisitMarkerTimeout: blockVisitMarkerTimeout, + blockVisitMarkerReadFailed: blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed, + partitionedGroupInfoReadFailed: partitionedGroupInfoReadFailed, + partitionedGroupInfoWriteFailed: partitionedGroupInfoWriteFailed, + noCompBlocksFunc: noCompBlocksFunc, } } @@ -181,23 +188,23 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re sort.SliceStable(groups, func(i, j int) bool { iGroup := groups[i] jGroup := groups[j] - iMinTime := iGroup.minTime() - iMaxTime := iGroup.maxTime() - jMinTime := jGroup.minTime() - jMaxTime := jGroup.maxTime() - iLength := iMaxTime - iMinTime - jLength := jMaxTime - jMinTime + iRangeStart := iGroup.rangeStart + iRangeEnd := iGroup.rangeEnd + jRangeStart := jGroup.rangeStart + jRangeEnd := jGroup.rangeEnd + iLength := iRangeEnd - iRangeStart + jLength := jRangeEnd - jRangeStart if iLength != jLength { return iLength < jLength } - if iMinTime != jMinTime { - return iMinTime < jMinTime + if iRangeStart != jRangeStart { + return iRangeStart < jRangeStart } - iGroupHash := hashGroup(g.userID, iGroup.rangeStart, iGroup.rangeEnd) + iGroupHash := hashGroup(g.userID, iRangeStart, iRangeEnd) iGroupKey := createGroupKey(iGroupHash, iGroup) - jGroupHash := hashGroup(g.userID, jGroup.rangeStart, jGroup.rangeEnd) + jGroupHash := hashGroup(g.userID, jRangeStart, jRangeEnd) jGroupKey := createGroupKey(jGroupHash, jGroup) // Guarantee stable sort for tests. 
return iGroupKey < jGroupKey @@ -219,85 +226,268 @@ mainLoop: groupHash := hashGroup(g.userID, group.rangeStart, group.rangeEnd) - if isVisited, err := g.isGroupVisited(group.blocks, g.ringLifecyclerID); err != nil { - level.Warn(g.logger).Log("msg", "unable to check if blocks in group are visited", "group hash", groupHash, "err", err, "group", group.String()) - continue - } else if isVisited { - level.Info(g.logger).Log("msg", "skipping group because at least one block in group is visited", "group_hash", groupHash) + partitionedGroupInfo, err := g.generatePartitionBlockGroup(group, groupHash) + if err != nil { + level.Warn(g.logger).Log("msg", "unable to update partitioned group info", "partitioned_group_id", groupHash, "err", err) continue } + level.Debug(g.logger).Log("msg", "generated partitioned groups", "groups", partitionedGroupInfo) + + partitionedGroupID := partitionedGroupInfo.PartitionedGroupID + partitionCount := partitionedGroupInfo.PartitionCount + for _, partition := range partitionedGroupInfo.Partitions { + partitionID := partition.PartitionID + partitionedGroup, err := createBlocksGroup(blocks, partition.Blocks, partitionedGroupInfo.RangeStart, partitionedGroupInfo.RangeEnd) + if err != nil { + level.Error(g.logger).Log("msg", "unable to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err) + continue + } + if isVisited, err := g.isGroupVisited(partitionedGroup.blocks, partitionID, g.ringLifecyclerID); err != nil { + level.Warn(g.logger).Log("msg", "unable to check if blocks in partition are visited", "group hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "err", err, "group", group.String()) + continue + } else if isVisited { + level.Info(g.logger).Log("msg", "skipping group because at least one block in partition is visited", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID) + continue + } - remainingCompactions++ - groupKey := createGroupKey(groupHash, group) + remainingCompactions++ + partitionedGroupKey := createGroupKeyWithPartitionID(groupHash, partitionID, *partitionedGroup) + + level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group", partitionedGroup.String()) + blockVisitMarker := BlockVisitMarker{ + VisitTime: time.Now().Unix(), + CompactorID: g.ringLifecyclerID, + Status: Pending, + PartitionedGroupID: partitionedGroupID, + PartitionID: partitionID, + Version: VisitMarkerVersion1, + } + markBlocksVisited(g.ctx, g.bkt, g.logger, partitionedGroup.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed) + + resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution + externalLabels := labels.FromMap(partitionedGroup.blocks[0].Thanos.Labels) + thanosGroup, err := compact.NewGroup( + log.With(g.logger, "groupKey", partitionedGroupKey, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), + g.bkt, + partitionedGroupKey, + externalLabels, + resolution, + g.acceptMalformedIndex, + true, // Enable vertical compaction. 
+ g.compactions.WithLabelValues(partitionedGroupKey), + g.compactionRunsStarted.WithLabelValues(partitionedGroupKey), + g.compactionRunsCompleted.WithLabelValues(partitionedGroupKey), + g.compactionFailures.WithLabelValues(partitionedGroupKey), + g.verticalCompactions.WithLabelValues(partitionedGroupKey), + g.garbageCollectedBlocks, + g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, + g.hashFunc, + g.blockFilesConcurrency, + g.blocksFetchConcurrency, + ) + if err != nil { + level.Error(g.logger).Log("msg", "failed to create partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "blocks", partition.Blocks) + } - level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) - blockVisitMarker := BlockVisitMarker{ - VisitTime: time.Now().Unix(), - CompactorID: g.ringLifecyclerID, - Version: VisitMarkerVersion1, + for _, m := range partitionedGroup.blocks { + if err := thanosGroup.AppendMeta(m); err != nil { + level.Error(g.logger).Log("msg", "failed to add block to partitioned group", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "block", m.ULID) + } + } + thanosGroup.SetExtensions(&CortexMetaExtensions{ + PartitionInfo: &PartitionInfo{ + PartitionedGroupID: partitionedGroupID, + PartitionCount: partitionCount, + PartitionID: partitionID, + }, + }) + + outGroups = append(outGroups, thanosGroup) + if len(outGroups) >= g.compactionConcurrency { + break mainLoop + } } - markBlocksVisited(g.ctx, g.bkt, g.logger, group.blocks, blockVisitMarker, g.blockVisitMarkerWriteFailed) - - // All the blocks within the same group have the same downsample - // resolution and external labels. - resolution := group.blocks[0].Thanos.Downsample.Resolution - externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) - - thanosGroup, err := compact.NewGroup( - log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), - g.bkt, - groupKey, - externalLabels, - resolution, - false, // No malformed index. - true, // Enable vertical compaction. 
- g.compactions.WithLabelValues(groupKey), - g.compactionRunsStarted.WithLabelValues(groupKey), - g.compactionRunsCompleted.WithLabelValues(groupKey), - g.compactionFailures.WithLabelValues(groupKey), - g.verticalCompactions.WithLabelValues(groupKey), - g.garbageCollectedBlocks, - g.blocksMarkedForDeletion, - g.blocksMarkedForNoCompact, - g.hashFunc, - g.blockFilesConcurrency, - g.blocksFetchConcurrency, - ) - if err != nil { - return nil, errors.Wrap(err, "create compaction group") + } + + level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups))) + + return outGroups, nil +} + +func (g *ShuffleShardingGrouper) generatePartitionBlockGroup(group blocksGroup, groupHash uint32) (*PartitionedGroupInfo, error) { + partitionedGroupInfo, err := g.partitionBlockGroup(group, groupHash) + if err != nil { + return nil, err + } + updatedPartitionedGroupInfo, err := UpdatePartitionedGroupInfo(g.ctx, g.bkt, g.logger, *partitionedGroupInfo, g.partitionedGroupInfoReadFailed, g.partitionedGroupInfoWriteFailed) + if err != nil { + level.Warn(g.logger).Log("msg", "unable to update partitioned group info", "partitioned_group_id", partitionedGroupInfo.PartitionedGroupID, "err", err) + return nil, err + } + level.Debug(g.logger).Log("msg", "generated partitioned groups", "groups", updatedPartitionedGroupInfo) + return updatedPartitionedGroupInfo, nil +} + +func (g *ShuffleShardingGrouper) partitionBlockGroup(group blocksGroup, groupHash uint32) (*PartitionedGroupInfo, error) { + partitionCount := g.calculatePartitionCount(group) + blocksByMinTime := g.groupBlocksByMinTime(group) + partitionedGroups, err := g.partitionBlocksGroup(partitionCount, blocksByMinTime, group.rangeStart, group.rangeEnd) + if err != nil { + return nil, err + } + + var partitions []Partition + for partitionID, partitionedGroup := range partitionedGroups { + var blockIDs []ulid.ULID + for _, m := range partitionedGroup.blocks { + blockIDs = append(blockIDs, m.ULID) } + partitions = append(partitions, Partition{ + PartitionID: partitionID, + Blocks: blockIDs, + }) + } + partitionedGroupInfo := PartitionedGroupInfo{ + PartitionedGroupID: groupHash, + PartitionCount: partitionCount, + Partitions: partitions, + RangeStart: group.rangeStart, + RangeEnd: group.rangeEnd, + Version: PartitionedGroupInfoVersion1, + } + return &partitionedGroupInfo, nil +} - for _, m := range group.blocks { - if err := thanosGroup.AppendMeta(m); err != nil { - return nil, errors.Wrap(err, "add block to compaction group") +func (g *ShuffleShardingGrouper) calculatePartitionCount(group blocksGroup) int { + indexSizeLimit := g.compactorCfg.PartitionIndexSizeLimitInBytes + seriesCountLimit := g.compactorCfg.PartitionSeriesCountLimit + totalIndexSizeInBytes := int64(0) + totalSeriesCount := int64(0) + for _, block := range group.blocks { + blockFiles := block.Thanos.Files + totalSeriesCount += int64(block.Stats.NumSeries) + var indexFile *metadata.File + for _, file := range blockFiles { + if file.RelPath == thanosblock.IndexFilename { + indexFile = &file } } + if indexFile == nil { + level.Debug(g.logger).Log("msg", "unable to find index file in metadata", "block", block.ULID) + break + } + indexSize := indexFile.SizeBytes + totalIndexSizeInBytes += indexSize + } + partitionNumberBasedOnIndex := 1 + if indexSizeLimit > 0 && totalIndexSizeInBytes > indexSizeLimit { + partitionNumberBasedOnIndex = g.findNearestPartitionNumber(float64(totalIndexSizeInBytes), float64(indexSizeLimit)) + } + partitionNumberBasedOnSeries := 1 + if 
seriesCountLimit > 0 && totalSeriesCount > seriesCountLimit { + partitionNumberBasedOnSeries = g.findNearestPartitionNumber(float64(totalSeriesCount), float64(seriesCountLimit)) + } + partitionNumber := partitionNumberBasedOnIndex + if partitionNumberBasedOnSeries > partitionNumberBasedOnIndex { + partitionNumber = partitionNumberBasedOnSeries + } + level.Debug(g.logger).Log("msg", "calculated partition number for group", "group", group.String(), "partition_number", partitionNumber, "total_index_size", totalIndexSizeInBytes, "index_size_limit", indexSizeLimit, "total_series_count", totalSeriesCount, "series_count_limit", seriesCountLimit) + return partitionNumber +} + +func (g *ShuffleShardingGrouper) findNearestPartitionNumber(size float64, limit float64) int { + return int(math.Pow(2, math.Ceil(math.Log2(size/limit)))) +} - outGroups = append(outGroups, thanosGroup) - if len(outGroups) == g.compactionConcurrency { - break mainLoop +func (g *ShuffleShardingGrouper) groupBlocksByMinTime(group blocksGroup) map[int64][]*metadata.Meta { + blocksByMinTime := make(map[int64][]*metadata.Meta) + for _, block := range group.blocks { + blockRange := block.MaxTime - block.MinTime + minTime := block.MinTime + for _, tr := range g.compactorCfg.BlockRanges.ToMilliseconds() { + if blockRange <= tr { + minTime = tr * (block.MinTime / tr) + break + } } + blocksByMinTime[minTime] = append(blocksByMinTime[minTime], block) } + return blocksByMinTime +} - level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups))) +func (g *ShuffleShardingGrouper) partitionBlocksGroup(partitionCount int, blocksByMinTime map[int64][]*metadata.Meta, rangeStart int64, rangeEnd int64) (map[int]blocksGroup, error) { + partitionedGroups := make(map[int]blocksGroup) + addToPartitionedGroups := func(blocks []*metadata.Meta, partitionID int) { + if _, ok := partitionedGroups[partitionID]; !ok { + partitionedGroups[partitionID] = blocksGroup{ + rangeStart: rangeStart, + rangeEnd: rangeEnd, + blocks: []*metadata.Meta{}, + } + } + partitionedGroup := partitionedGroups[partitionID] + partitionedGroup.blocks = append(partitionedGroup.blocks, blocks...) + partitionedGroups[partitionID] = partitionedGroup + } - return outGroups, nil + for _, blocksInSameTimeInterval := range blocksByMinTime { + numOfBlocks := len(blocksInSameTimeInterval) + numBlocksCheck := math.Log2(float64(numOfBlocks)) + if math.Ceil(numBlocksCheck) == math.Floor(numBlocksCheck) { + // Case that number of blocks in this time interval is 2^n, should + // use modulo calculation to find blocks for each partition ID. + for _, block := range blocksInSameTimeInterval { + partitionInfo, err := GetPartitionInfo(*block) + if err != nil { + return nil, err + } + if partitionInfo == nil { + // For legacy blocks with level > 1, treat PartitionID is always 0. + // So it can be included in every partition. + partitionInfo = &PartitionInfo{ + PartitionID: 0, + } + } + if numOfBlocks < partitionCount { + for partitionID := partitionInfo.PartitionID; partitionID < partitionCount; partitionID += numOfBlocks { + addToPartitionedGroups([]*metadata.Meta{block}, partitionID) + } + } else if numOfBlocks == partitionCount { + addToPartitionedGroups([]*metadata.Meta{block}, partitionInfo.PartitionID) + } else { + addToPartitionedGroups([]*metadata.Meta{block}, partitionInfo.PartitionID%partitionCount) + } + } + } else { + // Case that number of blocks in this time interval is not 2^n, should + // include all blocks in all partitions. 
+ for partitionID := 0; partitionID < partitionCount; partitionID++ { + addToPartitionedGroups(blocksInSameTimeInterval, partitionID) + } + } + } + return partitionedGroups, nil } -func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, compactorID string) (bool, error) { +func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, partitionID int, compactorID string) (bool, error) { for _, block := range blocks { blockID := block.ULID.String() - blockVisitMarker, err := ReadBlockVisitMarker(g.ctx, g.bkt, g.logger, blockID, g.blockVisitMarkerReadFailed) + blockVisitMarker, err := ReadBlockVisitMarker(g.ctx, g.bkt, g.logger, blockID, partitionID, g.blockVisitMarkerReadFailed) if err != nil { if errors.Is(err, ErrorBlockVisitMarkerNotFound) { - level.Debug(g.logger).Log("msg", "no visit marker file for block", "blockID", blockID) + level.Warn(g.logger).Log("msg", "no visit marker file for block", "partition_id", partitionID, "block_id", blockID) continue } - level.Error(g.logger).Log("msg", "unable to read block visit marker file", "blockID", blockID, "err", err) + level.Error(g.logger).Log("msg", "unable to read block visit marker file", "partition_id", partitionID, "block_id", blockID, "err", err) return true, err } - if compactorID != blockVisitMarker.CompactorID && blockVisitMarker.isVisited(g.blockVisitMarkerTimeout) { - level.Debug(g.logger).Log("msg", fmt.Sprintf("visited block: %s", blockID)) + if blockVisitMarker.isCompleted() { + level.Info(g.logger).Log("msg", "block visit marker with partition ID is completed", "partition_id", partitionID, "block_id", blockID) + return true, nil + } + if compactorID != blockVisitMarker.CompactorID && blockVisitMarker.isVisited(g.blockVisitMarkerTimeout, partitionID) { + level.Info(g.logger).Log("msg", "visited block with partition ID", "partition_id", partitionID, "block_id", blockID) return true, nil } } @@ -331,6 +521,24 @@ func createGroupKey(groupHash uint32, group blocksGroup) string { return fmt.Sprintf("%v%s", groupHash, group.blocks[0].Thanos.GroupKey()) } +func createGroupKeyWithPartitionID(groupHash uint32, partitionID int, group blocksGroup) string { + return fmt.Sprintf("%v%d%s", groupHash, partitionID, group.blocks[0].Thanos.GroupKey()) +} + +func createBlocksGroup(blocks map[ulid.ULID]*metadata.Meta, blockIDs []ulid.ULID, rangeStart int64, rangeEnd int64) (*blocksGroup, error) { + var group blocksGroup + group.rangeStart = rangeStart + group.rangeEnd = rangeEnd + for _, blockID := range blockIDs { + m, ok := blocks[blockID] + if !ok { + return nil, fmt.Errorf("block not found: %s", blockID) + } + group.blocks = append(group.blocks, m) + } + return &group, nil +} + // blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 type blocksGroup struct { rangeStart int64 // Included. 
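A minimal sketch (not part of this diff) of how partitionBlocksGroup above assigns the blocks of one time interval to partitions in the power-of-two case; the helper name assignPartitions and the example arguments are illustrative only.

package main

import "fmt"

// assignPartitions mirrors the three branches of partitionBlocksGroup for a
// time interval whose block count is a power of two: fewer blocks than
// partitions fan out with a stride of numOfBlocks, an equal count maps 1:1,
// and a larger count wraps around with a modulo.
func assignPartitions(blockPartitionID, numOfBlocks, partitionCount int) []int {
	switch {
	case numOfBlocks < partitionCount:
		var ids []int
		for id := blockPartitionID; id < partitionCount; id += numOfBlocks {
			ids = append(ids, id)
		}
		return ids
	case numOfBlocks == partitionCount:
		return []int{blockPartitionID}
	default:
		return []int{blockPartitionID % partitionCount}
	}
}

func main() {
	fmt.Println(assignPartitions(0, 2, 4)) // [0 2]: block with PartitionID 0, 2 blocks, 4 partitions
	fmt.Println(assignPartitions(1, 2, 4)) // [1 3]
	fmt.Println(assignPartitions(5, 8, 4)) // [1]: 5 % 4
}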
@@ -428,7 +636,28 @@ func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []b } } - groups = append(groups, group) + firstBlockPartitionInfo, err := GetPartitionInfo(*group.blocks[0]) + if err != nil || firstBlockPartitionInfo == nil { + firstBlockPartitionInfo = &PartitionInfo{ + PartitionedGroupID: 0, + PartitionCount: 1, + PartitionID: 0, + } + } + for _, block := range group.blocks { + blockPartitionInfo, err := GetPartitionInfo(*block) + if err != nil || blockPartitionInfo == nil { + blockPartitionInfo = &PartitionInfo{ + PartitionedGroupID: 0, + PartitionCount: 1, + PartitionID: 0, + } + } + if blockPartitionInfo.PartitionedGroupID <= 0 || blockPartitionInfo.PartitionedGroupID != firstBlockPartitionInfo.PartitionedGroupID { + groups = append(groups, group) + continue nextGroup + } + } } } @@ -446,10 +675,10 @@ func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []b } // If the group covers the full range, it's fine. - if group.maxTime()-group.minTime() == group.rangeLength() { - idx++ - continue - } + //if group.maxTime()-group.minTime() == group.rangeLength() { + // idx++ + // continue + //} // If the group's maxTime is after 1 block range, we can compact assuming that // all the required blocks have already been uploaded. @@ -491,6 +720,11 @@ func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { continue } + if skipHighLevelBlock(m, tr) { + i++ + continue + } + // Add all blocks to the current group that are within [t0, t0+tr]. for ; i < len(blocks); i++ { // If the block does not start within this group, then we should break the iteration @@ -505,6 +739,10 @@ func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { continue } + if skipHighLevelBlock(blocks[i], tr) { + continue + } + group.blocks = append(group.blocks, blocks[i]) } @@ -516,6 +754,14 @@ func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { return ret } +func skipHighLevelBlock(block *metadata.Meta, tr int64) bool { + // Skip blocks that have rounded range equal to tr, and level > 1 + // Because tr is divisible by the previous tr, block range falls in + // (tr/2, tr] should be rounded to tr. + blockRange := block.MaxTime - block.MinTime + return blockRange <= tr && blockRange > tr/2 && block.Compaction.Level > 1 +} + func getRangeStart(m *metadata.Meta, tr int64) int64 { // Compute start of aligned time range of size tr closest to the current block's start. // This code has been copied from TSDB. 
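A worked example (not part of this diff) of the partition-count formula used by calculatePartitionCount and findNearestPartitionNumber above: the count is the smallest power of two that brings each partition under the configured limit, computed independently for index size and series count, keeping the larger result. The helper name and all numbers below are illustrative.

package main

import (
	"fmt"
	"math"
)

// nearestPowerOfTwoPartitions mirrors findNearestPartitionNumber:
// 2^ceil(log2(size/limit)).
func nearestPowerOfTwoPartitions(size, limit float64) int {
	return int(math.Pow(2, math.Ceil(math.Log2(size/limit))))
}

func main() {
	// Example values only; the real limits come from the compactor config
	// fields PartitionIndexSizeLimitInBytes and PartitionSeriesCountLimit.
	totalIndexBytes := float64(30 << 30) // 30 GiB of index across the group
	indexLimitBytes := float64(8 << 30)  // 8 GiB per-partition index limit
	totalSeries := float64(1_500_000_000)
	seriesLimit := float64(1_000_000_000)

	byIndex := nearestPowerOfTwoPartitions(totalIndexBytes, indexLimitBytes) // ceil(log2(3.75)) -> 2^2 = 4
	bySeries := nearestPowerOfTwoPartitions(totalSeries, seriesLimit)        // ceil(log2(1.5))  -> 2^1 = 2

	partitionCount := byIndex
	if bySeries > partitionCount {
		partitionCount = bySeries
	}
	fmt.Println(partitionCount) // 4
}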
diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index a791f2eaa0..4bc703cbac 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "path" "testing" "time" @@ -359,11 +358,19 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { Name: "cortex_compactor_block_visit_marker_write_failed", Help: "Number of block visit marker file failed to be written.", }) + partitionedGroupInfoReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_partitioned_group_info_read_failed", + Help: "Number of partitioned group info file failed to be read.", + }) + partitionedGroupInfoWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_partitioned_group_info_write_failed", + Help: "Number of partitioned group info file failed to be written.", + }) bkt := &bucket.ClientMock{} blockVisitMarkerTimeout := 5 * time.Minute for _, visitedBlock := range testData.visitedBlocks { - visitMarkerFile := path.Join(visitedBlock.id.String(), BlockVisitMarkerFile) + visitMarkerFile := GetBlockVisitMarkerFile(visitedBlock.id.String(), 0) expireTime := time.Now() if visitedBlock.isExpired { expireTime = expireTime.Add(-1 * blockVisitMarkerTimeout) @@ -409,6 +416,8 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { blockVisitMarkerTimeout, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, + partitionedGroupInfoReadFailed, + partitionedGroupInfoWriteFailed, noCompactFilter, ) actual, err := g.Groups(testData.blocks) diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index 7265ede93a..9942d43f1a 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -52,7 +52,18 @@ func NewShuffleShardingPlanner( } } -func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *ShuffleShardingPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) { + partitionInfo, err := ConvertToPartitionInfo(extensions) + if err != nil { + return nil, err + } + if partitionInfo == nil { + return nil, fmt.Errorf("partitionInfo cannot be nil") + } + return p.PlanWithPartition(ctx, metasByMinTime, partitionInfo.PartitionID, errChan) +} + +func (p *ShuffleShardingPlanner) PlanWithPartition(_ context.Context, metasByMinTime []*metadata.Meta, partitionID int, errChan chan error) ([]*metadata.Meta, error) { // Ensure all blocks fits within the largest range. This is a double check // to ensure there's no bug in the previous blocks grouping, given this Plan() // is just a pass-through. 
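A minimal sketch (not part of this diff) of how the reworked Plan is driven: the partition identity travels through the opaque extensions argument, Plan rejects a nil PartitionInfo, and errChan receives asynchronous errors from the visit-marker heartbeat. The helper planPartition and its arguments are hypothetical.

package compactor

import (
	"context"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// planPartition is a hypothetical wrapper showing the calling convention of the
// new Plan signature. It returns the per-partition metas selected for
// compaction (or nil when the blocks are not visited by this compactor).
func planPartition(ctx context.Context, p *ShuffleShardingPlanner, metasByMinTime []*metadata.Meta, partitionedGroupID uint32, partitionCount, partitionID int) ([]*metadata.Meta, chan error, error) {
	errChan := make(chan error, 1)
	metas, err := p.Plan(ctx, metasByMinTime, errChan, &CortexMetaExtensions{
		PartitionInfo: &PartitionInfo{
			PartitionedGroupID: partitionedGroupID,
			PartitionCount:     partitionCount,
			PartitionID:        partitionID,
		},
	})
	return metas, errChan, err
}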
@@ -63,6 +74,7 @@ func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metad noCompactMarked := p.noCompBlocksFunc() resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime)) + var partitionGroupID uint32 for _, b := range metasByMinTime { blockID := b.ULID.String() if _, excluded := noCompactMarked[b.ULID]; excluded { @@ -73,25 +85,30 @@ func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metad return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd) } - blockVisitMarker, err := ReadBlockVisitMarker(p.ctx, p.bkt, p.logger, blockID, p.blockVisitMarkerReadFailed) + blockVisitMarker, err := ReadBlockVisitMarker(p.ctx, p.bkt, p.logger, blockID, partitionID, p.blockVisitMarkerReadFailed) if err != nil { // shuffle_sharding_grouper should put visit marker file for blocks ready for // compaction. So error should be returned if visit marker file does not exist. - return nil, fmt.Errorf("unable to get visit marker file for block %s: %s", blockID, err.Error()) + return nil, fmt.Errorf("unable to get visit marker file for block %s with partition ID %d: %s", blockID, partitionID, err.Error()) + } + if blockVisitMarker.isCompleted() { + return nil, fmt.Errorf("block %s with partition ID %d is in completed status", blockID, partitionID) } - if !blockVisitMarker.isVisitedByCompactor(p.blockVisitMarkerTimeout, p.ringLifecyclerID) { - level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "compactor_id", p.ringLifecyclerID) + if !blockVisitMarker.isVisitedByCompactor(p.blockVisitMarkerTimeout, partitionID, p.ringLifecyclerID) { + level.Warn(p.logger).Log("msg", "block is not visited by current compactor", "block_id", blockID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID) return nil, nil } + partitionGroupID = blockVisitMarker.PartitionedGroupID resultMetas = append(resultMetas, b) } if len(resultMetas) < 2 { + level.Info(p.logger).Log("msg", "result meta size is less than 2", "partitioned_group_id", partitionGroupID, "partition_id", partitionID, "size", len(resultMetas)) return nil, nil } - go markBlocksVisitedHeartBeat(p.ctx, p.bkt, p.logger, resultMetas, p.ringLifecyclerID, p.blockVisitMarkerFileUpdateInterval, p.blockVisitMarkerWriteFailed) + go markBlocksVisitedHeartBeat(p.ctx, p.bkt, p.logger, resultMetas, partitionGroupID, partitionID, p.ringLifecyclerID, p.blockVisitMarkerFileUpdateInterval, p.blockVisitMarkerWriteFailed, errChan) return resultMetas, nil } diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go index 9ecbae1afb..8f5536564e 100644 --- a/pkg/compactor/shuffle_sharding_planner_test.go +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "path" "testing" "time" @@ -342,7 +341,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { t.Run(testName, func(t *testing.T) { bkt := &bucket.ClientMock{} for _, visitedBlock := range testData.visitedBlocks { - visitMarkerFile := path.Join(visitedBlock.id.String(), BlockVisitMarkerFile) + visitMarkerFile := GetBlockVisitMarkerFile(visitedBlock.id.String(), 0) expireTime := time.Now() if visitedBlock.isExpired { expireTime = expireTime.Add(-1 * blockVisitMarkerTimeout) @@ -383,7 +382,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed, ) - actual, err := 
p.Plan(context.Background(), testData.blocks) + actual, err := p.Plan(context.Background(), testData.blocks, nil, nil) if testData.expectedErr != nil { assert.Equal(t, err, testData.expectedErr) diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 5c5f6cb5d4..6d93fa35e5 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -226,6 +226,14 @@ func (s BlockDeletionMarks) GetULIDs() []ulid.ULID { return ids } +func (s BlockDeletionMarks) GetULIDSet() map[ulid.ULID]struct{} { + res := make(map[ulid.ULID]struct{}) + for _, m := range s { + res[m.ID] = struct{}{} + } + return res +} + func (s BlockDeletionMarks) Clone() BlockDeletionMarks { clone := make(BlockDeletionMarks, len(s)) for i, m := range s { diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go index e97fc62f8b..a8582343d7 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/fetcher.go @@ -577,24 +577,28 @@ func (f *LabelShardedMetaFilter) Filter(_ context.Context, metas map[ulid.ULID]* return nil } -var _ MetadataFilter = &DeduplicateFilter{} +var _ MetadataFilter = &DefaultDeduplicateFilter{} -// DeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data. +type DeduplicateFilter interface { + DuplicateIDs() []ulid.ULID +} + +// DefaultDeduplicateFilter is a BaseFetcher filter that filters out older blocks that have exactly the same data. // Not go-routine safe. -type DeduplicateFilter struct { +type DefaultDeduplicateFilter struct { duplicateIDs []ulid.ULID concurrency int mu sync.Mutex } -// NewDeduplicateFilter creates DeduplicateFilter. -func NewDeduplicateFilter(concurrency int) *DeduplicateFilter { - return &DeduplicateFilter{concurrency: concurrency} +// NewDeduplicateFilter creates DefaultDeduplicateFilter. +func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter { + return &DefaultDeduplicateFilter{concurrency: concurrency} } // Filter filters out duplicate blocks that can be formed // from two or more overlapping blocks that fully submatches the source blocks of the older blocks. -func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { +func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error { f.duplicateIDs = f.duplicateIDs[:0] var wg sync.WaitGroup @@ -626,7 +630,7 @@ func (f *DeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metad return nil } -func (f *DeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) { +func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -668,8 +672,8 @@ childLoop: f.mu.Unlock() } -// DuplicateIDs returns slice of block ids that are filtered out by DeduplicateFilter. -func (f *DeduplicateFilter) DuplicateIDs() []ulid.ULID { +// DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter. 
+func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID { return f.duplicateIDs } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go index 787a03c241..f3d3fc9a67 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/metadata/meta.go @@ -11,10 +11,6 @@ package metadata import ( "encoding/json" "fmt" - "io" - "os" - "path/filepath" - "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -25,6 +21,9 @@ import ( "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/tombstones" "gopkg.in/yaml.v3" + "io" + "os" + "path/filepath" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -90,6 +89,27 @@ type Thanos struct { // Rewrites is present when any rewrite (deletion, relabel etc) were applied to this block. Optional. Rewrites []Rewrite `json:"rewrites,omitempty"` + + // Extensions are used for plugin any arbitrary additional information for block. Optional. + Extensions any `json:"extensions,omitempty"` +} + +func (m *Thanos) ParseExtensions(v any) (any, error) { + return ConvertExtensions(m.Extensions, v) +} + +func ConvertExtensions(extensions any, v any) (any, error) { + if extensions == nil { + return nil, nil + } + extensionsContent, err := json.Marshal(extensions) + if err != nil { + return nil, err + } + if err = json.Unmarshal(extensionsContent, v); err != nil { + return nil, err + } + return v, nil } type Rewrite struct { diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go index c6c36bf3cc..c6b04934f1 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/compact.go @@ -5,6 +5,7 @@ package compact import ( "context" + "encoding/json" "fmt" "math" "os" @@ -58,7 +59,7 @@ type Syncer struct { blocks map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error metrics *syncerMetrics - duplicateBlocksFilter *block.DeduplicateFilter + duplicateBlocksFilter block.DeduplicateFilter ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter } @@ -95,7 +96,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -350,6 +351,7 @@ type Group struct { hashFunc metadata.HashFunc blockFilesConcurrency int compactBlocksFetchConcurrency int + extensions any } // NewGroup returns a new compaction group. 
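The new Thanos.Extensions field plus ParseExtensions/ConvertExtensions round-trip arbitrary JSON-serializable metadata through meta.json: whatever was stored is re-marshalled to JSON and unmarshalled into the caller's typed value, so the same call works whether Extensions still holds the original struct or the generic map produced by reading meta.json back from the bucket. Below is a minimal sketch of how a caller might use this hook; partitionInfo and its fields are hypothetical names for illustration only, not part of this change.

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// partitionInfo is a hypothetical extension payload; any JSON-serializable
// type works the same way.
type partitionInfo struct {
	PartitionedGroupID uint32 `json:"partitionedGroupID"`
	PartitionCount     int    `json:"partitionCount"`
	PartitionID        int    `json:"partitionID"`
}

func main() {
	meta := &metadata.Meta{}
	// Attach the payload; it is written into meta.json under "extensions"
	// because the field is tagged `json:"extensions,omitempty"`.
	meta.Thanos.Extensions = partitionInfo{PartitionedGroupID: 1234, PartitionCount: 4, PartitionID: 0}

	// Decode the payload back into a typed value. ParseExtensions returns
	// (nil, nil) when the block carries no extensions.
	parsed, err := meta.Thanos.ParseExtensions(&partitionInfo{})
	if err != nil {
		panic(err)
	}
	if parsed == nil {
		fmt.Println("block has no extensions")
		return
	}
	info := parsed.(*partitionInfo)
	fmt.Printf("partitioned group %d, partition %d of %d\n", info.PartitionedGroupID, info.PartitionID, info.PartitionCount)
}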
@@ -490,6 +492,25 @@ func (cg *Group) Resolution() int64 { return cg.resolution } +func (cg *Group) Extensions() any { + return cg.extensions +} + +func (cg *Group) ParseExtensions(v any) error { + extensionsContent, err := json.Marshal(cg.extensions) + if err != nil { + return err + } + if err = json.Unmarshal(extensionsContent, v); err != nil { + return err + } + return nil +} + +func (cg *Group) SetExtensions(extensions any) { + cg.extensions = extensions +} + // CompactProgressMetrics contains Prometheus metrics related to compaction progress. type CompactProgressMetrics struct { NumberOfCompactionRuns *prometheus.GaugeVec @@ -535,7 +556,7 @@ func (ps *CompactionProgressCalculator) ProgressCalculate(ctx context.Context, g if len(g.IDs()) == 1 { continue } - plan, err := ps.planner.Plan(ctx, g.metasByMinTime) + plan, err := ps.planner.Plan(ctx, g.metasByMinTime, nil, g.extensions) if err != nil { return errors.Wrapf(err, "could not plan") } @@ -727,7 +748,50 @@ func (rs *RetentionProgressCalculator) ProgressCalculate(ctx context.Context, gr type Planner interface { // Plan returns a list of blocks that should be compacted into single one. // The blocks can be overlapping. The provided metadata has to be ordered by minTime. - Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) + Plan(ctx context.Context, metasByMinTime []*metadata.Meta, errChan chan error, extensions any) ([]*metadata.Meta, error) +} + +type BlockDeletableChecker interface { + CanDelete(group *Group, blockID ulid.ULID) bool +} + +type DefaultBlockDeletableChecker struct { +} + +func (c DefaultBlockDeletableChecker) CanDelete(_ *Group, _ ulid.ULID) bool { + return true +} + +type CompactionLifecycleCallback interface { + PreCompactionCallback(ctx context.Context, logger log.Logger, group *Group, toCompactBlocks []*metadata.Meta) error + PostCompactionCallback(ctx context.Context, logger log.Logger, group *Group, blockID ulid.ULID) error + GetBlockPopulator(ctx context.Context, logger log.Logger, group *Group) (tsdb.BlockPopulator, error) +} + +type DefaultCompactionLifecycleCallback struct { +} + +func (c DefaultCompactionLifecycleCallback) PreCompactionCallback(_ context.Context, _ log.Logger, _ *Group, toCompactBlocks []*metadata.Meta) error { + // Due to #183 we verify that none of the blocks in the plan have overlapping sources. + // This is one potential source of how we could end up with duplicated chunks. + uniqueSources := map[ulid.ULID]struct{}{} + for _, m := range toCompactBlocks { + for _, s := range m.Compaction.Sources { + if _, ok := uniqueSources[s]; ok { + return halt(errors.Errorf("overlapping sources detected for plan %v", toCompactBlocks)) + } + uniqueSources[s] = struct{}{} + } + } + return nil +} + +func (c DefaultCompactionLifecycleCallback) PostCompactionCallback(_ context.Context, _ log.Logger, _ *Group, _ ulid.ULID) error { + return nil +} + +func (c DefaultCompactionLifecycleCallback) GetBlockPopulator(_ context.Context, _ log.Logger, _ *Group) (tsdb.BlockPopulator, error) { + return tsdb.DefaultBlockPopulator{}, nil } // Compactor provides compaction against an underlying storage of time series data. @@ -747,11 +811,12 @@ type Compactor interface { // * The source dirs are marked Deletable. // * Returns empty ulid.ULID{}. 
Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error) + CompactWithBlockPopulator(dest string, dirs []string, open []*tsdb.Block, blockPopulator tsdb.BlockPopulator) (ulid.ULID, error) } // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. -func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) { +func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback) (shouldRerun bool, compID ulid.ULID, rerr error) { cg.compactionRunsStarted.Inc() subDir := filepath.Join(dir, cg.Key()) @@ -771,10 +836,13 @@ func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } + errChan := make(chan error, 1) err := tracing.DoInSpanWithErr(ctx, "compaction_group", func(ctx context.Context) (err error) { - shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp) + shouldRerun, compID, err = cg.compact(ctx, subDir, planner, comp, blockDeletableChecker, compactionLifecycleCallback, errChan) return err }, opentracing.Tags{"group.key": cg.Key()}) + errChan <- err + close(errChan) if err != nil { cg.compactionFailures.Inc() return false, ulid.ULID{}, err @@ -974,7 +1042,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, _ error) { +func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor, blockDeletableChecker BlockDeletableChecker, compactionLifecycleCallback CompactionLifecycleCallback, errChan chan error) (shouldRerun bool, compID ulid.ULID, _ error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -992,7 +1060,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp var toCompact []*metadata.Meta if err := tracing.DoInSpanWithErr(ctx, "compaction_planning", func(ctx context.Context) (e error) { - toCompact, e = planner.Plan(ctx, cg.metasByMinTime) + toCompact, e = planner.Plan(ctx, cg.metasByMinTime, errChan, cg.extensions) return e }); err != nil { return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") @@ -1002,35 +1070,35 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return false, ulid.ULID{}, nil } - level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact)) - - // Due to #183 we verify that none of the blocks in the plan have overlapping sources. - // This is one potential source of how we could end up with duplicated chunks. - uniqueSources := map[ulid.ULID]struct{}{} + level.Info(cg.logger).Log("msg", "compaction available and planned", "plan", fmt.Sprintf("%v", toCompact)) // Once we have a plan we need to download the actual data. 
groupCompactionBegin := time.Now() begin := groupCompactionBegin + + if err := compactionLifecycleCallback.PreCompactionCallback(ctx, cg.logger, cg, toCompact); err != nil { + return false, ulid.ULID{}, errors.Wrapf(err, "failed to run pre compaction callback for plan: %s", fmt.Sprintf("%v", toCompact)) + } + level.Info(cg.logger).Log("msg", "finished running pre compaction callback; downloading blocks", "plan", fmt.Sprintf("%v", toCompact), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + + begin = time.Now() g, errCtx := errgroup.WithContext(ctx) g.SetLimit(cg.compactBlocksFetchConcurrency) toCompactDirs := make([]string, 0, len(toCompact)) for _, m := range toCompact { bdir := filepath.Join(dir, m.ULID.String()) - for _, s := range m.Compaction.Sources { - if _, ok := uniqueSources[s]; ok { - return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) - } - uniqueSources[s] = struct{}{} - } func(ctx context.Context, meta *metadata.Meta) { g.Go(func() error { + start := time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) }, opentracing.Tags{"block.id": meta.ULID}); err != nil { return retry(errors.Wrapf(err, "download block %s", meta.ULID)) } + level.Debug(cg.logger).Log("msg", "downloaded block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds()) + start = time.Now() // Ensure all input blocks are valid. var stats block.HealthStats if err := tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) (e error) { @@ -1056,6 +1124,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID) } + level.Debug(cg.logger).Log("msg", "verified block", "block", meta.ULID.String(), "duration", time.Since(start), "duration_ms", time.Since(start).Milliseconds()) return nil }) }(errCtx, m) @@ -1072,7 +1141,11 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin = time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction", func(ctx context.Context) (e error) { - compID, e = comp.Compact(dir, toCompactDirs, nil) + populateBlockFunc, e := compactionLifecycleCallback.GetBlockPopulator(ctx, cg.logger, cg) + if e != nil { + return e + } + compID, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc) return e }); err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) @@ -1082,7 +1155,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", sourceBlockStr) for _, meta := range toCompact { if meta.Stats.NumSamples == 0 { - if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { + if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker); err != nil { level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID) } } @@ -1105,6 +1178,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Downsample: metadata.ThanosDownsample{Resolution: 
cg.resolution}, Source: metadata.CompactorSource, SegmentFiles: block.GetSegmentFiles(bdir), + Extensions: cg.extensions, }, nil) if err != nil { return false, ulid.ULID{}, errors.Wrapf(err, "failed to finalize the block %s", bdir) @@ -1145,7 +1219,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp // Eventually the block we just uploaded should get synced into the group again (including sync-delay). for _, meta := range toCompact { err = tracing.DoInSpanWithErr(ctx, "compaction_block_delete", func(ctx context.Context) error { - return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())) + return cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String()), blockDeletableChecker) }, opentracing.Tags{"block.id": meta.ULID}) if err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) @@ -1153,22 +1227,30 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp cg.groupGarbageCollectedBlocks.Inc() } + level.Info(cg.logger).Log("msg", "running post compaction callback", "result_block", compID) + if err := compactionLifecycleCallback.PostCompactionCallback(ctx, cg.logger, cg, compID); err != nil { + return false, ulid.ULID{}, retry(errors.Wrapf(err, "failed to run post compaction callback for result block %s", compID)) + } + level.Info(cg.logger).Log("msg", "finished running post compaction callback", "result_block", compID) + level.Info(cg.logger).Log("msg", "finished compacting blocks", "result_block", compID, "source_blocks", sourceBlockStr, "duration", time.Since(groupCompactionBegin), "duration_ms", time.Since(groupCompactionBegin).Milliseconds()) return true, compID, nil } -func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error { +func (cg *Group) deleteBlock(id ulid.ULID, bdir string, blockDeletableChecker BlockDeletableChecker) error { if err := os.RemoveAll(bdir); err != nil { return errors.Wrapf(err, "remove old block dir %s", id) } - // Spawn a new context so we always mark a block for deletion in full on shutdown. - delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() - level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) - if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil { - return errors.Wrapf(err, "mark block %s for deletion from bucket", id) + if blockDeletableChecker.CanDelete(cg, id) { + // Spawn a new context so we always mark a block for deletion in full on shutdown. 
+ delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + level.Info(cg.logger).Log("msg", "marking compacted block for deletion", "old_block", id) + if err := block.MarkForDeletion(delCtx, cg.logger, cg.bkt, id, "source of compacted block", cg.blocksMarkedForDeletion); err != nil { + return errors.Wrapf(err, "mark block %s for deletion from bucket", id) + } } return nil } @@ -1180,6 +1262,8 @@ type BucketCompactor struct { grouper Grouper comp Compactor planner Planner + blockDeletableChecker BlockDeletableChecker + compactionLifecycleCallback CompactionLifecycleCallback compactDir string bkt objstore.Bucket concurrency int @@ -1197,6 +1281,37 @@ func NewBucketCompactor( bkt objstore.Bucket, concurrency int, skipBlocksWithOutOfOrderChunks bool, +) (*BucketCompactor, error) { + if concurrency <= 0 { + return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) + } + return NewBucketCompactorWithCheckerAndCallback( + logger, + sy, + grouper, + planner, + comp, + DefaultBlockDeletableChecker{}, + DefaultCompactionLifecycleCallback{}, + compactDir, + bkt, + concurrency, + skipBlocksWithOutOfOrderChunks, + ) +} + +func NewBucketCompactorWithCheckerAndCallback( + logger log.Logger, + sy *Syncer, + grouper Grouper, + planner Planner, + comp Compactor, + blockDeletableChecker BlockDeletableChecker, + compactionLifecycleCallback CompactionLifecycleCallback, + compactDir string, + bkt objstore.Bucket, + concurrency int, + skipBlocksWithOutOfOrderChunks bool, ) (*BucketCompactor, error) { if concurrency <= 0 { return nil, errors.Errorf("invalid concurrency level (%d), concurrency level must be > 0", concurrency) @@ -1207,6 +1322,8 @@ func NewBucketCompactor( grouper: grouper, planner: planner, comp: comp, + blockDeletableChecker: blockDeletableChecker, + compactionLifecycleCallback: compactionLifecycleCallback, compactDir: compactDir, bkt: bkt, concurrency: concurrency, @@ -1247,7 +1364,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { go func() { defer wg.Done() for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp) + shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp, c.blockDeletableChecker, c.compactionLifecycleCallback) if err == nil { if shouldRerunGroup { mtx.Lock() diff --git a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go index 5c2a93df8d..808679df11 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go +++ b/vendor/github.com/thanos-io/thanos/pkg/compact/planner.go @@ -49,7 +49,7 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact } // TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405 -func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { return p.plan(p.noCompBlocksFunc(), metasByMinTime) } @@ -93,6 +93,10 @@ func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompac return nil, nil } +func (p *tsdbBasedPlanner) PlanWithPartition(ctx context.Context, metasByMinTime []*metadata.Meta, partitionID int, errChan chan error) ([]*metadata.Meta, error) { + return nil, 
errors.New("not support with partitioning") +} + // selectMetas returns the dir metas that should be compacted into a single new block. // If only a single block range is configured, the result is always nil. // Copied and adjusted from https://github.com/prometheus/prometheus/blob/3d8826a3d42566684283a9b7f7e812e412c24407/tsdb/compact.go#L229. @@ -243,7 +247,7 @@ func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact} } -func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { +func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta, _ chan error, _ any) ([]*metadata.Meta, error) { noCompactMarked := t.noCompBlocksFunc() copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked)) for k, v := range noCompactMarked { @@ -303,3 +307,7 @@ PlanLoop: return plan, nil } } + +func (t *largeTotalIndexSizeFilter) PlanWithPartition(ctx context.Context, metasByMinTime []*metadata.Meta, partitionID int, errChan chan error) ([]*metadata.Meta, error) { + return nil, errors.New("not support with partitioning") +}