Skip to content

Commit

Permalink
Merge pull request #18689 from influxdata/batch-write-tombstones-when…
Browse files Browse the repository at this point in the history
…-deleting

perf(tsi1): batch write tombstone entries when dropping/deleting
  • Loading branch information
benbjohnson authored Jun 25, 2020
2 parents be0edf5 + 331569b commit 4a1a8c0
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 30 deletions.
14 changes: 10 additions & 4 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,6 +1681,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
ids := tsdb.NewSeriesIDSet()
measurements := make(map[string]struct{}, 1)

deleteIDList := make([]uint64, 0, 10000)
deleteKeyList := make([][]byte, 0, 10000)

for _, k := range seriesKeys {
if len(k) == 0 {
continue // This key was wiped because it shouldn't be removed from index.
Expand Down Expand Up @@ -1710,15 +1713,18 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
continue
}

// Insert deleting series info into queue
measurements[string(name)] = struct{}{}
// Remove the series from the local index.
if err := e.index.DropSeries(sid, k, false); err != nil {
return err
}
deleteIDList = append(deleteIDList, sid)
deleteKeyList = append(deleteKeyList, k)

// Add the id to the set of delete ids.
ids.Add(sid)
}
// Remove the series from the local index.
if err := e.index.DropSeriesList(deleteIDList, deleteKeyList, false); err != nil {
return err
}

fielsetChanged := false
for k := range measurements {
Expand Down
1 change: 1 addition & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Index interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error
DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) (bool, error)

// Used to clean up series in inmem index that were dropped with a shard.
Expand Down
24 changes: 24 additions & 0 deletions tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,30 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
return nil
}

// DropSeriesList removes every provided series id from the shard-local series
// id set. seriesIDs and keys are parallel slices: keys[n] is the series key
// for seriesIDs[n]; an error is returned when their lengths differ.
//
// For each id that is present in the set, the id is removed and the counter
// stored in idx.measurements under the key's measurement name is decremented;
// the entry is deleted once the counter is 1 or less. Ids that are not in the
// set are skipped. The final boolean argument is ignored.
func (idx *ShardIndex) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
	// The two slices are indexed in lockstep below, so they must line up.
	if len(keys) != len(seriesIDs) {
		return errors.New("seriesIDs/keys length mismatch in index")
	}

	// Take the set's lock once for the whole batch and use the NoLock helpers.
	idx.seriesIDSet.Lock()
	for pos := range seriesIDs {
		id := seriesIDs[pos]
		if !idx.seriesIDSet.ContainsNoLock(id) {
			continue // Not tracked by this shard; nothing to undo.
		}
		idx.seriesIDSet.RemoveNoLock(id)

		measurement := string(models.ParseName(keys[pos]))
		remaining := idx.measurements[measurement]
		if remaining > 1 {
			idx.measurements[measurement] = remaining - 1
			continue
		}
		// Counter would reach zero (or was already missing): drop the entry.
		delete(idx.measurements, measurement)
	}
	idx.seriesIDSet.Unlock()

	return nil
}

// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
// series for the measurement.
func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
Expand Down
58 changes: 58 additions & 0 deletions tsdb/index/tsi1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,64 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
return nil
}

// DropSeriesList drops the provided series from the index as a single batch.
//
// seriesIDs and keys are parallel slices: keys[n] is the series key for
// seriesIDs[n]; an error is returned when their lengths differ. The ids are
// grouped by the partition that partitionIdx selects for their key, each
// partition drops its group through Partition.DropSeriesList, and finally
// every key is added to the index's series tombstone sketch.
//
// The final boolean (cascade) argument is ignored by this implementation.
// The first partition error received is returned; in that case the tombstone
// sketch is not updated.
func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
	// All slices must be of equal length.
	if len(seriesIDs) != len(keys) {
		return errors.New("seriesIDs/keys length mismatch in index")
	}

	// Group the series ids by owning partition so that each partition
	// receives its whole batch in one call.
	pSeriesIDs := make([][]uint64, i.PartitionN)
	for n, key := range keys {
		owner := i.partitionIdx(key)
		pSeriesIDs[owner] = append(pSeriesIDs[owner], seriesIDs[n])
	}

	// Process each subset of series on each partition.
	workers := i.availableThreads()

	// One buffered slot per partition result, so a worker's send does not
	// have to wait for the receiver below.
	errC := make(chan error, i.PartitionN)

	var next uint32 // Next partition index to hand out to a worker.
	for w := 0; w < workers; w++ {
		go func() {
			for {
				// Atomically claim the next partition to work on.
				idx := int(atomic.AddUint32(&next, 1) - 1)
				if idx >= len(i.partitions) {
					return // No more work.
				}

				// Drop from partition and report the outcome (nil on success).
				errC <- i.partitions[idx].DropSeriesList(pSeriesIDs[idx])
			}
		}()
	}

	// Collect one result per partition, failing on the first error.
	for received := 0; received < cap(errC); received++ {
		if err := <-errC; err != nil {
			return err
		}
	}

	// Add sketch tombstone.
	i.mu.Lock()
	for _, key := range keys {
		i.sTSketch.Add(key)
	}
	i.mu.Unlock()

	return nil
}

// DropSeriesGlobal does nothing for the tsi1 index; the key is ignored and
// nil is always returned.
func (i *Index) DropSeriesGlobal(key []byte) error {
	return nil
}

Expand Down
63 changes: 63 additions & 0 deletions tsdb/index/tsi1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,69 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
})
}

// TestIndex_DropSeriesList creates eleven series under the "cpu" measurement,
// drops all but the last two in one DropSeriesList call and checks that the
// measurement still reports series, then drops the remaining two and checks
// that it no longer does.
func TestIndex_DropSeriesList(t *testing.T) {
	idx := MustOpenDefaultIndex() // Uses the single series creation method CreateSeriesIfNotExists
	defer idx.Close()

	// Series to create before dropping.
	data := []struct {
		Key  string
		Name string
		Tags map[string]string
	}{
		{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
		{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
		{"cpu,region=west,server=c", "cpu", map[string]string{"region": "west", "server": "c"}},
		{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
		{"cpu,region=east,server=c", "cpu", map[string]string{"region": "east", "server": "c"}},
		{"cpu,region=east,server=d", "cpu", map[string]string{"region": "east", "server": "d"}},
		{"cpu,region=north,server=b", "cpu", map[string]string{"region": "north", "server": "b"}},
		{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
		{"cpu,region=north,server=d", "cpu", map[string]string{"region": "north", "server": "d"}},
		{"cpu,region=south,server=a", "cpu", map[string]string{"region": "south", "server": "a"}},
		{"cpu,region=south,server=d", "cpu", map[string]string{"region": "south", "server": "d"}},
	}

	// Create each series and remember its key and series id, in order.
	keys := make([][]byte, 0, 15)
	seriesIDs := make([]uint64, 0, 15)
	for n := range data {
		pt := data[n]
		err := idx.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags))
		if err != nil {
			t.Fatal(err)
		}

		id := idx.Index.SeriesFile().SeriesID([]byte(pt.Name), models.NewTags(pt.Tags), nil)
		keys = append(keys, []byte(pt.Key))
		seriesIDs = append(seriesIDs, id)
	}

	// assertCPUHasSeries checks, via idx.Run, whether the "cpu" measurement
	// reports having series and fails with msg when the answer is not want.
	assertCPUHasSeries := func(want bool, msg string) {
		idx.Run(t, func(t *testing.T) {
			has, err := idx.MeasurementHasSeries([]byte("cpu"))
			if err != nil {
				t.Fatal(err)
			}
			if has != want {
				t.Fatal(msg)
			}
		})
	}

	// Everything before split goes in the first batch; the last two series
	// are held back for the second batch.
	split := len(seriesIDs) - 2

	// Drop the first batch; two series remain.
	if err := idx.DropSeriesList(seriesIDs[0:split], keys[0:split], false); err != nil {
		t.Fatal(err)
	}
	assertCPUHasSeries(true, "expected series to still exist")

	// Drop the remaining series.
	if err := idx.DropSeriesList(seriesIDs[split:], keys[split:], false); err != nil {
		t.Fatal(err)
	}
	assertCPUHasSeries(false, "expected series to be deleted")
}

// Index is a test wrapper for tsi1.Index.
type Index struct {
*tsi1.Index
Expand Down
32 changes: 32 additions & 0 deletions tsdb/index/tsi1/log_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,23 @@ func (f *LogFile) DeleteSeriesID(id uint64) error {
return f.FlushAndSync()
}

// DeleteSeriesIDList records a series tombstone for every id in ids.
//
// Under the log file's write lock, each id is appended as a
// LogEntrySeriesTombstoneFlag entry and applied with execEntry. After the last
// entry the file is flushed and synced once for the whole batch. If appending
// an entry fails, that error is returned immediately without flushing.
func (f *LogFile) DeleteSeriesIDList(ids []uint64) error {
	f.mu.Lock()
	defer f.mu.Unlock()

	for n := range ids {
		tombstone := &LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: ids[n]}
		if err := f.appendEntry(tombstone); err != nil {
			return err
		}
		f.execEntry(tombstone)
	}

	// A single flush and sync covers every tombstone written above.
	return f.FlushAndSync()
}

// SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) {
f.mu.RLock()
Expand Down Expand Up @@ -1056,6 +1073,21 @@ func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error)
return sketch, tSketch, nil
}

// Writes appends every entry in entries to the log file, applies each one
// with execEntry, and then flushes and syncs the file once for the whole
// batch.
//
// Entries are processed in slice order and by pointer into the caller's
// slice. If appending an entry fails, that error is returned immediately
// without flushing.
func (f *LogFile) Writes(entries []LogEntry) error {
	// Appending and executing entries mutates the log file, so the exclusive
	// lock is required, matching DeleteSeriesIDList which performs the same
	// append/exec/flush sequence under f.mu.Lock(). A read lock would allow
	// concurrent writers and readers to race.
	f.mu.Lock()
	defer f.mu.Unlock()

	for i := range entries {
		entry := &entries[i]
		if err := f.appendEntry(entry); err != nil {
			return err
		}
		f.execEntry(entry)
	}

	// Flush buffer and sync to disk.
	return f.FlushAndSync()
}

// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
Flag byte // flag
Expand Down
58 changes: 32 additions & 26 deletions tsdb/index/tsi1/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,31 +585,20 @@ func (p *Partition) DropMeasurement(name []byte) error {
}
defer fs.Release()

entries := make([]LogEntry, 0, 100)
// Delete all keys and values.
if kitr := fs.TagKeyIterator(name); kitr != nil {
for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted.
if !k.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagKey(name, k.Key())
}(); err != nil {
return err
}
entries = append(entries, LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: k.Key()})
}

// Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
}(); err != nil {
return err
}
entries = append(entries, LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: k.Key(), Value: v.Value()})
}
}
}
Expand All @@ -626,27 +615,22 @@ func (p *Partition) DropMeasurement(name []byte) error {
} else if elem.SeriesID == 0 {
break
}
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteSeriesID(elem.SeriesID)
}(); err != nil {
return err
}
entries = append(entries, LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: elem.SeriesID})
}
if err = itr.Close(); err != nil {
return err
}
}

// Mark measurement as deleted.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteMeasurement(name)
}(); err != nil {
entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name})

p.mu.RLock()
if err := p.activeLogFile.Writes(entries); err != nil {
p.mu.RUnlock()
return err
}
p.mu.RUnlock()

// Check if the log file needs to be swapped.
if err := p.CheckLogFile(); err != nil {
Expand Down Expand Up @@ -705,6 +689,28 @@ func (p *Partition) DropSeries(seriesID uint64) error {
return p.CheckLogFile()
}

// DropSeriesList drops a batch of series from the partition.
//
// An empty batch is a no-op. Otherwise the ids are written as tombstones to
// the active log file in one DeleteSeriesIDList call, each id is removed from
// the partition's series id set, and CheckLogFile is invoked; its result is
// returned. If writing the tombstones fails, that error is returned and the
// series id set is left unchanged.
func (p *Partition) DropSeriesList(seriesIDs []uint64) error {
	if len(seriesIDs) == 0 {
		return nil
	}

	// Hold the partition read lock only while talking to the active log file.
	writeTombstones := func() error {
		p.mu.RLock()
		defer p.mu.RUnlock()
		return p.activeLogFile.DeleteSeriesIDList(seriesIDs)
	}
	if err := writeTombstones(); err != nil {
		return err
	}

	for n := range seriesIDs {
		p.seriesIDSet.Remove(seriesIDs[n])
	}

	// Swap log file, if necessary.
	return p.CheckLogFile()
}

// MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
Expand Down

0 comments on commit 4a1a8c0

Please sign in to comment.