diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index ba09d20b46..6be126c1b9 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -220,6 +220,7 @@ func runCompact( component, ) api := blocksAPI.NewBlocksAPI(logger, conf.label, flagsMap) + noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, bkt) var sy *compact.Syncer { // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability. @@ -229,6 +230,7 @@ func runCompact( block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)), ignoreDeletionMarkFilter, duplicateBlocksFilter, + noCompactMarkerFilter, }, []block.MetadataModifier{block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels)}, ) cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { @@ -280,7 +282,7 @@ func runCompact( grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures) - compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency) + compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency) if err != nil { cancel() return errors.Wrap(err, "create bucket compactor") diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 45cd47b626..129b827f8e 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -71,6 +71,9 @@ const ( // but don't have a replacement block yet. markedForDeletionMeta = "marked-for-deletion" + // MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric. 
+ MarkedForNoCompactionMeta = "marked-for-no-compact" + // Modified label values. replicaRemovedMeta = "replica-label-removed" ) @@ -111,6 +114,7 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { []string{timeExcludedMeta}, []string{duplicateMeta}, []string{markedForDeletionMeta}, + []string{MarkedForNoCompactionMeta}, ) m.modified = extprom.NewTxGaugeVec( reg, @@ -782,19 +786,20 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL f.deletionMarkMap = make(map[ulid.ULID]*metadata.DeletionMark) for id := range metas { - deletionMark, err := metadata.ReadDeletionMark(ctx, f.bkt, f.logger, id.String()) - if err == metadata.ErrorDeletionMarkNotFound { - continue - } - if errors.Cause(err) == metadata.ErrorUnmarshalDeletionMark { - level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err) - continue - } - if err != nil { + m := &metadata.DeletionMark{} + if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil { + if errors.Cause(err) == metadata.ErrorMarkerNotFound { + continue + } + if errors.Cause(err) == metadata.ErrorUnmarshalMarker { + level.Warn(f.logger).Log("msg", "found partial deletion-mark.json; if we will see it happening often for the same block, consider manually deleting deletion-mark.json from the object storage", "block", id, "err", err) + continue + } return err } - f.deletionMarkMap[id] = deletionMark - if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > f.delay.Seconds() { + + f.deletionMarkMap[id] = m + if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() { synced.WithLabelValues(markedForDeletionMeta).Inc() delete(metas, id) } diff --git a/pkg/block/metadata/deletionmark.go b/pkg/block/metadata/deletionmark.go deleted file mode 100644 index 5f2a9f04ad..0000000000 --- 
a/pkg/block/metadata/deletionmark.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package metadata - -import ( - "context" - "encoding/json" - "io/ioutil" - "path" - - "github.com/go-kit/kit/log" - "github.com/oklog/ulid" - "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/runutil" -) - -const ( - // DeletionMarkFilename is the known json filename to store details about when block is marked for deletion. - DeletionMarkFilename = "deletion-mark.json" - - // DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos. - DeletionMarkVersion1 = 1 -) - -// ErrorDeletionMarkNotFound is the error when deletion-mark.json file is not found. -var ErrorDeletionMarkNotFound = errors.New("deletion-mark.json not found") - -// ErrorUnmarshalDeletionMark is the error when unmarshalling deletion-mark.json file. -// This error can occur because deletion-mark.json has been partially uploaded to block storage -// or the deletion-mark.json file is not a valid json file. -var ErrorUnmarshalDeletionMark = errors.New("unmarshal deletion-mark.json") - -// DeletionMark stores block id and when block was marked for deletion. -type DeletionMark struct { - // ID of the tsdb block. - ID ulid.ULID `json:"id"` - - // DeletionTime is a unix timestamp of when the block was marked to be deleted. - DeletionTime int64 `json:"deletion_time"` - - // Version of the file. - Version int `json:"version"` -} - -// ReadDeletionMark reads the given deletion mark file from /deletion-mark.json in bucket. 
-func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, dir string) (*DeletionMark, error) { - deletionMarkFile := path.Join(dir, DeletionMarkFilename) - - r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, deletionMarkFile) - if err != nil { - if bkt.IsObjNotFoundErr(err) { - return nil, ErrorDeletionMarkNotFound - } - return nil, errors.Wrapf(err, "get file: %s", deletionMarkFile) - } - - defer runutil.CloseWithLogOnErr(logger, r, "close bkt deletion-mark reader") - - metaContent, err := ioutil.ReadAll(r) - if err != nil { - return nil, errors.Wrapf(err, "read file: %s", deletionMarkFile) - } - - deletionMark := DeletionMark{} - if err := json.Unmarshal(metaContent, &deletionMark); err != nil { - return nil, errors.Wrapf(ErrorUnmarshalDeletionMark, "file: %s; err: %v", deletionMarkFile, err.Error()) - } - - if deletionMark.Version != DeletionMarkVersion1 { - return nil, errors.Errorf("unexpected deletion-mark file version %d", deletionMark.Version) - } - - return &deletionMark, nil -} diff --git a/pkg/block/metadata/deletionmark_test.go b/pkg/block/metadata/deletionmark_test.go deleted file mode 100644 index 220c9dc171..0000000000 --- a/pkg/block/metadata/deletionmark_test.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. 
- -package metadata - -import ( - "bytes" - "context" - "encoding/json" - "io/ioutil" - "os" - "path" - "testing" - "time" - - "github.com/oklog/ulid" - "github.com/pkg/errors" - "go.uber.org/goleak" - - "github.com/thanos-io/thanos/pkg/objstore" - "github.com/thanos-io/thanos/pkg/testutil" -) - -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m) -} - -func TestReadDeletionMark(t *testing.T) { - ctx := context.Background() - - tmpDir, err := ioutil.TempDir("", "test-read-deletion-mark") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() - - bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) - { - blockWithoutDeletionMark := ulid.MustNew(uint64(1), nil) - _, err := ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithoutDeletionMark.String())) - - testutil.NotOk(t, err) - testutil.Equals(t, ErrorDeletionMarkNotFound, err) - } - { - blockWithPartialDeletionMark := ulid.MustNew(uint64(2), nil) - - testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithPartialDeletionMark.String(), DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) - _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithPartialDeletionMark.String())) - - testutil.NotOk(t, err) - testutil.Equals(t, ErrorUnmarshalDeletionMark, errors.Cause(err)) - } - { - blockWithDifferentVersionDeletionMark := ulid.MustNew(uint64(3), nil) - var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&DeletionMark{ - ID: blockWithDifferentVersionDeletionMark, - DeletionTime: time.Now().Unix(), - Version: 2, - })) - - testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithDifferentVersionDeletionMark.String(), DeletionMarkFilename), &buf)) - _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithDifferentVersionDeletionMark.String())) - - testutil.NotOk(t, err) - testutil.Equals(t, "unexpected deletion-mark file version 2", err.Error()) - } - { - blockWithValidDeletionMark := ulid.MustNew(uint64(3), nil) - 
var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&DeletionMark{ - ID: blockWithValidDeletionMark, - DeletionTime: time.Now().Unix(), - Version: 1, - })) - - testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithValidDeletionMark.String(), DeletionMarkFilename), &buf)) - _, err = ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithValidDeletionMark.String())) - - testutil.Ok(t, err) - } -} diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go new file mode 100644 index 0000000000..075311192a --- /dev/null +++ b/pkg/block/metadata/markers.go @@ -0,0 +1,115 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + "encoding/json" + "io/ioutil" + "path" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/runutil" +) + +const ( + // DeletionMarkFilename is the known json filename for optional file storing details about when block is marked for deletion. + // If such file is present in block dir, it means the block is meant to be deleted after certain delay. + DeletionMarkFilename = "deletion-mark.json" + // NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction. + // If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions). + NoCompactMarkFilename = "no-compact-mark.json" + + // DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos. + DeletionMarkVersion1 = 1 + // NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos. + NoCompactMarkVersion1 = 1 +) + +var ( + // ErrorMarkerNotFound is the error when marker file is not found. 
+ ErrorMarkerNotFound = errors.New("marker not found") + // ErrorUnmarshalMarker is the error when unmarshalling marker JSON file. + // This error can occur because marker has been partially uploaded to block storage + // or the marker file is not a valid json file. + ErrorUnmarshalMarker = errors.New("unmarshal marker JSON") +) + +type Marker interface { + markerFilename() string +} + +// DeletionMark stores block id and when block was marked for deletion. +type DeletionMark struct { + // ID of the tsdb block. + ID ulid.ULID `json:"id"` + // Version of the file. + Version int `json:"version"` + + // DeletionTime is a unix timestamp of when the block was marked to be deleted. + DeletionTime int64 `json:"deletion_time"` +} + +func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename } + +// NoCompactReason is a reason for a block to be excluded from compaction. +type NoCompactReason string + +const ( + // ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason. + ManualNoCompactReason NoCompactReason = "manual" + // IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424) + // This reason can be ignored when vertical block sharding will be implemented. + IndexSizeExceedingNoCompactReason = "index-size-exceeding" +) + +// NoCompactMark marker stores reason of block being excluded from compaction if needed. +type NoCompactMark struct { + // ID of the tsdb block. + ID ulid.ULID `json:"id"` + // Version of the file. + Version int `json:"version"` + + Reason NoCompactReason `json:"reason"` + // Details is a human readable string giving details of reason. + Details string `json:"details"` +} + +func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename } + +// ReadMarker reads the given mark file from /.json in bucket. 
+func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error { + markerFile := path.Join(dir, marker.markerFilename()) + r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, markerFile) + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return ErrorMarkerNotFound + } + return errors.Wrapf(err, "get file: %s", markerFile) + } + defer runutil.CloseWithLogOnErr(logger, r, "close bkt marker reader") + + metaContent, err := ioutil.ReadAll(r) + if err != nil { + return errors.Wrapf(err, "read file: %s", markerFile) + } + + if err := json.Unmarshal(metaContent, marker); err != nil { + return errors.Wrapf(ErrorUnmarshalMarker, "file: %s; err: %v", markerFile, err.Error()) + } + switch marker.markerFilename() { + case NoCompactMarkFilename: + if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 { + return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1) + } + case DeletionMarkFilename: + if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 { + return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1) + } + } + return nil +} diff --git a/pkg/block/metadata/markers_test.go b/pkg/block/metadata/markers_test.go new file mode 100644 index 0000000000..8d21fc576f --- /dev/null +++ b/pkg/block/metadata/markers_test.go @@ -0,0 +1,115 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package metadata + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "go.uber.org/goleak" + + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} + +func TestReadMarker(t *testing.T) { + ctx := context.Background() + + tmpDir, err := ioutil.TempDir("", "test-read-mark") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + t.Run(DeletionMarkFilename, func(t *testing.T) { + blockWithoutMark := ulid.MustNew(uint64(1), nil) + d := DeletionMark{} + err := ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutMark.String()), &d) + testutil.NotOk(t, err) + testutil.Equals(t, ErrorMarkerNotFound, err) + + blockWithPartialMark := ulid.MustNew(uint64(2), nil) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithPartialMark.String(), DeletionMarkFilename), bytes.NewBufferString("not a valid deletion-mark.json"))) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithPartialMark.String()), &d) + testutil.NotOk(t, err) + testutil.Equals(t, ErrorUnmarshalMarker, errors.Cause(err)) + + blockWithDifferentVersionMark := ulid.MustNew(uint64(3), nil) + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&DeletionMark{ + ID: blockWithDifferentVersionMark, + DeletionTime: time.Now().Unix(), + Version: 2, + })) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithDifferentVersionMark.String(), DeletionMarkFilename), &buf)) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDifferentVersionMark.String()), &d) + testutil.NotOk(t, err) + testutil.Equals(t, "unexpected deletion-mark file version 2, expected 1", err.Error()) + + blockWithValidMark := ulid.MustNew(uint64(3), 
nil) + buf.Reset() + expected := &DeletionMark{ + ID: blockWithValidMark, + DeletionTime: time.Now().Unix(), + Version: 1, + } + testutil.Ok(t, json.NewEncoder(&buf).Encode(expected)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithValidMark.String(), DeletionMarkFilename), &buf)) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithValidMark.String()), &d) + testutil.Ok(t, err) + testutil.Equals(t, *expected, d) + }) + t.Run(NoCompactMarkFilename, func(t *testing.T) { + blockWithoutMark := ulid.MustNew(uint64(1), nil) + n := NoCompactMark{} + err := ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithoutMark.String()), &n) + testutil.NotOk(t, err) + testutil.Equals(t, ErrorMarkerNotFound, err) + + blockWithPartialMark := ulid.MustNew(uint64(2), nil) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithPartialMark.String(), NoCompactMarkFilename), bytes.NewBufferString("not a valid no-compact-mark.json"))) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithPartialMark.String()), &n) + testutil.NotOk(t, err) + testutil.Equals(t, ErrorUnmarshalMarker, errors.Cause(err)) + + blockWithDifferentVersionMark := ulid.MustNew(uint64(3), nil) + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&NoCompactMark{ + ID: blockWithDifferentVersionMark, + Version: 2, + Reason: IndexSizeExceedingNoCompactReason, + Details: "yolo", + })) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithDifferentVersionMark.String(), NoCompactMarkFilename), &buf)) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithDifferentVersionMark.String()), &n) + testutil.NotOk(t, err) + testutil.Equals(t, "unexpected no-compact-mark file version 2, expected 1", err.Error()) + + blockWithValidMark := ulid.MustNew(uint64(3), nil) + buf.Reset() + expected := &NoCompactMark{ + ID: blockWithValidMark, + Version: 1, + Reason: IndexSizeExceedingNoCompactReason, + Details: "yolo", + } + 
testutil.Ok(t, json.NewEncoder(&buf).Encode(expected)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(tmpDir, blockWithValidMark.String(), NoCompactMarkFilename), &buf)) + err = ReadMarker(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, blockWithValidMark.String()), &n) + testutil.Ok(t, err) + testutil.Equals(t, *expected, n) + }) +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b3966c6196..267cc236e4 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -984,3 +985,50 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { level.Info(c.logger).Log("msg", "compaction iterations done") return nil } + +var _ block.MetadataFilter = &GatherNoCompactionMarkFilter{} + +// GatherNoCompactionMarkFilter is a block.Fetcher filter that passes all metas. While doing it, it gathers all no-compact-mark.json markers. +// Not go routine safe. +// TODO(bwplotka): Add unit test. +type GatherNoCompactionMarkFilter struct { + logger log.Logger + bkt objstore.InstrumentedBucketReader + noCompactMarkedMap map[ulid.ULID]*metadata.NoCompactMark +} + +// NewGatherNoCompactionMarkFilter creates GatherNoCompactionMarkFilter. +func NewGatherNoCompactionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoCompactionMarkFilter { + return &GatherNoCompactionMarkFilter{ + logger: logger, + bkt: bkt, + } +} + +// NoCompactMarkedBlocks returns block ids that were marked for no compaction. +func (f *GatherNoCompactionMarkFilter) NoCompactMarkedBlocks() map[ulid.ULID]*metadata.NoCompactMark { + return f.noCompactMarkedMap +} + +// Filter passes all metas, while gathering no compact markers. 
+func (f *GatherNoCompactionMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + f.noCompactMarkedMap = make(map[ulid.ULID]*metadata.NoCompactMark) + + for id := range metas { + m := &metadata.NoCompactMark{} + // TODO(bwplotka): Hook up bucket cache here + reset API so we don't introduce API calls . + if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil { + if errors.Cause(err) == metadata.ErrorMarkerNotFound { + continue + } + if errors.Cause(err) == metadata.ErrorUnmarshalMarker { + level.Warn(f.logger).Log("msg", "found partial no-compact-mark.json; if we will see it happening often for the same block, consider manually deleting no-compact-mark.json from the object storage", "block", id, "err", err) + continue + } + return err + } + synced.WithLabelValues(block.MarkedForNoCompactionMeta).Inc() + f.noCompactMarkedMap[id] = m + } + return nil +} diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go index 53fb628bbc..12c541c200 100644 --- a/pkg/compact/planner.go +++ b/pkg/compact/planner.go @@ -7,6 +7,7 @@ import ( "context" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -14,40 +15,61 @@ type tsdbBasedPlanner struct { logger log.Logger ranges []int64 + + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark } var _ Planner = &tsdbBasedPlanner{} -// NewTSDBBasedPlanner is planner with the same functionality as Prometheus' TSDB plus special handling of excluded blocks. +// NewTSDBBasedPlanner is planner with the same functionality as Prometheus' TSDB. // TODO(bwplotka): Consider upstreaming this to Prometheus. -// It's the same functionality just without accessing filesystem, and special handling of excluded blocks. +// It's the same functionality just without accessing filesystem. 
func NewTSDBBasedPlanner(logger log.Logger, ranges []int64) *tsdbBasedPlanner { - return &tsdbBasedPlanner{logger: logger, ranges: ranges} + return &tsdbBasedPlanner{logger: logger, ranges: ranges, noCompBlocksFunc: func() map[ulid.ULID]*metadata.NoCompactMark { return make(map[ulid.ULID]*metadata.NoCompactMark) }} +} + +// NewPlanner is a default Thanos planner with the same functionality as Prometheus' TSDB plus special handling of excluded blocks. +// It's the same functionality just without accessing filesystem, and special handling of excluded blocks. +func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompactionMarkFilter) *tsdbBasedPlanner { + return &tsdbBasedPlanner{logger: logger, ranges: ranges, noCompBlocksFunc: noCompBlocks.NoCompactMarkedBlocks} } +// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405 func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { - res := selectOverlappingMetas(metasByMinTime) + noCompactMarked := p.noCompBlocksFunc() + notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime)) + for _, meta := range metasByMinTime { + if _, excluded := noCompactMarked[meta.ULID]; excluded { + continue + } + notExcludedMetasByMinTime = append(notExcludedMetasByMinTime, meta) + } + + res := selectOverlappingMetas(notExcludedMetasByMinTime) if len(res) > 0 { return res, nil } - // No overlapping blocks, do compaction the usual way. - // We do not include a recently created block with max(minTime), so the block which was just created from WAL. - // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. + + // We do not include a recently created block with max(minTime), so the block which was just uploaded to bucket. 
+ // This gives users a window of a full block size maintenance if needed. + if _, excluded := noCompactMarked[metasByMinTime[len(metasByMinTime)-1].ULID]; !excluded { + notExcludedMetasByMinTime = notExcludedMetasByMinTime[:len(notExcludedMetasByMinTime)-1] + } metasByMinTime = metasByMinTime[:len(metasByMinTime)-1] - res = append(res, selectMetas(p.ranges, metasByMinTime)...) + res = append(res, selectMetas(p.ranges, noCompactMarked, metasByMinTime)...) if len(res) > 0 { return res, nil } // Compact any blocks with big enough time range that have >5% tombstones. - for i := len(metasByMinTime) - 1; i >= 0; i-- { - meta := metasByMinTime[i] + for i := len(notExcludedMetasByMinTime) - 1; i >= 0; i-- { + meta := notExcludedMetasByMinTime[i] if meta.MaxTime-meta.MinTime < p.ranges[len(p.ranges)/2] { break } if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { - return []*metadata.Meta{metasByMinTime[i]}, nil + return []*metadata.Meta{notExcludedMetasByMinTime[i]}, nil } } @@ -56,11 +78,10 @@ func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Me // selectMetas returns the dir metas that should be compacted into a single new block. // If only a single block range is configured, the result is always nil. -func selectMetas(ranges []int64, metasByMinTime []*metadata.Meta) []*metadata.Meta { +func selectMetas(ranges []int64, noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) []*metadata.Meta { if len(ranges) < 2 || len(metasByMinTime) < 1 { return nil } - highTime := metasByMinTime[len(metasByMinTime)-1].MinTime for _, iv := range ranges[1:] { @@ -68,7 +89,6 @@ func selectMetas(ranges []int64, metasByMinTime []*metadata.Meta) []*metadata.Me if len(parts) == 0 { continue } - Outer: for _, p := range parts { // Do not select the range if it has a block whose compaction failed. 
@@ -78,14 +98,35 @@ func selectMetas(ranges []int64, metasByMinTime []*metadata.Meta) []*metadata.Me } } + if len(p) < 2 { + continue + } + mint := p[0].MinTime maxt := p[len(p)-1].MaxTime - // Pick the range of blocks if it spans the full range (potentially with gaps) - // or is before the most recent block. - // This ensures we don't compact blocks prematurely when another one of the same - // size still fits in the range. - if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { - return p + + // Pick the range of blocks if it spans the full range (potentially with gaps) or is before the most recent block. + // This ensures we don't compact blocks prematurely when another one of the same size would still fit in the range + // after upload. + if maxt-mint != iv && maxt > highTime { + continue + } + + // Check if any of resulted blocks are excluded. Exclude them in a way that does not introduce gaps to the system + // as well as preserve the ranges that would be used if they were not excluded. + // This is meant as short-term workaround to create ability for marking some blocks to not be touched for compaction. + lastExcluded := 0 + for i, id := range p { + if _, excluded := noCompactMarked[id.ULID]; !excluded { + continue + } + if len(p[lastExcluded:i]) > 1 { + return p[lastExcluded:i] + } + lastExcluded = i + 1 + } + if len(p[lastExcluded:]) > 1 { + return p[lastExcluded:] } } } @@ -138,6 +179,7 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta } else { t0 = tr * ((m.MinTime - tr + 1) / tr) } + // Skip blocks that don't fall into the range. This can happen via mis-alignment or // by being the multiple of the intended range. if m.MaxTime > t0+tr { @@ -145,7 +187,7 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta continue } - // Add all dirs to the current group that are within [t0, t0+tr]. + // Add all metas to the current group that are within [t0, t0+tr]. 
for ; i < len(metasByMinTime); i++ { // Either the block falls into the next range or doesn't fit at all (checked above). if metasByMinTime[i].MaxTime > t0+tr { diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go index b8748b9029..be8d072909 100644 --- a/pkg/compact/planner_test.go +++ b/pkg/compact/planner_test.go @@ -73,7 +73,7 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { expected []*metadata.Meta }{ { - name: "Outside Range", + name: "Outside range", metas: []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, }, @@ -108,6 +108,21 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, }, }, + { + name: "There are blocks to fill the entire 2nd parent range.", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + }, + }, { name: `Block for the next parent range appeared with gap with size 20. 
Nothing will happen in the first one anymore but we ignore fresh one still, so no compaction`, @@ -146,7 +161,21 @@ func TestPlanners_Plan_Compatibility(t *testing.T) { {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, }, }, - {name: "We have 20, 60, 20, 60, 240 range blocks. We can compact 20 + 60 + 60", + { + name: "There are blocks to fill the entire 2nd parent range, but there is a gap", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(9, nil), MinTime: 180, MaxTime: 200}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(10, nil), MinTime: 200, MaxTime: 220}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 0, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 120, MaxTime: 180}}, + }, + }, + { + name: "We have 20, 60, 20, 60, 240 range blocks. 
We can compact 20 + 60 + 60", metas: []*metadata.Meta{ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, @@ -408,3 +437,195 @@ func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { }) } }

// TestTSDBBasedPlanners_PlanWithNoCompactMarkers checks Plan against inputs where
// some blocks carry a no-compaction mark (supplied through the shared
// GatherNoCompactionMarkFilter map): whenever a candidate compaction group
// contains a marked block, Plan must drop that group — `expected` is nil for
// those cases, or lists the surviving sub-group otherwise.
func TestTSDBBasedPlanners_PlanWithNoCompactMarkers(t *testing.T) {
	ranges := []int64{
		20,
		60,
		180,
		540,
		1620,
	}

	// One filter shared by all cases; each case installs its own marked-block map
	// into g.noCompactMarkedMap inside the subtest below.
	g := &GatherNoCompactionMarkFilter{}
	tsdbBasedPlanner := NewPlanner(log.NewNopLogger(), ranges, g)

	for _, c := range []struct {
		name           string
		metas          []*metadata.Meta
		noCompactMarks map[ulid.ULID]*metadata.NoCompactMark

		expected []*metadata.Meta
	}{
		{
			name: "Outside range and excluded",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
			},
		},
		{
			name: "Blocks to fill the entire parent, but with first one excluded.",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
			},
			expected: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
			},
		},
		{
			name: "Blocks to fill the entire parent, but with second one excluded.",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(2, nil): {},
			},
		},
		{
			name: "Blocks to fill the entire parent, but with last one excluded.",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(4, nil): {},
			},
			expected: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
			},
		},
		{
			// NOTE(review): original name read "but with last one fist excluded",
			// while the marks below exclude the first AND the last block.
			name: "Blocks to fill the entire parent, but with first and last one excluded.",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
				ulid.MustNew(4, nil): {},
			},
			expected: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
			},
		},
		{
			name: "Blocks to fill the entire parent, but with all of them excluded.",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
				ulid.MustNew(2, nil): {},
				ulid.MustNew(3, nil): {},
				ulid.MustNew(4, nil): {},
			},
		},
		{
			name: `Block for the next parent range appeared, and we have a gap with size 20 between second and third block.
			Second block is excluded.`,
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(2, nil): {},
			},
		},
		{
			// NOTE(review): original name read "but sixth 6th is excluded"; the mark
			// below is on the block with ULID 6.
			name: "We have 20, 60, 20, 60, 240 range blocks. We could compact 20 + 60 + 60, but 6th is excluded",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 960, MaxTime: 980}}, // Fresh one.
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(6, nil): {},
			},
			expected: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
			},
		},
		{
			name: "We have 20, 60, 20, 60, 240 range blocks. We could compact 20 + 60 + 60, but 4th is excluded",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 960, MaxTime: 980}}, // Fresh one.
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(4, nil): {},
			},
		},
		{
			name: "Do not select large blocks that have many tombstones when fresh appears but are excluded",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{
					NumSeries:     10,
					NumTombstones: 3,
				}}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
			},
		},
		// |--------------|
		//               |----------------|
		//                                |--------------|
		{
			name: "Overlapping blocks 1, but one is excluded",
			metas: []*metadata.Meta{
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}},
				{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}},
			},
			noCompactMarks: map[ulid.ULID]*metadata.NoCompactMark{
				ulid.MustNew(1, nil): {},
			},
		},
	} {
		t.Run(c.name, func(t *testing.T) {
			// Sort a copy by MinTime so the shared case table keeps its original order.
			metasByMinTime := make([]*metadata.Meta, len(c.metas))
			copy(metasByMinTime, c.metas)
			sort.Slice(metasByMinTime, func(i, j int) bool {
				return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime
			})
			g.noCompactMarkedMap = c.noCompactMarks
			plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime)
			testutil.Ok(t, err)
			testutil.Equals(t, c.expected, plan)
		})
	}
}