Skip to content

Commit

Permalink
compact: Added index size limiting planner detecting output index siz…
Browse files Browse the repository at this point in the history
…e over 64GB. (#3410)

* compact: Added index size limiting planner detecting output index size over 64GB.

Fixes: #1424

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Addressed comments; added changelog.

Signed-off-by: Bartlomiej Plotka <[email protected]>

* Skipped flaky test.

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Nov 9, 2020
1 parent 5ea9812 commit ce72bfe
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 40 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time
- [#3398](https://github.com/thanos-io/thanos/pull/3398) Query Frontend: Add default config for query frontend memcached config.
- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data.
- [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction.

### Fixed

- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
- [#3331](https://github.com/thanos-io/thanos/pull/3331) Disable Azure blob exception logging
- [#3341](https://github.com/thanos-io/thanos/pull/3341) Disable Azure blob syslog exception logging
- [#3414](https://github.com/thanos-io/thanos/pull/3414) Set CORS for Query Frontend

### Changed

- [#3410](https://github.com/thanos-io/thanos/pull/3410) Compactor: Changed metric `thanos_compactor_blocks_marked_for_deletion_total` to `thanos_compactor_blocks_marked_total` with `marker` label.
Compactor will now automatically disable compaction for blocks with large index that would output blocks after compaction larger than specified value (by default: 64GB). This automatically
handles the Promethus [format limit](https://github.com/thanos-io/thanos/issues/1424).
- [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics.
- `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`.
- `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`.
Expand Down
50 changes: 42 additions & 8 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/alecthomas/units"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
Expand Down Expand Up @@ -124,10 +125,13 @@ func runCompact(
Name: "thanos_compactor_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in compactor.",
})
blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
})
blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker"})
blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)

garbageCollectedBlocks := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
Expand Down Expand Up @@ -244,7 +248,7 @@ func runCompact(
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
blocksMarkedForDeletion,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
conf.blockSyncConcurrency)
if err != nil {
Expand Down Expand Up @@ -280,9 +284,31 @@ func runCompact(
return errors.Wrap(err, "clean working downsample directory")
}

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
grouper := compact.NewDefaultGrouper(
logger,
bkt,
conf.acceptMalformedIndex,
enableVerticalCompaction,
reg,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(
logger,
sy,
grouper,
compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename),
),
comp,
compactDir,
bkt,
conf.compactionConcurrency,
)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
Expand Down Expand Up @@ -373,7 +399,7 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil {
return errors.Wrap(err, "retention failed")
}

Expand Down Expand Up @@ -512,6 +538,7 @@ type compactConfig struct {
selectorRelabelConf extflag.PathOrContent
webConf webConfig
label string
maxBlockIndexSize units.Base2Bytes
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -574,6 +601,13 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().StringsVar(&cc.dedupReplicaLabels)

// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulted block during any compaction. Note that"+
"total size is approximated in worst case. If the block that would be resulted from compaction is estimated to exceed this number, biggest source"+
"block is marked for no compaction (no-compact-mark.json is uploaded) which causes this block to be excluded from any compaction. "+
"Default is due to https://github.com/thanos-io/thanos/issues/1424, but it's overall recommended to keeps block size to some reasonable size.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd)

cc.webConf.registerFlag(cmd)
Expand Down
32 changes: 32 additions & 0 deletions pkg/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,35 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
// TODO(bwplotka): Add optional files like tombstones?
return res, err
}

// MarkForNoCompact creates a file which marks block to be not compacted.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoCompactMarkFilename)
noCompactMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", m)
}
if noCompactMarkExists {
level.Warn(logger).Log("msg", "requested to mark for no compaction, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m))
return nil
}

noCompactMark, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Version: metadata.NoCompactMarkVersion1,

Time: time.Now().Unix(),
Reason: reason,
Details: noCompactDetails,
})
if err != nil {
return errors.Wrap(err, "json encode no compact mark")
}

if err := bkt.Upload(ctx, m, bytes.NewBuffer(noCompactMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", m)
}
markedForNoCompact.Inc()
level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id)
return nil
}
56 changes: 56 additions & 0 deletions pkg/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,59 @@ func TestMarkForDeletion(t *testing.T) {
})
}
}

func TestMarkForNoCompact(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-block-mark-for-no-compact")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

blocksMarked int
}{
{
name: "block marked",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with no-compact mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Time: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)

tcase.preUpload(t, id, bkt)

testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForNoCompact(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoCompactReason, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type NoCompactMark struct {
// Version of the file.
Version int `json:"version"`

// Time is a unix timestamp of when the block was marked for no compact.
Time int64 `json:"time"`
Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
Expand Down
91 changes: 90 additions & 1 deletion pkg/compact/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ package compact

import (
"context"
"fmt"
"math"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
)

type tsdbBasedPlanner struct {
Expand Down Expand Up @@ -42,7 +49,10 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact

// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
noCompactMarked := p.noCompBlocksFunc()
return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
for _, meta := range metasByMinTime {
if _, excluded := noCompactMarked[meta.ULID]; excluded {
Expand Down Expand Up @@ -212,3 +222,82 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta

return splitDirs
}

type largeTotalIndexSizeFilter struct {
*tsdbBasedPlanner

bkt objstore.Bucket
markedForNoCompact prometheus.Counter
totalMaxIndexSizeBytes int64
}

var _ Planner = &largeTotalIndexSizeFilter{}

// WithLargeTotalIndexSizeFilter wraps Planner with largeTotalIndexSizeFilter that checks the given plans and estimates total index size.
// When found, it marks block for no compaction by placing no-compact.json and updating cache.
// NOTE: The estimation is very rough as it assumes extreme cases of indexes sharing no bytes, thus summing all source index sizes.
// Adjust limit accordingly reducing to some % of actual limit you want to give.
// TODO(bwplotka): This is short term fix for https://github.com/thanos-io/thanos/issues/1424, replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter {
return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
}

func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
noCompactMarked := t.noCompBlocksFunc()
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
for k, v := range noCompactMarked {
copiedNoCompactMarked[k] = v
}

PlanLoop:
for {
plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
if err != nil {
return nil, err
}
var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
var biggestIndex int
for i, p := range plan {
indexSize := int64(-1)
for _, f := range p.Thanos.Files {
if f.RelPath == block.IndexFilename {
indexSize = f.SizeBytes
}
}
if indexSize <= 0 {
// Get size from bkt instead.
attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
if err != nil {
return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
}
indexSize = attr.Size
}

if maxIndexSize < indexSize {
maxIndexSize = indexSize
biggestIndex = i
}
totalIndexBytes += indexSize
if totalIndexBytes >= t.totalMaxIndexSizeBytes {
// Marking blocks for no compact to limit size.
// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
if err := block.MarkForNoCompact(
ctx,
t.logger,
t.bkt,
plan[biggestIndex].ULID,
metadata.IndexSizeExceedingNoCompactReason,
fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
t.markedForNoCompact,
); err != nil {
return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
}
// Make sure wrapped planner exclude this block.
copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
continue PlanLoop
}
}
// Planned blocks should not exceed limit.
return plan, nil
}
}
Loading

0 comments on commit ce72bfe

Please sign in to comment.