Skip to content

Commit

Permalink
Added deletion modifier + tests.
Browse files Browse the repository at this point in the history
Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka committed Nov 8, 2020
1 parent ef02fe0 commit 2ad9593
Show file tree
Hide file tree
Showing 4 changed files with 467 additions and 70 deletions.
2 changes: 2 additions & 0 deletions cmd/thanos/tools_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
"is being rewritten, the resulted rewritten block might only cause overlap (mitigated by marking overlapping block manually for deletion)"+
"and the data you wanted to rewrite could already part of bigger block.\n\n"+
"Use FILESYSTEM type of bucket to rewrite block on disk (suitable for vanilla Prometheus)"+
"Resources needed: 1 CPU and:"+
"* For deletions: At max 1/32 of posting offsets, 1/32 of symbols, largest labels for single series and biggest XOR chunk."+
"WARNING: This procedure is *IRREVERSIBLE* after certain time (delete delay), so do backup your blocks first (you can use objstore.config-backup flags for this command)")
blockIDs := cmd.Flag("id", "ID (ULID) of the blocks for rewrite (repeated flag).").Required().Strings()
objStoreBackupConfig := extkingpin.RegisterCommonObjStoreFlags(cmd, "-backup", false, "Used for backup-ing block before rewrite if you choose so (only use in non-dry run mode).")
Expand Down
257 changes: 241 additions & 16 deletions pkg/block/writer_modifiers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,47 @@ package block
import (
"math"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
)

type Modifier interface {
Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet)
Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet)
}

type DeletionModifier struct {
deletions []DeleteRequest
}

func WithDeletionModifier(deletions []DeleteRequest) *DeletionModifier {
func WithDeletionModifier(deletions ...DeleteRequest) *DeletionModifier {
return &DeletionModifier{deletions: deletions}
}

func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log printChangeLog) (index.StringIter, storage.ChunkSeriesSet) {
func (d *DeletionModifier) Modify(sym index.StringIter, set storage.ChunkSeriesSet, log ChangeLogger, p ProgressLogger) (index.StringIter, storage.ChunkSeriesSet) {
return sym, &delModifierSeriesSet{
d: d,

ChunkSeriesSet: set,
log: log,
p: p,
}
}

type delModifierSeriesSet struct {
storage.ChunkSeriesSet

d *DeletionModifier
log printChangeLog
log ChangeLogger
p ProgressLogger

err error
curr *storage.ChunkSeriesEntry
err error
}

func (d *delModifierSeriesSet) Next() bool {
Expand All @@ -53,10 +59,10 @@ func (d *delModifierSeriesSet) Next() bool {
continue
}

if m.Matches(v) {
if !m.Matches(v) {
continue
}
for _, in := range deletions.intervals {
for _, in := range deletions.Intervals {
intervals = intervals.Add(in)
}
break
Expand All @@ -65,41 +71,260 @@ func (d *delModifierSeriesSet) Next() bool {

if (tombstones.Interval{Mint: math.MinInt64, Maxt: math.MaxInt64}.IsSubrange(intervals)) {
// Quick path for skipping series completely.
chksIter := d.ChunkSeriesSet.At().Iterator()
chksIter := s.Iterator()
var chks []chunks.Meta
for chksIter.Next() {
chks = append(chks, chksIter.At())
}
d.err = chksIter.Err()
if d.err != nil {
if d.err = chksIter.Err(); d.err != nil {
return false
}

deleted := tombstones.Intervals{}
var deleted tombstones.Intervals
if len(chks) > 0 {
deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)].MaxTime})
deleted = deleted.Add(tombstones.Interval{Mint: chks[0].MinTime, Maxt: chks[len(chks)-1].MaxTime})
}
d.log.DeleteSeries(lbls, deleted)
d.p.SeriesProcessed()
continue
}

d.curr = &storage.ChunkSeriesEntry{
Lset: lbls,
ChunkIteratorFn: func() chunks.Iterator {
return NewDelGenericSeriesIterator(s.Iterator(), intervals, func(intervals tombstones.Intervals) {
d.log.DeleteSeries(lbls, intervals)
}).ToChunkSeriesIterator()
},
}
return true
}
return false
}
func (d *delModifierSeriesSet) At() storage.ChunkSeries {

// Intersection returns intersection between interval and range of intervals.
func Intersection(i tombstones.Interval, dranges tombstones.Intervals) tombstones.Intervals {
var ret tombstones.Intervals
for _, r := range dranges {
isLeftIn := r.Mint <= i.Maxt
isRightIn := i.Mint <= r.Maxt
if !isLeftIn || !isRightIn {
continue
}
intersection := tombstones.Interval{Mint: r.Mint, Maxt: r.Maxt}
if intersection.Mint < i.Mint {
intersection.Mint = i.Mint
}
if intersection.Maxt > i.Maxt {
intersection.Maxt = i.Maxt
}
ret = ret.Add(intersection)
}
return ret
}

func (d *delModifierSeriesSet) At() storage.ChunkSeries {
return d.curr
}

func (d *delModifierSeriesSet) Err() error {
panic("implement me")
if d.err != nil {
return d.err
}
return d.ChunkSeriesSet.Err()
}

func (d *delModifierSeriesSet) Warnings() storage.Warnings {
panic("implement me")
return d.ChunkSeriesSet.Warnings()
}

type delGenericSeriesIterator struct {
chks chunks.Iterator

err error
bufIter *tsdb.DeletedIterator
intervals tombstones.Intervals

currDelIter chunkenc.Iterator
currChkMeta chunks.Meta
logDelete func(intervals tombstones.Intervals)
deleted tombstones.Intervals

passedAny bool
}

func NewDelGenericSeriesIterator(
chks chunks.Iterator,
intervals tombstones.Intervals,
logDelete func(intervals tombstones.Intervals),
) *delGenericSeriesIterator {
return &delGenericSeriesIterator{
chks: chks,
bufIter: &tsdb.DeletedIterator{},
intervals: intervals,
logDelete: logDelete,
}
}

func (d *delGenericSeriesIterator) next() (ok bool) {
if d.err != nil {
return false
}

for d.chks.Next() {
d.currChkMeta = d.chks.At()

if chk := (tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}); chk.IsSubrange(d.intervals) {
d.deleted = d.deleted.Add(chk)
continue
}
d.bufIter.Intervals = d.bufIter.Intervals[:0]
for _, interval := range d.intervals {
if d.currChkMeta.OverlapsClosedInterval(interval.Mint, interval.Maxt) {
d.bufIter.Intervals = d.bufIter.Intervals.Add(interval)
}
}
if len(d.bufIter.Intervals) == 0 {
d.currDelIter = nil
return true
}

for _, del := range Intersection(tombstones.Interval{Mint: d.currChkMeta.MinTime, Maxt: d.currChkMeta.MaxTime}, d.bufIter.Intervals) {
d.deleted = d.deleted.Add(del)
}

// We don't want full chunk, take just part of it.
d.bufIter.Iter = d.currChkMeta.Chunk.Iterator(nil)
d.currDelIter = d.bufIter
return true
}
if len(d.deleted) > 0 {
d.logDelete(d.deleted)
}
return false
}

func (d *delGenericSeriesIterator) Err() error {
if d.err != nil {
return d.err
}
return d.chks.Err()
}

func (d *delGenericSeriesIterator) ToSeriesIterator() chunkenc.Iterator {
return &delSeriesIterator{delGenericSeriesIterator: d}
}
func (d *delGenericSeriesIterator) ToChunkSeriesIterator() chunks.Iterator {
return &delChunkSeriesIterator{delGenericSeriesIterator: d}
}

// delSeriesIterator allows to iterate over samples for the single series.
type delSeriesIterator struct {
*delGenericSeriesIterator

curr chunkenc.Iterator
}

func (p *delSeriesIterator) Next() bool {
if p.curr != nil && p.curr.Next() {
return true
}

for p.next() {
if p.currDelIter != nil {
p.curr = p.currDelIter
} else {
p.curr = p.currChkMeta.Chunk.Iterator(nil)
}
if p.curr.Next() {
return true
}
}
return false
}

func (p *delSeriesIterator) Seek(t int64) bool {
if p.curr != nil && p.curr.Seek(t) {
return true
}
for p.Next() {
if p.curr.Seek(t) {
return true
}
}
return false
}

func (p *delSeriesIterator) At() (int64, float64) { return p.curr.At() }

func (p *delSeriesIterator) Err() error {
if err := p.delGenericSeriesIterator.Err(); err != nil {
return err
}
if p.curr != nil {
return p.curr.Err()
}
return nil
}

type delChunkSeriesIterator struct {
*delGenericSeriesIterator

curr chunks.Meta
}

func (p *delChunkSeriesIterator) Next() bool {
if !p.next() {
return false
}

p.curr = p.currChkMeta
if p.currDelIter == nil {
return true
}

// Re-encode the chunk if iterator is provider. This means that it has some samples to be deleted or chunk is opened.
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}

if !p.currDelIter.Next() {
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}

// Empty chunk, this should not happen, as we assume full deletions being filtered before this iterator.
p.err = errors.Wrap(err, "populateWithDelChunkSeriesIterator: unexpected empty chunk found while rewriting chunk")
return false
}

t, v := p.currDelIter.At()
p.curr.MinTime = t
app.Append(t, v)

for p.currDelIter.Next() {
t, v = p.currDelIter.At()
app.Append(t, v)
}
if err := p.currDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}

p.curr.Chunk = newChunk
p.curr.MaxTime = t
return true
}

func (p *delChunkSeriesIterator) At() chunks.Meta { return p.curr }

// TODO(bwplotka): Add relabelling.

type DeleteRequest struct {
Matchers []*labels.Matcher
intervals tombstones.Intervals
Intervals tombstones.Intervals
}
Loading

0 comments on commit 2ad9593

Please sign in to comment.