compact: Added index size limiting planner detecting output index size over 64GB. #3410

Merged · 3 commits · Nov 9, 2020
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -25,15 +25,20 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#3381](https://github.com/thanos-io/thanos/pull/3381) Querier UI: Add ability to enable or disable metric autocomplete functionality.
- [#2979](https://github.com/thanos-io/thanos/pull/2979) Replicator: Add the ability to replicate blocks within a time frame by passing --min-time and --max-time
- [#3277](https://github.com/thanos-io/thanos/pull/3277) Thanos Query: Introduce dynamic lookback interval. This allows queries with large step to make use of downsampled data.
- [#3409](https://github.com/thanos-io/thanos/pull/3409) Compactor: Added support for no-compact-mark.json which excludes the block from compaction.

### Fixed

- [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors.
- [#3331](https://github.com/thanos-io/thanos/pull/3331) Disable Azure blob exception logging
- [#3341](https://github.com/thanos-io/thanos/pull/3341) Disable Azure blob syslog exception logging
- [#3414](https://github.com/thanos-io/thanos/pull/3414) Set CORS for Query Frontend

### Changed

- [#3410](https://github.com/thanos-io/thanos/pull/3410) Compactor: Changed metric `thanos_compactor_blocks_marked_for_deletion_total` to `thanos_compactor_blocks_marked_total` with `marker` label.
Compactor will now automatically disable compaction for blocks with a large index whose compacted output index would exceed the specified size (by default: 64GB). This automatically
handles the Prometheus [format limit](https://github.com/thanos-io/thanos/issues/1424).
- [#2906](https://github.com/thanos-io/thanos/pull/2906) Tools: Refactor Bucket replicate execution. Removed all `thanos_replicate_origin_.*` metrics.
- `thanos_replicate_origin_meta_loads_total` can be replaced by `blocks_meta_synced{state="loaded"}`.
- `thanos_replicate_origin_partial_meta_reads_total` can be replaced by `blocks_meta_synced{state="failed"}`.
50 changes: 42 additions & 8 deletions cmd/thanos/compact.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/alecthomas/units"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
@@ -124,10 +125,13 @@ func runCompact(
Name: "thanos_compactor_block_cleanup_failures_total",
Help: "Failures encountered while deleting blocks in compactor.",
})
blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_for_deletion_total",
Help: "Total number of blocks marked for deletion in compactor.",
})
blocksMarked := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compactor_blocks_marked_total",
Help: "Total number of blocks marked in compactor.",
}, []string{"marker"})
blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename)
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)

garbageCollectedBlocks := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_compact_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
@@ -244,7 +248,7 @@ func runCompact(
cf,
duplicateBlocksFilter,
ignoreDeletionMarkFilter,
blocksMarkedForDeletion,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
conf.blockSyncConcurrency)
if err != nil {
@@ -280,9 +284,31 @@
return errors.Wrap(err, "clean working downsample directory")
}

grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks)
grouper := compact.NewDefaultGrouper(
logger,
bkt,
conf.acceptMalformedIndex,
enableVerticalCompaction,
reg,
blocksMarked.WithLabelValues(metadata.DeletionMarkFilename),
garbageCollectedBlocks,
)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures)
compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewPlanner(logger, levels, noCompactMarkerFilter), comp, compactDir, bkt, conf.compactionConcurrency)
compactor, err := compact.NewBucketCompactor(
logger,
sy,
grouper,
compact.WithLargeTotalIndexSizeFilter(
compact.NewPlanner(logger, levels, noCompactMarkerFilter),
bkt,
int64(conf.maxBlockIndexSize),
blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename),
),
comp,
compactDir,
bkt,
conf.compactionConcurrency,
)
if err != nil {
cancel()
return errors.Wrap(err, "create bucket compactor")
@@ -373,7 +399,7 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarkedForDeletion); err != nil {
if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, sy.Metas(), retentionByResolution, blocksMarked.WithLabelValues(metadata.DeletionMarkFilename)); err != nil {
return errors.Wrap(err, "retention failed")
}

@@ -512,6 +538,7 @@ type compactConfig struct {
selectorRelabelConf extflag.PathOrContent
webConf webConfig
label string
maxBlockIndexSize units.Base2Bytes
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -574,6 +601,13 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication.").
Hidden().StringsVar(&cc.dedupReplicaLabels)

// TODO(bwplotka): This is a short-term fix for https://github.com/thanos-io/thanos/issues/1424; replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
cmd.Flag("compact.block-max-index-size", "Maximum index size for the resulting block during any compaction. Note that "+
"the total size is approximated in the worst case. If the block that would result from compaction is estimated to exceed this size, the biggest source "+
"block is marked for no compaction (no-compact-mark.json is uploaded), which causes it to be excluded from any compaction. "+
"The default is due to https://github.com/thanos-io/thanos/issues/1424, but it is generally recommended to keep block sizes reasonable.").
Hidden().Default("64GB").BytesVar(&cc.maxBlockIndexSize)

cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd)

cc.webConf.registerFlag(cmd)
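A side note on the new flag's default: the config field is a `units.Base2Bytes`, so kingpin's `BytesVar` parses the value with github.com/alecthomas/units, where "64GB" is a base-2 quantity. A minimal standalone sketch (not part of the PR) illustrating the parsing:

```go
package main

import (
	"fmt"

	"github.com/alecthomas/units"
)

func main() {
	// The flag's "64GB" default is parsed as a base-2 quantity: in this
	// library both "GB" and "GiB" denote 1024^3 bytes.
	v, err := units.ParseBase2Bytes("64GB")
	if err != nil {
		panic(err)
	}
	fmt.Println(int64(v)) // 68719476736 (= 64 * 1024^3)
}
```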
32 changes: 32 additions & 0 deletions pkg/block/block.go
@@ -284,3 +284,35 @@ func gatherFileStats(blockDir string) (res []metadata.File, _ error) {
// TODO(bwplotka): Add optional files like tombstones?
return res, err
}

// MarkForNoCompact creates a file that marks the block as excluded from compaction.
func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoCompactReason, noCompactDetails string, markedForNoCompact prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoCompactMarkFilename)
noCompactMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", m)
}
if noCompactMarkExists {
level.Warn(logger).Log("msg", "requested to mark for no compaction, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m))
return nil
}

noCompactMark, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Version: metadata.NoCompactMarkVersion1,

Time: time.Now().Unix(),
Reason: reason,
Details: noCompactDetails,
})
if err != nil {
return errors.Wrap(err, "json encode no compact mark")
}

if err := bkt.Upload(ctx, m, bytes.NewBuffer(noCompactMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", m)
}
markedForNoCompact.Inc()
level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id)
return nil
}
56 changes: 56 additions & 0 deletions pkg/block/block_test.go
@@ -311,3 +311,59 @@ func TestMarkForDeletion(t *testing.T) {
})
}
}

func TestMarkForNoCompact(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()

tmpDir, err := ioutil.TempDir("", "test-block-mark-for-no-compact")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

blocksMarked int
}{
{
name: "block marked",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with no-compact mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoCompactMark{
ID: id,
Time: time.Now().Unix(),
Version: metadata.NoCompactMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoCompactMarkFilename), bytes.NewReader(m)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124)
testutil.Ok(t, err)

tcase.preUpload(t, id, bkt)

testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String())))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForNoCompact(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoCompactReason, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}
2 changes: 2 additions & 0 deletions pkg/block/metadata/markers.go
@@ -74,6 +74,8 @@ type NoCompactMark struct {
// Version of the file.
Version int `json:"version"`

// Time is a unix timestamp of when the block was marked for no compaction.
Time int64 `json:"time"`
Reason NoCompactReason `json:"reason"`
// Details is a human readable string giving details of reason.
Details string `json:"details"`
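For reference, a minimal sketch of the on-disk marker this struct serializes to. The ULID, timestamp, and details are invented, and the `id` JSON key is an assumption since the ID field's tag is not part of this diff:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	// Hypothetical values; only the field set mirrors the struct above.
	m := metadata.NoCompactMark{
		ID:      ulid.MustParse("01EM6Q6A1YPX4G9MWY1Y0YPXGQ"), // made-up block ULID
		Version: metadata.NoCompactMarkVersion1,
		Time:    1604919600, // unix timestamp of when the mark was written
		Reason:  metadata.ManualNoCompactReason,
		Details: "manually excluded from compaction",
	}
	b, _ := json.Marshal(m)
	fmt.Println(string(b))
	// Likely output (the "id" key name is assumed):
	// {"id":"01EM6Q6A1YPX4G9MWY1Y0YPXGQ","version":1,"time":1604919600,"reason":"manual","details":"manually excluded from compaction"}
}
```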
91 changes: 90 additions & 1 deletion pkg/compact/planner.go
@@ -5,10 +5,17 @@ package compact

import (
"context"
"fmt"
"math"
"path/filepath"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
)

type tsdbBasedPlanner struct {
@@ -42,7 +49,10 @@ func NewPlanner(logger log.Logger, ranges []int64, noCompBlocks *GatherNoCompact

// TODO(bwplotka): Consider smarter algorithm, this prefers smaller iterative compactions vs big single one: https://github.com/thanos-io/thanos/issues/3405
func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
noCompactMarked := p.noCompBlocksFunc()
return p.plan(p.noCompBlocksFunc(), metasByMinTime)
}

func (p *tsdbBasedPlanner) plan(noCompactMarked map[ulid.ULID]*metadata.NoCompactMark, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
notExcludedMetasByMinTime := make([]*metadata.Meta, 0, len(metasByMinTime))
for _, meta := range metasByMinTime {
if _, excluded := noCompactMarked[meta.ULID]; excluded {
@@ -212,3 +222,82 @@ func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta

return splitDirs
}

type largeTotalIndexSizeFilter struct {
*tsdbBasedPlanner

bkt objstore.Bucket
markedForNoCompact prometheus.Counter
totalMaxIndexSizeBytes int64
}

var _ Planner = &largeTotalIndexSizeFilter{}

// WithLargeTotalIndexSizeFilter wraps the Planner with largeTotalIndexSizeFilter, which checks the given plans and estimates the total index size.
// When the limit is exceeded, it marks the biggest source block for no compaction by placing no-compact-mark.json and updating the cache.
// NOTE: The estimation is very rough, as it assumes the extreme case of indexes sharing no bytes, thus summing all source index sizes.
// Adjust the limit accordingly, e.g. reduce it to some percentage of the actual limit you want to enforce.
// TODO(bwplotka): This is a short-term fix for https://github.com/thanos-io/thanos/issues/1424; replace with vertical block sharding https://github.com/thanos-io/thanos/pull/3390.
func WithLargeTotalIndexSizeFilter(with *tsdbBasedPlanner, bkt objstore.Bucket, totalMaxIndexSizeBytes int64, markedForNoCompact prometheus.Counter) *largeTotalIndexSizeFilter {
return &largeTotalIndexSizeFilter{tsdbBasedPlanner: with, bkt: bkt, totalMaxIndexSizeBytes: totalMaxIndexSizeBytes, markedForNoCompact: markedForNoCompact}
}

func (t *largeTotalIndexSizeFilter) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
noCompactMarked := t.noCompBlocksFunc()
copiedNoCompactMarked := make(map[ulid.ULID]*metadata.NoCompactMark, len(noCompactMarked))
for k, v := range noCompactMarked {
copiedNoCompactMarked[k] = v
}

PlanLoop:
for {
plan, err := t.plan(copiedNoCompactMarked, metasByMinTime)
if err != nil {
return nil, err
}
var totalIndexBytes, maxIndexSize int64 = 0, math.MinInt64
var biggestIndex int
for i, p := range plan {
indexSize := int64(-1)
for _, f := range p.Thanos.Files {
if f.RelPath == block.IndexFilename {
indexSize = f.SizeBytes
}
}
if indexSize <= 0 {
// Get size from bkt instead.
attr, err := t.bkt.Attributes(ctx, filepath.Join(p.ULID.String(), block.IndexFilename))
if err != nil {
return nil, errors.Wrapf(err, "get attr of %v", filepath.Join(p.ULID.String(), block.IndexFilename))
}
indexSize = attr.Size
}

if maxIndexSize < indexSize {
maxIndexSize = indexSize
biggestIndex = i
}
totalIndexBytes += indexSize
if totalIndexBytes >= t.totalMaxIndexSizeBytes {
// Mark the block with the biggest index for no compaction to keep the estimated total under the limit.
// TODO(bwplotka): Make sure to reset cache once this is done: https://github.com/thanos-io/thanos/issues/3408
if err := block.MarkForNoCompact(
ctx,
t.logger,
t.bkt,
plan[biggestIndex].ULID,
metadata.IndexSizeExceedingNoCompactReason,
fmt.Sprintf("largeTotalIndexSizeFilter: Total compacted block's index size could exceed: %v with this block. See https://github.com/thanos-io/thanos/issues/1424", t.totalMaxIndexSizeBytes),
t.markedForNoCompact,
); err != nil {
return nil, errors.Wrapf(err, "mark %v for no compaction", plan[biggestIndex].ULID.String())
}
// Make sure the wrapped planner excludes this block.
copiedNoCompactMarked[plan[biggestIndex].ULID] = &metadata.NoCompactMark{ID: plan[biggestIndex].ULID, Version: metadata.NoCompactMarkVersion1}
continue PlanLoop
}
}
// Planned blocks should not exceed limit.
return plan, nil
}
}
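To make the estimation concrete, here is a minimal, self-contained sketch (not part of the PR) of the worst-case arithmetic the filter performs. The index sizes are invented; in the real code they come from the block metadata's file list or from bucket attributes, as shown above:

```go
package main

import "fmt"

func main() {
	// Hypothetical source blocks in one compaction plan, with invented index sizes.
	indexSizes := []int64{30 << 30, 25 << 30, 20 << 30} // 30GiB, 25GiB, 20GiB
	limit := int64(64 << 30)                            // default compact.block-max-index-size of 64GB

	var total, maxSize int64
	biggest := -1
	for i, s := range indexSizes {
		if s > maxSize {
			maxSize, biggest = s, i
		}
		total += s // worst case: indexes share no bytes, so sizes simply add up.
	}
	// total = 75GiB >= 64GiB, so the block with the biggest index (index 0, 30GiB)
	// would get a no-compact-mark.json and the plan would be recomputed; the
	// remaining 25GiB + 20GiB = 45GiB estimate then fits under the limit.
	fmt.Println(total >= limit, biggest) // true 0
}
```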