Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

move WAL tailing code from Prometheus to TSDB WAL package #606

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/tombstones"
)

// IndexWriter serializes the index for a block of series data.
Expand Down Expand Up @@ -136,7 +137,7 @@ type BlockReader interface {
Chunks() (ChunkReader, error)

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)
Tombstones() (tombstones.TombstoneReader, error)

// Meta provides meta information about the block reader.
Meta() BlockMeta
Expand Down Expand Up @@ -278,7 +279,7 @@ type Block struct {

chunkr ChunkReader
indexr IndexReader
tombstones TombstoneReader
tombstones tombstones.TombstoneReader

logger log.Logger

Expand Down Expand Up @@ -320,7 +321,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, ir)

tr, sizeTomb, err := readTombstones(dir)
tr, sizeTomb, err := tombstones.ReadTombstones(dir)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -411,7 +412,7 @@ func (pb *Block) Chunks() (ChunkReader, error) {
}

// Tombstones returns a new TombstoneReader against the block data.
func (pb *Block) Tombstones() (TombstoneReader, error) {
func (pb *Block) Tombstones() (tombstones.TombstoneReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -482,7 +483,7 @@ func (r blockIndexReader) Close() error {
}

// blockTombstoneReader forwards TombstoneReader methods via embedding while
// also holding a reference to the owning *Block.
// NOTE(review): this span is a diff hunk — the two embedded-field lines below
// are the pre-change (unqualified) and post-change (package-qualified) versions
// of the same field; only one exists in the real file. The purpose of the `b`
// field is not visible in this hunk — presumably it is used to release the
// block's read lock on Close; confirm against the full file.
type blockTombstoneReader struct {
TombstoneReader
tombstones.TombstoneReader
b *Block
}

Expand Down Expand Up @@ -518,7 +519,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr

// Choose only valid postings which have chunks in the time-range.
stones := newMemTombstones()
stones := tombstones.NewMemTombstones()

var lset labels.Labels
var chks []chunks.Meta
Expand All @@ -534,7 +535,7 @@ Outer:
if chk.OverlapsClosedInterval(mint, maxt) {
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones.addInterval(p.At(), Interval{tmin, tmax})
stones.AddInterval(p.At(), tombstones.Interval{tmin, tmax})
continue Outer
}
}
Expand All @@ -544,9 +545,9 @@ Outer:
return p.Err()
}

err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
err = pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
for _, iv := range ivs {
stones.addInterval(id, iv)
stones.AddInterval(id, iv)
}
return nil
})
Expand All @@ -556,7 +557,7 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()

n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones)
n, err := tombstones.WriteTombstoneFile(pb.logger, pb.dir, pb.tombstones)
if err != nil {
return err
}
Expand All @@ -574,7 +575,7 @@ Outer:
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
numStones := 0

if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
if err := pb.tombstones.Iter(func(id uint64, ivs tombstones.Intervals) error {
numStones += len(ivs)
return nil
}); err != nil {
Expand Down Expand Up @@ -609,7 +610,7 @@ func (pb *Block) Snapshot(dir string) error {
for _, fname := range []string{
metaFilename,
indexFilename,
tombstoneFilename,
tombstones.TombstoneFilename,
} {
if err := os.Link(filepath.Join(pb.dir, fname), filepath.Join(blockDir, fname)); err != nil {
return errors.Wrapf(err, "create snapshot %s", fname)
Expand Down
21 changes: 11 additions & 10 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/tombstones"
)

// ExponentialBlockRanges returns the time ranges based on the stepSize.
Expand Down Expand Up @@ -607,7 +608,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

// Create an empty tombstones file.
if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
if _, err := tombstones.WriteTombstoneFile(c.logger, tmp, tombstones.NewMemTombstones()); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure tombstones.WriteTombstoneFile needs to be WriteTombstoneFile etc? Why not just tombstones.WriteFile if we have clear package name?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for NewMemTombstones

return errors.Wrap(err, "write new tombstones file")
}

Expand Down Expand Up @@ -768,7 +769,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})

} else
// Sanity check for disk blocks.
Expand Down Expand Up @@ -876,15 +877,15 @@ type compactionSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones TombstoneReader
tombstones tombstones.TombstoneReader

l labels.Labels
c []chunks.Meta
intervals Intervals
intervals tombstones.Intervals
err error
}

func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet {
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.TombstoneReader, p index.Postings) *compactionSeriesSet {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Suggested change
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.TombstoneReader, p index.Postings) *compactionSeriesSet {
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet {

return &compactionSeriesSet{
index: i,
chunks: c,
Expand Down Expand Up @@ -914,7 +915,7 @@ func (c *compactionSeriesSet) Next() bool {
if len(c.intervals) > 0 {
chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
if !(tombstones.Interval{chk.MinTime, chk.MaxTime}.IsSubrange(c.intervals)) {
chks = append(chks, chk)
}
}
Expand Down Expand Up @@ -942,7 +943,7 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}

// At returns the series the set is currently positioned on: its label set,
// its chunk metadata, and the tombstone (deletion) intervals that apply to
// it. The returned values are the fields cached by the preceding Next() call.
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals
}
}

Expand All @@ -952,7 +953,7 @@ type compactionMerger struct {
aok, bok bool
l labels.Labels
c []chunks.Meta
intervals Intervals
intervals tombstones.Intervals
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
Expand Down Expand Up @@ -1008,7 +1009,7 @@ func (c *compactionMerger) Next() bool {
_, cb, rb := c.b.At()

for _, r := range rb {
ra = ra.add(r)
ra = ra.Add(r)
}

c.l = append(c.l[:0], l...)
Expand All @@ -1029,6 +1030,6 @@ func (c *compactionMerger) Err() error {
return c.b.Err()
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
// At returns the merged series the merger is currently positioned on: the
// label set, the combined chunk metadata, and the deletion intervals
// accumulated from both input sets by the preceding Next() call.
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
}
11 changes: 7 additions & 4 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tombstones"
)

func TestSplitByRange(t *testing.T) {
Expand Down Expand Up @@ -455,10 +456,12 @@ func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {

// erringBReader is a stateless test stub implementing BlockReader; every
// accessor on it fails unconditionally, exercising error-handling paths.
type erringBReader struct{}

func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
func (erringBReader) Meta() BlockMeta { return BlockMeta{} }
// Each erringBReader accessor returns a nil reader and a fixed sentinel
// error naming the accessor, so tests can trigger and identify failures in
// each of the BlockReader code paths independently. Meta returns a zero
// BlockMeta rather than an error, since its signature has no error result.
func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") }
func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") }
func (erringBReader) Tombstones() (tombstones.TombstoneReader, error) {
return nil, errors.New("tombstones")
}
func (erringBReader) Meta() BlockMeta { return BlockMeta{} }

// nopChunkWriter is a stateless test stub — presumably a no-op ChunkWriter;
// its method set lies outside this diff hunk, so confirm in the full file.
type nopChunkWriter struct{}

Expand Down
40 changes: 21 additions & 19 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import (
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/record"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tombstones"
"github.com/prometheus/tsdb/tsdbutil"
"github.com/prometheus/tsdb/wal"
)
Expand Down Expand Up @@ -243,27 +245,27 @@ func TestDeleteSimple(t *testing.T) {
numSamples := int64(10)

cases := []struct {
intervals Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: Intervals{{0, 3}},
intervals: tombstones.Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}},
intervals: tombstones.Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
{
intervals: Intervals{{1, 3}, {4, 700}},
intervals: tombstones.Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
},
{ // This case is to ensure that labels and symbols are deleted.
intervals: Intervals{{0, 9}},
intervals: tombstones.Intervals{{0, 9}},
remaint: []int64{},
},
}
Expand Down Expand Up @@ -561,11 +563,11 @@ func TestDB_SnapshotWithDelete(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -888,11 +890,11 @@ func TestTombstoneClean(t *testing.T) {

testutil.Ok(t, app.Commit())
cases := []struct {
intervals Intervals
intervals tombstones.Intervals
remaint []int64
}{
{
intervals: Intervals{{1, 3}, {4, 7}},
intervals: tombstones.Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
},
}
Expand Down Expand Up @@ -964,7 +966,7 @@ func TestTombstoneClean(t *testing.T) {
}

for _, b := range db.Blocks() {
testutil.Equals(t, newMemTombstones(), b.tombstones)
testutil.Equals(t, tombstones.NewMemTombstones(), b.tombstones)
}
}
}
Expand All @@ -990,8 +992,8 @@ func TestTombstoneCleanFail(t *testing.T) {
block, err := OpenBlock(nil, blockDir, nil)
testutil.Ok(t, err)
// Add some some fake tombstones to trigger the compaction.
tomb := newMemTombstones()
tomb.addInterval(0, Interval{0, 1})
tomb := tombstones.NewMemTombstones()
tomb.AddInterval(0, tombstones.Interval{0, 1})
block.tombstones = tomb

db.blocks = append(db.blocks, block)
Expand Down Expand Up @@ -1470,13 +1472,13 @@ func TestInitializeHeadTimestamp(t *testing.T) {
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
var enc record.Encoder
err = w.Log(
enc.Series([]RefSeries{
enc.Series([]record.RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
enc.Samples([]record.RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
Expand Down Expand Up @@ -1520,13 +1522,13 @@ func TestInitializeHeadTimestamp(t *testing.T) {
w, err := wal.New(nil, nil, path.Join(dir, "wal"), false)
testutil.Ok(t, err)

var enc RecordEncoder
var enc record.Encoder
err = w.Log(
enc.Series([]RefSeries{
enc.Series([]record.RefSeries{
{Ref: 123, Labels: labels.FromStrings("a", "1")},
{Ref: 124, Labels: labels.FromStrings("a", "2")},
}, nil),
enc.Samples([]RefSample{
enc.Samples([]record.RefSample{
{Ref: 123, T: 5000, V: 1},
{Ref: 124, T: 15000, V: 1},
}, nil),
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515 h1:T+h1c/A9Gawja4Y9mFVWj2vyii2bbUNDw3kt9VxK2EY=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
Expand Down
Loading