diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 13e1e54ff56..12b870f20f2 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -718,6 +718,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat "is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion)"+ "and the data you wanted to rewrite could already part of bigger block.\n\n"+ "Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus)"+ + "Resources needed: 1 CPU and:"+ + "* For deletions: At max 1/32 of posting offsets, 1/32 of symbols, largest labels for single series and biggest XOR chunk."+ "WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first (you can use objstore.config-backup flags for this command)") blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings() objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backup-ing block before rewrite if you choose so (only use in non-dry run mode).") diff --git a/pkg/block/writer_modifiers.go b/pkg/block/writer_modifiers.go index 12abff1d803..5228ca7b07e 100644 --- a/pkg/block/writer_modifiers.go +++ b/pkg/block/writer_modifiers.go @@ -3,31 +3,35 @@ package block import ( "math" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tombstones" ) type Modifier interface { - Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) + Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) } type DeletionModifier struct { deletions []DeleteRequest } -func WithDeletionModifier(deletions []DeleteRequest) *DeletionModifier { +func WithDeletionModifier(deletions ...DeleteRequest) *DeletionModifier { return &DeletionModifier{deletions: deletions} } -func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) { +func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) { return sym, &delModifierSeriesSet{ d: d, ChunkSeriesSet: set, log: log, + p: p, } } @@ -35,9 +39,11 @@ type delModifierSeriesSet struct { storage.ChunkSeriesSet d *DeletionModifier - log printChangeLog + log ChangeLogger + p ProgressLogger - err error + curr *storage.ChunkSeriesEntry + err error } func (d *delModifierSeriesSet) Next() bool { @@ -53,10 +59,10 @@ func (d *delModifierSeriesSet) Next() bool { continue } - if m.Matches(v) { + if !m.Matches(v) { continue } - for _, in := range deletions.intervals { + for _, in := range deletions.Intervals { intervals = intervals.Add(in) } break @@ -65,41 +71,260 @@ func (d *delModifierSeriesSet) Next() bool { if (tombstones.Interval{Mint: math.MinInt64, Maxt: math.MaxInt64}.IsSubrange(intervals)) { // Quick path for skipping series completely. - chksIter := d.ChunkSeriesSet.At().Iterator() + chksIter := s.Iterator() var chks []chunks.Meta for chksIter.Next() { chks = append(chks, chksIter.At()) } - d.err = chksIter.Err() - if d.err != nil { + if d.err = chksIter.Err(); d.err != nil { return false } - deleted := tombstones.Intervals{} + var deleted tombstones.Intervals if len(chks) > 0 { - deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)].MaxTime}) + deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime}) } d.log.DeleteSeries(lbls, deleted) + d.p.SeriesProcessed() continue } + + d.curr = &storage.ChunkSeriesEntry{ + Lset: lbls, + ChunkIteratorFn: func() chunks.Iterator { + return NewDelGenericSeriesIterator(s.Iterator(), intervals, func(intervals tombstones.Intervals) { + d.log.DeleteSeries(lbls, intervals) + }).ToChunkSeriesIterator() + }, + } + return true } return false } -func (d *delModifierSeriesSet) At() storage.ChunkSeries { +// Intersection returns intersection between interval and range of intervals. +func Intersection(i tombstones.Interval, dranges tombstones.Intervals) tombstones.Intervals { + var ret tombstones.Intervals + for _, r := range dranges { + isLeftIn := r.Mint <= i.Maxt + isRightIn := i.Mint <= r.Maxt + if !isLeftIn || !isRightIn { + continue + } + intersection := tombstones.Interval{Mint: r.Mint, Maxt: r.Maxt} + if intersection.Mint < i.Mint { + intersection.Mint = i.Mint + } + if intersection.Maxt > i.Maxt { + intersection.Maxt = i.Maxt + } + ret = ret.Add(intersection) + } + return ret +} + +func (d *delModifierSeriesSet) At() storage.ChunkSeries { + return d.curr } func (d *delModifierSeriesSet) Err() error { - panic("implement me") + if d.err != nil { + return d.err + } + return d.ChunkSeriesSet.Err() } func (d *delModifierSeriesSet) Warnings() storage.Warnings { - panic("implement me") + return d.ChunkSeriesSet.Warnings() +} + +type delGenericSeriesIterator struct { + chks chunks.Iterator + + err error + bufIter *tsdb.DeletedIterator + intervals tombstones.Intervals + + currDelIter chunkenc.Iterator + currChkMeta chunks.Meta + logDelete func(intervals tombstones.Intervals) + deleted tombstones.Intervals + + passedAny bool +} + +func NewDelGenericSeriesIterator( + chks chunks.Iterator, + intervals tombstones.Intervals, + logDelete func(intervals tombstones.Intervals), +) *delGenericSeriesIterator { + return &delGenericSeriesIterator{ + chks: chks, + bufIter: &tsdb.DeletedIterator{}, + intervals: intervals, + logDelete: logDelete, + } +} + +func (d *delGenericSeriesIterator) next() (ok bool) { + if d.err != nil { + return false + } + + for d.chks.Next() { + d.currChkMeta = d.chks.At() + + if chk := (tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}); chk.IsSubrange(d.intervals) { + d.deleted = d.deleted.Add(chk) + continue + } + d.bufIter.Intervals = d.bufIter.Intervals[:0] + for _, interval := range d.intervals { + if d.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) { + d.bufIter.Intervals = d.bufIter.Intervals.Add(interval) + } + } + if len(d.bufIter.Intervals) == 0 { + d.currDelIter = nil + return true + } + + for _, del := range Intersection(tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}, d.bufIter.Intervals) { + d.deleted = d.deleted.Add(del) + } + + // We don't want full chunk, take just part of it. + d.bufIter.Iter = d.currChkMeta.Chunk.Iterator(nil) + d.currDelIter = d.bufIter + return true + } + if len(d.deleted) > 0 { + d.logDelete(d.deleted) + } + return false +} + +func (d *delGenericSeriesIterator) Err() error { + if d.err != nil { + return d.err + } + return d.chks.Err() +} + +func (d *delGenericSeriesIterator) ToSeriesIterator() chunkenc.Iterator { + return &delSeriesIterator{delGenericSeriesIterator: d} +} +func (d *delGenericSeriesIterator) ToChunkSeriesIterator() chunks.Iterator { + return &delChunkSeriesIterator{delGenericSeriesIterator: d} } +// delSeriesIterator allows to iterate over samples for the single series. +type delSeriesIterator struct { + *delGenericSeriesIterator + + curr chunkenc.Iterator +} + +func (p *delSeriesIterator) Next() bool { + if p.curr != nil && p.curr.Next() { + return true + } + + for p.next() { + if p.currDelIter != nil { + p.curr = p.currDelIter + } else { + p.curr = p.currChkMeta.Chunk.Iterator(nil) + } + if p.curr.Next() { + return true + } + } + return false +} + +func (p *delSeriesIterator) Seek(t int64) bool { + if p.curr != nil && p.curr.Seek(t) { + return true + } + for p.Next() { + if p.curr.Seek(t) { + return true + } + } + return false +} + +func (p *delSeriesIterator) At() (int64, float64) { return p.curr.At() } + +func (p *delSeriesIterator) Err() error { + if err := p.delGenericSeriesIterator.Err(); err != nil { + return err + } + if p.curr != nil { + return p.curr.Err() + } + return nil +} + +type delChunkSeriesIterator struct { + *delGenericSeriesIterator + + curr chunks.Meta +} + +func (p *delChunkSeriesIterator) Next() bool { + if !p.next() { + return false + } + + p.curr = p.currChkMeta + if p.currDelIter == nil { + return true + } + + // Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened. + newChunk := chunkenc.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + p.err = err + return false + } + + if !p.currDelIter.Next() { + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + // Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator. + p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk") + return false + } + + t, v := p.currDelIter.At() + p.curr.MinTime = t + app.Append(t, v) + + for p.currDelIter.Next() { + t, v = p.currDelIter.At() + app.Append(t, v) + } + if err := p.currDelIter.Err(); err != nil { + p.err = errors.Wrap(err, "iterate chunk while re-encoding") + return false + } + + p.curr.Chunk = newChunk + p.curr.MaxTime = t + return true +} + +func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr } + // TODO(bwplotka): Add relabelling. type DeleteRequest struct { Matchers []*labels.Matcher - intervals tombstones.Intervals + Intervals tombstones.Intervals } diff --git a/pkg/block/writer_series.go b/pkg/block/writer_series.go index 5a90b748264..e1ab13a0d1a 100644 --- a/pkg/block/writer_series.go +++ b/pkg/block/writer_series.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -18,7 +19,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tombstones" ) -type printChangeLog interface { +type ChangeLogger interface { DeleteSeries(del labels.Labels, intervals tombstones.Intervals) ModifySeries(old labels.Labels, new labels.Labels) } @@ -27,6 +28,12 @@ type changeLog struct { w io.Writer } +func NewChangeLog(w io.Writer) *changeLog { + return &changeLog{ + w: w, + } +} + func (l *changeLog) DeleteSeries(del labels.Labels, intervals tombstones.Intervals) { _, _ = fmt.Fprintf(l.w, "Deleted %v %v\n", del.String(), intervals) } @@ -35,12 +42,34 @@ func (l *changeLog) ModifySeries(old labels.Labels, new labels.Labels) { _, _ = fmt.Fprintf(l.w, "Relabelled %v %v\n", old.String(), new.String()) } +type ProgressLogger interface { + SeriesProcessed() +} + +type progressLogger struct { + logger log.Logger + + series int + processed int +} + +func NewProgressLogger(logger log.Logger, series int) *progressLogger { + return &progressLogger{logger: logger, series: series} +} + +func (p *progressLogger) SeriesProcessed() { + p.processed++ + if (p.series/10) == 0 || p.processed%(p.series/10) == 0 { + level.Info(p.logger).Log("msg", fmt.Sprintf("processed %0.2f%s of %v series", 100*(float64(p.processed)/float64(p.series)), "%", p.series)) + } +} + type seriesWriter struct { tmpDir string logger log.Logger chunkPool chunkenc.Pool - changeLogger printChangeLog + changeLogger ChangeLogger dryRun bool } @@ -50,7 +79,7 @@ type seriesReader struct { cr tsdb.ChunkReader } -func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger printChangeLog, pool chunkenc.Pool) *seriesWriter { +func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *seriesWriter { return &seriesWriter{ tmpDir: tmpDir, logger: logger, @@ -59,8 +88,14 @@ func NewSeriesWriter(tmpDir string, logger log.Logger, changeLogger printChangeL } } +func NewDryRunSeriesWriter(tmpDir string, logger log.Logger, changeLogger ChangeLogger, pool chunkenc.Pool) *seriesWriter { + s := NewSeriesWriter(tmpDir, logger, changeLogger, pool) + s.dryRun = true + return s +} + // TODO(bwplotka): Upstream this. -func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWriter Writer, modifiers ...Modifier) (err error) { +func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWriter Writer, p ProgressLogger, modifiers ...Modifier) (err error) { if len(readers) == 0 { return errors.New("cannot write from no readers") } @@ -104,14 +139,27 @@ func (w *seriesWriter) WriteSeries(ctx context.Context, readers []Reader, sWrite } for _, m := range modifiers { - symbols, set = m.Modify(symbols, set, w.changeLogger) + symbols, set = m.Modify(symbols, set, w.changeLogger, p) } if w.dryRun { + // Even for dry run, we need to exhaust iterators to see potential changes. + for set.Next() { + s := set.At() + iter := s.Iterator() + for iter.Next() { + } + if err := iter.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over chunks", "series", s.Labels(), "err", err) + } + } + if err := set.Err(); err != nil { + level.Error(w.logger).Log("msg", "error while iterating over set", "err", err) + } return nil } - if err := w.write(ctx, symbols, set, sWriter); err != nil { + if err := w.write(ctx, symbols, set, sWriter, p); err != nil { return errors.Wrap(err, "write") } return nil @@ -214,18 +262,12 @@ func (s *lazyPopulateChunkSeriesSet) Err() error { func (s *lazyPopulateChunkSeriesSet) Warnings() storage.Warnings { return nil } -// populatableChunk allows to trigger when you want to have chunks populated. -type populatableChunk interface { - Populate(intervals tombstones.Intervals) (err error) -} - type lazyPopulatableChunk struct { m *chunks.Meta cr tsdb.ChunkReader populated chunkenc.Chunk - bufIter *tsdb.DeletedIterator } type errChunkIterator struct{ err error } @@ -246,12 +288,7 @@ func (e errChunk) Iterator(chunkenc.Iterator) chunkenc.Iterator { return e.err } func (e errChunk) NumSamples() int { return 0 } func (e errChunk) Compact() {} -func (l *lazyPopulatableChunk) Populate(intervals tombstones.Intervals) { - if len(intervals) > 0 && (tombstones.Interval{Mint: l.m.MinTime, Maxt: l.m.MaxTime}.IsSubrange(intervals)) { - l.m.Chunk = EmptyChunk - return - } - +func (l *lazyPopulatableChunk) populate() { // TODO(bwplotka): In most cases we don't need to parse anything, just copy. Extend reader/writer for this. var err error l.populated, err = l.cr.Chunk(l.m.Ref) @@ -260,70 +297,52 @@ func (l *lazyPopulatableChunk) Populate(intervals tombstones.Intervals) { return } - var matching tombstones.Intervals - for _, interval := range intervals { - if l.m.OverlapsClosedInterval(interval.Mint, interval.Maxt) { - matching = matching.Add(interval) - } - } - - if len(matching) == 0 { - l.m.Chunk = l.populated - return - } - - // TODO(bwplotka): Optimize by using passed iterator. - l.bufIter = &tsdb.DeletedIterator{Intervals: matching, Iter: l.populated.Iterator(nil)} - return - + l.m.Chunk = l.populated } func (l *lazyPopulatableChunk) Bytes() []byte { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Bytes() } func (l *lazyPopulatableChunk) Encoding() chunkenc.Encoding { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Encoding() } func (l *lazyPopulatableChunk) Appender() (chunkenc.Appender, error) { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.Appender() } func (l *lazyPopulatableChunk) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator { if l.populated == nil { - l.Populate(nil) + l.populate() } - if l.bufIter == nil { - return l.populated.Iterator(iterator) - } - return l.bufIter + return l.populated.Iterator(iterator) } func (l *lazyPopulatableChunk) NumSamples() int { if l.populated == nil { - l.Populate(nil) + l.populate() } return l.populated.NumSamples() } func (l *lazyPopulatableChunk) Compact() { if l.populated == nil { - l.Populate(nil) + l.populate() } l.populated.Compact() } -func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter SeriesWriter) error { +func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, populatedSet storage.ChunkSeriesSet, sWriter SeriesWriter, p ProgressLogger) error { var ( chks []chunks.Meta ref uint64 @@ -361,6 +380,8 @@ func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, popu // Skip the series with all deleted chunks. if len(chks) == 0 { + // All series will be ignored. + p.SeriesProcessed() continue } @@ -370,13 +391,13 @@ func (w *seriesWriter) write(ctx context.Context, symbols index.StringIter, popu if err := sWriter.AddSeries(ref, s.Labels(), chks...); err != nil { return errors.Wrap(err, "add series") } - for _, chk := range chks { if err := w.chunkPool.Put(chk.Chunk); err != nil { return errors.Wrap(err, "put chunk") } } ref++ + p.SeriesProcessed() } if populatedSet.Err() != nil { return errors.Wrap(populatedSet.Err(), "iterate populated chunk series set") diff --git a/pkg/block/writer_series_test.go b/pkg/block/writer_series_test.go index 1b3560e17b7..0edca7d539b 100644 --- a/pkg/block/writer_series_test.go +++ b/pkg/block/writer_series_test.go @@ -1,8 +1,10 @@ package block import ( + "bytes" "context" "io/ioutil" + "math" "os" "path/filepath" "sort" @@ -17,6 +19,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/tombstones" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -29,11 +32,14 @@ func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { for _, tcase := range []struct { name string - input [][]seriesSamples - expected []seriesSamples - expectedErr error - expectedStats tsdb.BlockStats - modifiers struct{} + input [][]seriesSamples + modifiers []Modifier + dryRun bool + + expected []seriesSamples + expectedErr error + expectedStats tsdb.BlockStats + expectedChanges string }{ { name: "empty block", @@ -103,6 +109,136 @@ func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { NumChunks: 7, }, }, + { + name: "1 blocks + delete modifier, empty deletion request", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier()}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, deletion request no deleting anything", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "0")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }, DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedStats: tsdb.BlockStats{ + NumSamples: 18, + NumSeries: 3, + NumChunks: 4, + }, + }, + { + name: "1 blocks + delete modifier, delete second series", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }, DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 12, + NumSeries: 2, + NumChunks: 2, + }, + }, + { + name: "1 blocks + delete modifier, delete second series and part of first 3rd", + input: [][]seriesSamples{ + { + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "2"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}}, {{10, 11}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 12}, {11, 11}, {20, 20}}}}, + }, + }, + modifiers: []Modifier{WithDeletionModifier( + DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "2")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: math.MaxInt64}}, + }, DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}, + Intervals: tombstones.Intervals{{Mint: math.MinInt64, Maxt: -1}}, + }, DeleteRequest{ + Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "3")}, + Intervals: tombstones.Intervals{{Mint: 10, Maxt: 11}}, + })}, + expected: []seriesSamples{ + {lset: labels.Labels{{Name: "a", Value: "1"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {10, 10}, {11, 11}, {20, 20}}}}, + {lset: labels.Labels{{Name: "a", Value: "3"}}, + chunks: [][]sample{{{0, 0}, {1, 1}, {2, 2}, {20, 20}}}}, + }, + expectedChanges: "Deleted {a=\"2\"} [{0 20}]\nDeleted {a=\"3\"} [{10 11}]\n", + expectedStats: tsdb.BlockStats{ + NumSamples: 10, + NumSeries: 2, + NumChunks: 2, + }, + }, } { t.Run(tcase.name, func(t *testing.T) { tmpDir, err := ioutil.TempDir("", "test-series-writer") @@ -110,10 +246,20 @@ func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() chunkPool := chunkenc.NewPool() - s := NewSeriesWriter(tmpDir, logger, chunkPool) + changes := bytes.Buffer{} + changeLog := &changeLog{w: &changes} + var s *seriesWriter + if tcase.dryRun { + s = NewDryRunSeriesWriter(tmpDir, logger, changeLog, chunkPool) + } else { + s = NewSeriesWriter(tmpDir, logger, changeLog, chunkPool) + } + + var series int var blocks []Reader for _, b := range tcase.input { + series += len(b) id := ulid.MustNew(uint64(len(blocks)+1), nil) bdir := filepath.Join(tmpDir, id.String()) testutil.Ok(t, os.MkdirAll(bdir, os.ModePerm)) @@ -128,16 +274,19 @@ func TestSeriesWriter_WriteSeries_e2e(t *testing.T) { id := ulid.MustNew(uint64(len(blocks)+1), nil) d, err := NewDiskWriter(ctx, logger, filepath.Join(tmpDir, id.String())) testutil.Ok(t, err) + p := NewProgressLogger(logger, series) if tcase.expectedErr != nil { - err := s.WriteSeries(ctx, nil, d) + err := s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...) testutil.NotOk(t, err) testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) return } - testutil.Ok(t, s.WriteSeries(ctx, blocks, d)) + testutil.Ok(t, s.WriteSeries(ctx, blocks, d, p, tcase.modifiers...)) stats, err := d.Flush() testutil.Ok(t, err) + + testutil.Equals(t, tcase.expectedChanges, changes.String()) testutil.Equals(t, tcase.expectedStats, stats) testutil.Equals(t, tcase.expected, readBlockSeries(t, filepath.Join(tmpDir, id.String()))) })