Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioning compaction for Cortex #5465

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ea5dc28
Partitioning compactor with upstreamed Thanos and Prometheus changes
alexqyle Jul 18, 2023
b2eebc1
Updated CHANGELOG
alexqyle Jul 18, 2023
0fa228b
Fixed issue that block cannot be compacted if block was compacted wit…
alexqyle Jul 25, 2023
f8f130c
fix lint
alexqyle Jul 25, 2023
6fda3f1
make unit test clear
alexqyle Jul 26, 2023
4a05910
make visit mark file suffix backward compatible
alexqyle Jul 28, 2023
2cb3d62
fix invalid cortex extension being set in meta
alexqyle Jul 31, 2023
0036a8e
fix lint
alexqyle Jul 31, 2023
044165d
Merge branch 'cortexproject:master' into partitioning-compactor-v3
alexqyle Aug 1, 2023
2a06555
Merge branch 'master' into partitioning-compactor-v3
alexqyle Aug 16, 2023
d902b8b
Make paritioning compactor configuration to be set per tenant
alexqyle Aug 16, 2023
5b892a3
fixed remaining plan metric and added block generation test util
alexqyle Aug 25, 2023
236fd6b
Merge commit 'b91a24d917f5a268b3982b0ddc6bf5686a9719a8' into partitio…
alexqyle Aug 25, 2023
dc3d3af
Merge commit '1a097a924b203c66646df5d02f3f09078ec77531' into partitio…
alexqyle Sep 8, 2023
4f7d9c0
improved performance going through all possible compaction groups
alexqyle Sep 8, 2023
c7060a4
nit fix
alexqyle Sep 8, 2023
146417b
nit fix
alexqyle Sep 8, 2023
20ff2ca
merge from master
alexqyle Sep 13, 2023
392cffc
add extra logic in compaction complete checker to avoid source blocks…
alexqyle Sep 14, 2023
4daba3e
move sample diff validation to post compaction callback
alexqyle Sep 18, 2023
ff3c705
fix lint
alexqyle Sep 18, 2023
7f0bc54
Merge commit '4e162a011a5e23f90c0c934c3d0dee6398d349fd' into partitio…
alexqyle Sep 18, 2023
45749f2
accurately getting result block
alexqyle Sep 19, 2023
da38fbe
Randomly iterating through partitions in grouper. More timing logs in…
alexqyle Sep 28, 2023
e0e2ec4
Introduce partition visit marker to avoid possible deadlock caused by…
alexqyle Sep 29, 2023
4ef90ac
clean up local dir in pre compaction callback
alexqyle Oct 11, 2023
2bf3571
Merge branch 'master' into partitioning-compactor-v3
alexqyle Oct 11, 2023
798a23a
fix compile error after prometheus update
alexqyle Oct 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
* [FEATURE] Compactor: Implemented partitioning compactor based on proposal #4843. #5465
* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. #5553
* [FEATURE] Store Gateway: Add `cortex_bucket_store_block_load_duration_seconds` histogram to track time to load blocks. #5580
* [FEATURE] AlertManager: Add `cortex_alertmanager_dispatcher_aggregation_groups` and `cortex_alertmanager_dispatcher_alert_processing_duration_seconds` metrics for dispatcher. #5592
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3057,6 +3057,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# Index size limit in bytes for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-index-size-limit-in-bytes
[compactor_partition_index_size_limit_in_bytes: <int> | default = 0]

# Time series count limit for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-series-count-limit
[compactor_partition_series_count_limit: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
60 changes: 60 additions & 0 deletions pkg/compactor/background_chunks_series_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package compactor

import (
"context"

"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
)

// backgrounChunkSeriesSet wraps a storage.ChunkSeriesSet and prefetches its
// series on a background goroutine, handing them to the consumer through a
// buffered channel so that iteration and downstream processing can overlap.
// NOTE(review): the type name is missing a 'd' ("background"); left as-is
// here because every method and the constructor reference this spelling.
type backgrounChunkSeriesSet struct {
	nextSet chan storage.ChunkSeries // buffered channel fed by run(); closed when the underlying set is exhausted
	actual storage.ChunkSeries // series most recently received by Next(); returned by At()
	cs storage.ChunkSeriesSet // underlying set, iterated by the background goroutine
}

// Next blocks until the background goroutine delivers the next series and
// records it as the current element. It returns false once the channel is
// closed, i.e. when the underlying set has been fully drained.
func (b *backgrounChunkSeriesSet) Next() bool {
	series, ok := <-b.nextSet
	b.actual = series
	return ok
}

// At returns the series most recently received by Next. It is only
// meaningful after a call to Next that returned true.
func (b *backgrounChunkSeriesSet) At() storage.ChunkSeries {
	return b.actual
}

// Err delegates to the underlying ChunkSeriesSet's Err.
// NOTE(review): this may run concurrently with the background goroutine
// still calling cs.Next(); assumes the underlying set tolerates that — confirm.
func (b *backgrounChunkSeriesSet) Err() error {
	return b.cs.Err()
}

// Warnings delegates to the underlying ChunkSeriesSet's Warnings.
func (b *backgrounChunkSeriesSet) Warnings() annotations.Annotations {
	return b.cs.Warnings()
}

// run drains the underlying ChunkSeriesSet, forwarding every series into the
// nextSet channel. When the underlying set is exhausted the channel is closed
// so Next can report completion. If the context is cancelled while a send is
// pending, run returns without closing the channel (the consumer is expected
// to stop as well once the context is done).
func (b *backgrounChunkSeriesSet) run(ctx context.Context) {
	for b.cs.Next() {
		select {
		case b.nextSet <- b.cs.At():
		case <-ctx.Done():
			return
		}
	}
	close(b.nextSet)
}

// NewBackgroundChunkSeriesSet returns a ChunkSeriesSet that iterates cs on a
// separate goroutine, buffering up to 1000 series ahead of the consumer. The
// goroutine stops when cs is exhausted or when ctx is cancelled.
func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet {
	set := &backgrounChunkSeriesSet{
		cs:      cs,
		nextSet: make(chan storage.ChunkSeries, 1000),
	}
	go set.run(ctx)
	return set
}
132 changes: 12 additions & 120 deletions pkg/compactor/block_visit_marker.go
Original file line number Diff line number Diff line change
@@ -1,151 +1,43 @@
package compactor

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"path"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/util/runutil"
)

const (
// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
BlockVisitMarkerFile = "visit-mark.json"
// BlockVisitMarkerFileSuffix is the known suffix of json filename for representing the most recent compactor visit.
BlockVisitMarkerFileSuffix = "visit-mark.json"
// BlockVisitMarkerFilePrefix is the known prefix of json filename for representing the most recent compactor visit.
BlockVisitMarkerFilePrefix = "partition-"
// VisitMarkerVersion1 is the current supported version of visit-mark file.
VisitMarkerVersion1 = 1
)

var (
ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found")
ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON")
ErrorNotBlockVisitMarker = errors.New("file is not block visit marker")
ErrorNotBlockVisitMarker = errors.New("file is not block visit marker")
)

type BlockVisitMarker struct {
CompactorID string `json:"compactorID"`
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionID int `json:"partitionID"`
// VisitTime is a unix timestamp of when the block was visited (mark updated).
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool {
return time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
}

func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool {
return b.CompactorID == compactorID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
}

func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile)
}
blockVisitMarkerReadFailed.Inc()
return nil, errors.Wrapf(err, "get block visit marker file: %s", visitMarkerFile)
}
defer runutil.CloseWithLogOnErr(logger, visitMarkerFileReader, "close block visit marker reader")
b, err := io.ReadAll(visitMarkerFileReader)
if err != nil {
blockVisitMarkerReadFailed.Inc()
return nil, errors.Wrapf(err, "read block visit marker file: %s", visitMarkerFile)
}
blockVisitMarker := BlockVisitMarker{}
if err = json.Unmarshal(b, &blockVisitMarker); err != nil {
blockVisitMarkerReadFailed.Inc()
return nil, errors.Wrapf(ErrorUnmarshalBlockVisitMarker, "block visit marker file: %s, error: %v", visitMarkerFile, err.Error())
}
if blockVisitMarker.Version != VisitMarkerVersion1 {
return nil, errors.Errorf("unexpected block visit mark file version %d, expected %d", blockVisitMarker.Version, VisitMarkerVersion1)
}
return &blockVisitMarker, nil
}

func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error {
blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile)
if err := bkt.Upload(ctx, blockVisitMarkerFilePath, reader); err != nil {
blockVisitMarkerWriteFailed.Inc()
return err
}
return nil
}

func markBlocksVisited(
ctx context.Context,
bkt objstore.Bucket,
logger log.Logger,
blocks []*metadata.Meta,
marker BlockVisitMarker,
blockVisitMarkerWriteFailed prometheus.Counter,
) {
visitMarkerFileContent, err := json.Marshal(marker)
if err != nil {
blockVisitMarkerWriteFailed.Inc()
return
}
reader := bytes.NewReader(visitMarkerFileContent)
for _, block := range blocks {
select {
// Exit early if possible.
case <-ctx.Done():
return
default:
}

blockID := block.ULID.String()
if err := UpdateBlockVisitMarker(ctx, bkt, blockID, reader, blockVisitMarkerWriteFailed); err != nil {
level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err)
}
reader.Reset(visitMarkerFileContent)
}
}

func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) {
var blockIds []string
for _, block := range blocks {
blockIds = append(blockIds, block.ULID.String())
}
blocksInfo := strings.Join(blockIds, ",")
level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo))
ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval)
defer ticker.Stop()
heartBeat:
for {
level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo))
blockVisitMarker := BlockVisitMarker{
VisitTime: time.Now().Unix(),
CompactorID: compactorID,
Version: VisitMarkerVersion1,
}
markBlocksVisited(ctx, bkt, logger, blocks, blockVisitMarker, blockVisitMarkerWriteFailed)

select {
case <-ctx.Done():
break heartBeat
case <-ticker.C:
continue
}
}
level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo))
func GetBlockVisitMarkerFile(blockID string, partitionID int) string {
return path.Join(blockID, fmt.Sprintf("%s%d-%s", BlockVisitMarkerFilePrefix, partitionID, BlockVisitMarkerFileSuffix))
}

func IsBlockVisitMarker(path string) bool {
return strings.HasSuffix(path, BlockVisitMarkerFile)
return strings.HasSuffix(path, BlockVisitMarkerFileSuffix)
}

func IsNotBlockVisitMarkerError(err error) bool {
Expand Down
94 changes: 0 additions & 94 deletions pkg/compactor/block_visit_marker_test.go

This file was deleted.

Loading
Loading