Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioning compaction for Cortex #5316

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions pkg/compactor/background_chunks_series_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package compactor

import (
"context"

"github.com/prometheus/prometheus/storage"
)

// backgrounChunkSeriesSet adapts a storage.ChunkSeriesSet so the underlying
// set is iterated by a background goroutine (see run), handing each series to
// the consumer through the buffered nextSet channel.
// NOTE(review): type name is missing a 'd' ("backgroun…"); left unchanged here
// because every method and the constructor reference it — rename in a
// dedicated change.
type backgrounChunkSeriesSet struct {
	nextSet chan storage.ChunkSeries // series produced by run, consumed by Next
	actual  storage.ChunkSeries      // series most recently received by Next, returned by At
	cs      storage.ChunkSeriesSet   // underlying set, iterated in the background
}

// Next blocks until the background goroutine delivers the next series (or
// closes nextSet), records it as the current series, and reports whether a
// series was actually received. It returns false once the channel is closed,
// i.e. the underlying set is exhausted.
func (b *backgrounChunkSeriesSet) Next() bool {
	var ok bool
	b.actual, ok = <-b.nextSet
	return ok
}

// At returns the series most recently received by Next. It is only valid
// after a call to Next that returned true.
func (b *backgrounChunkSeriesSet) At() storage.ChunkSeries {
	return b.actual
}

// Err delegates to the underlying set's error, surfacing any iteration
// failure encountered by the background goroutine.
func (b *backgrounChunkSeriesSet) Err() error {
	return b.cs.Err()
}

// Warnings delegates to the underlying set's warnings.
func (b *backgrounChunkSeriesSet) Warnings() storage.Warnings {
	return b.cs.Warnings()
}

// run pumps series from the wrapped set into nextSet until the set is
// exhausted or ctx is cancelled. The channel is always closed on exit so that
// a consumer blocked in Next observes termination; the original only closed
// the channel on exhaustion, so a consumer calling Next after ctx
// cancellation would block forever once the buffer drained.
func (b *backgrounChunkSeriesSet) run(ctx context.Context) {
	defer close(b.nextSet)
	for b.cs.Next() {
		select {
		case b.nextSet <- b.cs.At():
		case <-ctx.Done():
			// Consumer went away (or shutdown): stop producing.
			return
		}
	}
}

// NewBackgroundChunkSeriesSet wraps cs so its iteration happens on a
// background goroutine, buffering up to 1000 series ahead of the consumer.
// The goroutine runs until cs is exhausted or ctx is cancelled.
func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet {
	set := &backgrounChunkSeriesSet{
		nextSet: make(chan storage.ChunkSeries, 1000),
		cs:      cs,
	}
	go set.run(ctx)
	return set
}
150 changes: 126 additions & 24 deletions pkg/compactor/block_visit_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,56 @@ import (
)

const (
	// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
	// NOTE(review): looks superseded by the per-partition name built from the
	// prefix/suffix constants below (see GetBlockVisitMarkerFile) — confirm
	// whether any caller still reads this legacy name before removing it.
	BlockVisitMarkerFile = "visit-mark.json"
	// BlockVisitMarkerFileSuffix is the known suffix of json filename for representing the most recent compactor visit.
	BlockVisitMarkerFileSuffix = "-visit-mark.json"
	// BlockVisitMarkerFilePrefix is the known prefix of json filename for representing the most recent compactor visit.
	BlockVisitMarkerFilePrefix = "partition-"
	// VisitMarkerVersion1 is the current supported version of visit-mark file.
	VisitMarkerVersion1 = 1
)

var (
	// ErrorBlockVisitMarkerNotFound is returned when the visit marker object does not exist in the bucket.
	ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found")
	// ErrorUnmarshalBlockVisitMarker is returned when the marker file exists but its JSON cannot be decoded.
	ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
	// ErrorNotBlockVisitMarker indicates a path that does not name a visit marker file
	// (tested via IsNotBlockVisitMarkerError).
	ErrorNotBlockVisitMarker = errors.New("file is not block visit marker")
)

// VisitStatus is the lifecycle state recorded in a block visit marker.
type VisitStatus string

const (
	// Pending is written while a compactor is heart-beating on the partition.
	Pending VisitStatus = "pending"
	// Completed is written once compaction of the partition finished (see markBlocksVisitMarkerCompleted).
	Completed VisitStatus = "completed"
)

type BlockVisitMarker struct {
CompactorID string `json:"compactorID"`
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionID int `json:"partitionID"`
// VisitTime is a unix timestamp of when the block was visited (mark updated).
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool {
return time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration, partitionID int) bool {
return b.isCompleted() || partitionID == b.PartitionID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
}

// isVisitedByCompactor reports whether the given compactor currently holds
// the visit on partitionID: same CompactorID and isVisited holds.
func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, partitionID int, compactorID string) bool {
	return b.CompactorID == compactorID && b.isVisited(blockVisitMarkerTimeout, partitionID)
}

// isCompleted reports whether the marker records a finished partition compaction.
func (b *BlockVisitMarker) isCompleted() bool {
	return b.Status == Completed
}

func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool {
return b.CompactorID == compactorID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
func GetBlockVisitMarkerFile(blockID string, partitionID int) string {
return path.Join(blockID, fmt.Sprintf("%s%d%s", BlockVisitMarkerFilePrefix, partitionID, BlockVisitMarkerFileSuffix))
}

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, partitionID int, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
visitMarkerFile := GetBlockVisitMarkerFile(blockID, partitionID)
visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
Expand All @@ -75,15 +96,23 @@ func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketRe
return &blockVisitMarker, nil
}

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error {
blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile)
func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, partitionID int, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error {
blockVisitMarkerFilePath := GetBlockVisitMarkerFile(blockID, partitionID)
if err := bkt.Upload(ctx, blockVisitMarkerFilePath, reader); err != nil {
blockVisitMarkerWriteFailed.Inc()
return err
}
return nil
}

// generateBlocksInfo returns a comma-separated list of the blocks' ULIDs,
// used purely for log messages.
func generateBlocksInfo(blocks []*metadata.Meta) string {
	// Pre-size the slice: the number of IDs is known up front.
	blockIds := make([]string, 0, len(blocks))
	for _, block := range blocks {
		blockIds = append(blockIds, block.ULID.String())
	}
	return strings.Join(blockIds, ",")
}

func markBlocksVisited(
ctx context.Context,
bkt objstore.Bucket,
Expand All @@ -99,39 +128,112 @@ func markBlocksVisited(
}
reader := bytes.NewReader(visitMarkerFileContent)
for _, block := range blocks {
select {
// Exit early if possible.
case <-ctx.Done():
return
default:
}

blockID := block.ULID.String()
if err := UpdateBlockVisitMarker(ctx, bkt, blockID, reader, blockVisitMarkerWriteFailed); err != nil {
level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err)
if err := UpdateBlockVisitMarker(ctx, bkt, blockID, marker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil {
level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "partition_id", marker.PartitionID, "block_id", blockID, "err", err)
}
reader.Reset(visitMarkerFileContent)
}
level.Debug(logger).Log("msg", "marked block visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks))
}

func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) {
var blockIds []string
for _, block := range blocks {
blockIds = append(blockIds, block.ULID.String())
}
blocksInfo := strings.Join(blockIds, ",")
level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo))
func markBlocksVisitedHeartBeat(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
blocks []*metadata.Meta,
partitionedGroupID uint32,
partitionID int,
compactorID string,
blockVisitMarkerFileUpdateInterval time.Duration,
blockVisitMarkerWriteFailed prometheus.Counter,
errChan chan error,
) {
blocksInfo := generateBlocksInfo(blocks)
level.Info(logger).Log("msg", "start visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval)
defer ticker.Stop()
isComplete := false
heartBeat:
for {
level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo))
blockVisitMarker := BlockVisitMarker{
VisitTime: time.Now().Unix(),
CompactorID: compactorID,
Version: VisitMarkerVersion1,
VisitTime: time.Now().Unix(),
CompactorID: compactorID,
Status: Pending,
PartitionedGroupID: partitionedGroupID,
PartitionID: partitionID,
Version: VisitMarkerVersion1,
}
markBlocksVisited(ctx, bkt, logger, blocks, blockVisitMarker, blockVisitMarkerWriteFailed)

select {
case <-ctx.Done():
level.Warn(logger).Log("msg", "visit marker heart beat got cancelled", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
break heartBeat
case <-ticker.C:
continue
case err := <-errChan:
isComplete = err == nil
if err != nil {
level.Warn(logger).Log("msg", "stop visit marker heart beat due to error", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo, "err", err)
}
break heartBeat
}
}
if isComplete {
level.Info(logger).Log("msg", "update visit marker to completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
markBlocksVisitMarkerCompleted(context.Background(), bkt, logger, blocks, partitionedGroupID, partitionID, compactorID, blockVisitMarkerWriteFailed)
}
level.Info(logger).Log("msg", "stop visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
}

// markBlocksVisitMarkerCompleted uploads a Completed visit marker for every
// block in blocks, signalling that this partition's compaction finished.
// Marshal failures increment the write-failure counter and abort; per-block
// upload failures are logged but do not stop the remaining uploads.
// Note: the diff residue left a stale removed line after the loop that
// referenced an undefined blocksInfo variable (a compile error); it is dropped.
func markBlocksVisitMarkerCompleted(
	ctx context.Context,
	bkt objstore.Bucket,
	logger log.Logger,
	blocks []*metadata.Meta,
	partitionedGroupID uint32,
	partitionID int,
	compactorID string,
	blockVisitMarkerWriteFailed prometheus.Counter,
) {
	blockVisitMarker := BlockVisitMarker{
		VisitTime:          time.Now().Unix(),
		CompactorID:        compactorID,
		Status:             Completed,
		PartitionedGroupID: partitionedGroupID,
		PartitionID:        partitionID,
		Version:            VisitMarkerVersion1,
	}
	visitMarkerFileContent, err := json.Marshal(blockVisitMarker)
	if err != nil {
		blockVisitMarkerWriteFailed.Inc()
		return
	}
	reader := bytes.NewReader(visitMarkerFileContent)
	for _, block := range blocks {
		blockID := block.ULID.String()
		if err := UpdateBlockVisitMarker(ctx, bkt, blockID, blockVisitMarker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil {
			level.Error(logger).Log("msg", "unable to upsert completed visit marker file content for block", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID, "err", err)
		} else {
			level.Info(logger).Log("msg", "block partition is completed", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID)
		}
		// Rewind so the same marshalled content can be uploaded for the next block.
		reader.Reset(visitMarkerFileContent)
	}
}

// IsBlockVisitMarker reports whether the given object path names a block
// visit marker file, by its "-visit-mark.json" suffix.
func IsBlockVisitMarker(path string) bool {
	return strings.HasSuffix(path, BlockVisitMarkerFileSuffix)
}

// IsNotBlockVisitMarkerError reports whether err is (or wraps)
// ErrorNotBlockVisitMarker.
func IsNotBlockVisitMarkerError(err error) bool {
	return errors.Is(err, ErrorNotBlockVisitMarker)
}
70 changes: 69 additions & 1 deletion pkg/compactor/block_visit_marker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compactor

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -85,10 +86,77 @@ func TestMarkBlocksVisited(t *testing.T) {
logger := log.NewNopLogger()
markBlocksVisited(ctx, bkt, logger, tcase.blocks, tcase.visitMarker, dummyCounter)
for _, meta := range tcase.blocks {
res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), dummyCounter)
res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), tcase.visitMarker.PartitionID, dummyCounter)
require.NoError(t, err)
require.Equal(t, tcase.visitMarker, *res)
}
})
}
}

// TestMarkBlockVisitedHeartBeat exercises the three ways markBlocksVisitedHeartBeat
// can stop, and asserts the final status persisted in each block's visit marker:
// context cancellation leaves it Pending, a nil result on errChan upgrades it to
// Completed, and a compaction error leaves it Pending.
// NOTE(review): synchronization is sleep-based (2s before and after the stop
// signal), which is slow and potentially flaky on loaded machines — consider
// polling the bucket instead.
func TestMarkBlockVisitedHeartBeat(t *testing.T) {
	partitionedGroupID := uint32(12345)
	partitionID := 0
	compactorID := "test-compactor"
	for _, tcase := range []struct {
		name           string
		isCancelled    bool
		compactionErr  error
		expectedStatus VisitStatus
	}{
		{
			name:           "heart beat got cancelled",
			isCancelled:    true,
			compactionErr:  nil,
			expectedStatus: Pending,
		},
		{
			name:           "heart beat complete without error",
			isCancelled:    false,
			compactionErr:  nil,
			expectedStatus: Completed,
		},
		{
			name:           "heart beat stopped due to compaction error",
			isCancelled:    false,
			compactionErr:  fmt.Errorf("some compaction failure"),
			expectedStatus: Pending,
		},
	} {
		t.Run(tcase.name, func(t *testing.T) {
			// Two blocks with distinct ULIDs so both markers are checked.
			ulid0 := ulid.MustNew(uint64(time.Now().UnixMilli()+0), nil)
			ulid1 := ulid.MustNew(uint64(time.Now().UnixMilli()+1), nil)
			blocks := []*metadata.Meta{
				{
					BlockMeta: tsdb.BlockMeta{
						ULID: ulid0,
					},
				},
				{
					BlockMeta: tsdb.BlockMeta{
						ULID: ulid1,
					},
				},
			}
			ctx, cancel := context.WithCancel(context.Background())
			dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{})
			bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
			logger := log.NewNopLogger()
			errChan := make(chan error, 1)
			go markBlocksVisitedHeartBeat(ctx, objstore.WithNoopInstr(bkt), logger, blocks, partitionedGroupID, partitionID, compactorID, time.Second, dummyCounter, errChan)
			// Let the heart beat upload at least one Pending marker.
			time.Sleep(2 * time.Second)
			if tcase.isCancelled {
				cancel()
			} else {
				errChan <- tcase.compactionErr
				defer cancel()
			}
			// Let the heart beat observe the signal and (for success) write Completed markers.
			time.Sleep(2 * time.Second)
			for _, meta := range blocks {
				res, err := ReadBlockVisitMarker(context.Background(), objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), partitionID, dummyCounter)
				require.NoError(t, err)
				require.Equal(t, tcase.expectedStatus, res.Status)
			}
		})
	}
}
Loading