Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Don't write empty blocks (#374)
Browse files Browse the repository at this point in the history
* Dont write empty blocks when a compaction results in a block with no samples.

Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome authored and krasi-georgiev committed Jan 18, 2019
1 parent 3929359 commit 1a9d08a
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## master / unreleased
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
Expand Down
3 changes: 3 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ type BlockMetaCompaction struct {
Level int `json:"level"`
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// Indicates that during compaction it resulted in a block without any samples
// so it should be deleted on the next reload.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
// this block.
Parents []BlockDesc `json:"parents,omitempty"`
Expand Down
3 changes: 1 addition & 2 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) {
testutil.Ok(t, err)
defer os.RemoveAll(tmpdir)

blockDir := createBlock(t, tmpdir, 0, 0, 0)
blockDir := createBlock(t, tmpdir, 1, 0, 0)
b, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
testutil.Equals(t, false, b.meta.Compaction.Failed)
Expand Down Expand Up @@ -91,6 +91,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin

ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil)
testutil.Ok(tb, err)

return filepath.Join(dir, ulid.String())
}
66 changes: 50 additions & 16 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ type Compactor interface {
Plan(dir string) ([]string, error)

// Write persists a Block into a directory.
// No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}.
Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)

// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Can optionally pass a list of already open blocks,
// to avoid having to reopen them.
// When resulting Block has 0 samples
// * No block is written.
// * The source dirs are marked Deletable.
// * Returns empty ulid.ULID{}.
Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error)
}

Expand Down Expand Up @@ -186,13 +191,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return res, nil
}

// Compact any blocks that have >5% tombstones.
// Compact any blocks with big enough time range that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break
}

if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil
}
Expand Down Expand Up @@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
if meta.Stats.NumSamples == 0 {
for _, b := range bs {
b.meta.Compaction.Deletable = true
if err = writeMetaFile(b.dir, &b.meta); err != nil {
level.Error(c.logger).Log(
"msg", "Failed to write 'Deletable' to meta file after compaction",
"ulid", b.meta.ULID,
)
}
}
uid = ulid.ULID{}
level.Info(c.logger).Log(
"msg", "compact blocks resulted in empty block",
"count", len(blocks),
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
} else {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
"duration", time.Since(start),
)
}
return uid, nil
}

Expand Down Expand Up @@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return uid, err
}

if meta.Stats.NumSamples == 0 {
return ulid.ULID{}, nil
}

level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil
}
Expand Down Expand Up @@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// We are explicitly closing them here to check for error even
// though these are covered under defer. This is because in Windows,
// you cannot delete these unless they are closed and the defer is to
Expand All @@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer")
}

// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
}
return nil
}

if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
}

// Create an empty tombstones file.
if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
Expand Down
17 changes: 16 additions & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ func (db *DB) compact() (err error) {
// from the block interval here.
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil)
if err != nil {
return errors.Wrap(err, "persist head block")
}

Expand All @@ -426,6 +427,14 @@ func (db *DB) compact() (err error) {
if err := db.reload(); err != nil {
return errors.Wrap(err, "reload blocks")
}
if (uid == ulid.ULID{}) {
// Compaction resulted in an empty block.
// Head truncating during db.reload() depends on the persisted blocks and
// in this case no new block will be persisted so manually truncate the head.
if err = db.head.Truncate(maxt); err != nil {
return errors.Wrap(err, "head truncate failed (in compact)")
}
}
runtime.GC()
}

Expand Down Expand Up @@ -588,6 +597,12 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block {
return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime
})

for _, block := range blocks {
if block.Meta().Compaction.Deletable {
deletable[block.Meta().ULID] = block
}
}

for ulid, block := range db.beyondTimeRetention(blocks) {
deletable[ulid] = block
}
Expand Down
120 changes: 112 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func TestTombstoneCleanFail(t *testing.T) {
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
totalBlocks := 2
for i := 0; i < totalBlocks; i++ {
blockDir := createBlock(t, db.Dir(), 0, 0, 0)
blockDir := createBlock(t, db.Dir(), 1, 0, 0)
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction.
Expand Down Expand Up @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
}

block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil)
block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil)
testutil.Ok(c.t, err)
testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere.
c.blocks = append(c.blocks, block)
Expand Down Expand Up @@ -1364,6 +1364,109 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}

func TestNoEmptyBlocks(t *testing.T) {
db, close := openTestDB(t, &Options{
BlockRanges: []int64{100},
})
defer close()
defer db.Close()
db.DisableCompactions()

rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewMustRegexpMatcher("", ".*")

t.Run("Test no blocks after compact with empty head.", func(t *testing.T) {
testutil.Ok(t, db.compact())
actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))
testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here")
})

t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) {
app := db.Appender()
_, err := app.Add(defaultLabel, 1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 2, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")

actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 0, len(actBlocks))

app = db.Appender()
_, err = app.Add(defaultLabel, 1, 0)
testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed")

// Adding new blocks.
currentTime := db.Head().MaxTime()
_, err = app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())

testutil.Ok(t, db.compact())
testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
actBlocks, err = blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples")
})

t.Run(`When no new block is created from head, and there are some blocks on disk
compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) {
oldBlocks := db.Blocks()
app := db.Appender()
currentTime := db.Head().MaxTime()
_, err := app.Add(defaultLabel, currentTime, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+1, 0)
testutil.Ok(t, err)
_, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here")
testutil.Equals(t, oldBlocks, db.Blocks())
})

t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) {
currentTime := db.Head().MaxTime()
blocks := []*BlockMeta{
{MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]},
{MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]},
}
for _, m := range blocks {
createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime)
}

oldBlocks := db.Blocks()
testutil.Ok(t, db.reload()) // Reload the db to register the new blocks.
testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher))
testutil.Ok(t, db.compact())
testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones")

actBlocks, err := blockDirs(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, len(db.Blocks()), len(actBlocks))
testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.")
})
}

func TestDB_LabelNames(t *testing.T) {
tests := []struct {
// Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk ->
Expand Down Expand Up @@ -1468,12 +1571,13 @@ func TestCorrectNumTombstones(t *testing.T) {
defer db.Close()

blockRange := DefaultOptions.BlockRanges[0]
label := labels.FromStrings("foo", "bar")
defaultLabel := labels.FromStrings("foo", "bar")
defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value)

app := db.Appender()
for i := int64(0); i < 3; i++ {
for j := int64(0); j < 15; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
_, err := app.Add(defaultLabel, i*blockRange+j, 0)
testutil.Ok(t, err)
}
}
Expand All @@ -1483,17 +1587,17 @@ func TestCorrectNumTombstones(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 1, len(db.blocks))

testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(0, 1, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

// {0, 1} and {2, 3} are merged to form 1 tombstone.
testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(2, 3, defaultMatcher))
testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)

testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(5, 6, defaultMatcher))
testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)

testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar")))
testutil.Ok(t, db.Delete(9, 11, defaultMatcher))
testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
}

Expand Down

0 comments on commit 1a9d08a

Please sign in to comment.