Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Don't write empty blocks #374

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
beb0d73
Dont write empty blocks
codesome Sep 8, 2018
bb76c82
Added unit test TestNoEmptyBlock
codesome Sep 12, 2018
c4edbcc
Fix infinite loop while compacting head.
codesome Sep 18, 2018
9de3926
Fix deletion of old blocks after no block is written.
codesome Sep 19, 2018
ab14c9c
Fix review comments
codesome Sep 19, 2018
268ae54
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Sep 21, 2018
281a493
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Sep 28, 2018
897dd33
Fixed infinite loop for head compaction
codesome Sep 28, 2018
9938162
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Oct 1, 2018
0faafec
Test fix attempt
codesome Oct 3, 2018
49631bb
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Nov 20, 2018
38a2c6b
Updated tests
codesome Nov 22, 2018
061971b
Fix review comments
codesome Nov 23, 2018
059dbd7
Merge remote-tracking branch 'upstream/master' into dont-write-empty-…
codesome Dec 1, 2018
2378d2b
Updated tests and added CHANGELOG entry
codesome Dec 1, 2018
98988fd
Merge remote-tracking branch 'upstream/master' into pull/374-review
Jan 16, 2019
e0bb757
rebased
Jan 16, 2019
efa23ce
Merge pull request #4 from krasi-georgiev/pull/374-review
codesome Jan 16, 2019
0eed036
Revert returning after empty block from head
codesome Jan 16, 2019
271b98f
Fix review comments
codesome Jan 16, 2019
a6a1779
Dont create empty blocks in tests
codesome Jan 16, 2019
2b509e7
nits
codesome Jan 17, 2019
ad9e3ed
simplified test
Jan 17, 2019
441ea45
Merge remote-tracking branch 'upstream/master' into pull/374-review
Jan 17, 2019
02dc732
solved the mistery for double compaction
Jan 17, 2019
162fa29
nit
Jan 17, 2019
9f24cde
less samples
Jan 17, 2019
2cb745f
Merge pull request #5 from krasi-georgiev/pull/374-review
codesome Jan 17, 2019
45bde4c
Nits
codesome Jan 17, 2019
6b87a56
revert some changes
Jan 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
Expand Down
3 changes: 3 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
Expand Down
3 changes: 1 addition & 2 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)

blockDir := createBlock(t, tmpdir, 0, 0, 0)
blockDir := createBlock(t, tmpdir, 1, 0, 0)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
Expand Down Expand Up @@ -91,6 +91,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin

ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)

return filepath.Join(dir, ulid.String())
}
72 changes: 53 additions & 19 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ type Compactor interface {
Plan(dir string) ([]string, error)

// Write persists a Block into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
// When resulting Block has 0 samples
// * No block is written.
// * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}

Expand Down Expand Up @@ -186,19 +191,18 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil
}

// Compact any blocks that have >5% tombstones.
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break
continue
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}

if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil
res = append(res, dms[i].dir)
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}
}

return nil, nil
return res, nil
}

// selectDirs returns the dir metas that should be compacted into a single new block.
Expand Down Expand Up @@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
if err = writeMetaFile(b.dir, &b.meta); err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
}
return uid, nil
}

Expand Down Expand Up @@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return uid, err
}

if meta.Stats.NumSamples == 0 {
return ulid.ULID{}, nil
codesome marked this conversation as resolved.
Show resolved Hide resolved
}

level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil
}
Expand Down Expand Up @@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
Expand All @@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer")
}

// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
Expand Down
17 changes: 16 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ func (db *DB) compact() (err error) {
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -426,6 +427,14 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil {
return errors.Wrap(err, "reload blocks")
}
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
runtime.GC()
}

Expand Down Expand Up @@ -588,6 +597,12 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
})

for _, block := range blocks {
if block.Meta().Compaction.Deletable {
deletable[block.Meta().ULID] = block
}
}

for ulid, block := range db.beyondTimeRetention(blocks) {
deletable[ulid] = block
}
Expand Down
120 changes: 112 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 0, 0, 0)
blockDir := createBlock(t, db.Dir(), 1, 0, 0)
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction.
Expand Down Expand Up @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}

block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil)
testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)
Expand Down Expand Up @@ -1364,6 +1364,109 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}

func TestNoEmptyBlocks(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{100},
})
defer close()
defer db.Close()
db.DisableCompactions()

rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewMustRegexpMatcher("", ".*")

t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
testutil.Ok(t, db.compact())
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
})

t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
app := db.Appender()
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 2, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")

actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))

app = db.Appender()
_, err = app.Add(defaultLabel, 1, 0)
testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")

// Adding new blocks.
currentTime := db.Head().MaxTime()
_, err = app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

testutil.Ok(t, db.compact())
testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
actBlocks, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
})

t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
oldBlocks := db.Blocks()
app := db.Appender()
currentTime := db.Head().MaxTime()
_, err := app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
testutil.Equals(t, oldBlocks, db.Blocks())
})

t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
currentTime := db.Head().MaxTime()
blocks := []*BlockMeta{
{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
}
for _, m := range blocks {
createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime)
}

oldBlocks := db.Blocks()
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 4, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here as all blocks have tombstones")

actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
})
}

func TestDB_LabelNames(t *testing.T) {
tests := []struct {
// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
Expand Down Expand Up @@ -1468,12 +1571,13 @@ func TestCorrectNumTombstones(t *testing.T) {
defer db.Close()

blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value)

app := db.Appender()
for i := int64(0); i < 3; i++ {
for j := int64(0); j < 15; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
Expand All @@ -1483,17 +1587,17 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))

testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

// {0, 1} and {2, 3} are merged to form 1 tombstone.
testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)

testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

Expand Down