Skip to content

Commit

Permalink
Implement delete with predicate.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Dec 2, 2020
1 parent e0543e6 commit be5f58d
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 13 deletions.
16 changes: 11 additions & 5 deletions http/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,22 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
return
}

h.HandleHTTPError(r.Context(), &influxdb.Error{
Code: influxdb.ENotImplemented,
Op: "http/handleDelete",
Msg: "Not implemented",
}, w)
if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EInternal,
Op: "http/handleDelete",
Msg: fmt.Sprintf("unable to delete: %v", err),
Err: err,
}, w)
return
}

h.log.Debug("Deleted",
zap.String("orgID", fmt.Sprint(dr.Org.ID.String())),
zap.String("bucketID", fmt.Sprint(dr.Bucket.ID.String())),
)

w.WriteHeader(http.StatusNoContent)
}

func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) {
Expand Down
4 changes: 2 additions & 2 deletions http/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestDelete(t *testing.T) {
},
},
wants: wants{
statusCode: http.StatusNotImplemented,
statusCode: http.StatusNoContent,
body: ``,
},
},
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestDelete(t *testing.T) {
},
},
wants: wants{
statusCode: http.StatusNotImplemented,
statusCode: http.StatusNoContent,
body: ``,
},
},
Expand Down
1 change: 1 addition & 0 deletions predicate/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func New(n Node) (influxdb.Predicate, error) {
if n == nil {
return nil, nil
}

dt, err := n.ToDataType()
if err != nil {
return nil, err
Expand Down
15 changes: 9 additions & 6 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,18 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID)
return e.tsdbStore.DeleteDatabase(bucketID.String())
}

// DeleteBucketRange deletes an entire range of data from the storage engine.
func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error {
return ErrNotImplemented
}

// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
// deleted must be in [min, max], and the key must match the predicate if provided.
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
return ErrNotImplemented
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.DeleteSeriesWithPredicate(bucketID.String(), min, max, pred)
}

func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error {
Expand Down
45 changes: 45 additions & 0 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"sync"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bytesutil"
Expand Down Expand Up @@ -157,6 +158,50 @@ func (e *seriesElemAdapter) Tags() models.Tags { return e.tags }
func (e *seriesElemAdapter) Deleted() bool { return e.deleted }
func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }

var _ SeriesIDIterator = (*PredicateSeriesIDIterator)(nil)

type PredicateSeriesIDIterator struct {
itr SeriesIDIterator
sfile *SeriesFile
pred influxdb.Predicate
}

func NewPredicateSeriesIDIterator(itr SeriesIDIterator, sfile *SeriesFile, pred influxdb.Predicate) SeriesIDIterator {
if pred == nil {
return itr
}
return &PredicateSeriesIDIterator{
itr: itr,
sfile: sfile,
pred: pred,
}
}

func (itr *PredicateSeriesIDIterator) Close() error { return itr.itr.Close() }

func (itr *PredicateSeriesIDIterator) Next() (SeriesIDElem, error) {
for {
elem, err := itr.itr.Next()
if elem.SeriesID == 0 || err != nil {
return elem, err
}

// Skip if this key has been tombstoned.
seriesKey := itr.sfile.SeriesKey(elem.SeriesID)
if len(seriesKey) == 0 {
continue
}

name, tags := ParseSeriesKey(seriesKey)
tags = append(models.Tags{{Key: models.MeasurementTagKeyBytes, Value: name}}, tags...)
key := models.MakeKey(name, tags)
if !itr.pred.Matches(key) {
continue
}
return elem, nil
}
}

// SeriesIDElem represents a single series and optional expression.
type SeriesIDElem struct {
SeriesID uint64
Expand Down
73 changes: 73 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,79 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
return relativePath(s.path, shard.path)
}

// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeriesWithPredicate(database string, min, max int64, pred influxdb.Predicate) error {
s.mu.RLock()
if s.databases[database].hasMultipleIndexTypes() {
s.mu.RUnlock()
return ErrMultipleIndexTypes
}
sfile := s.sfiles[database]
if sfile == nil {
s.mu.RUnlock()
// No series file means nothing has been written to this DB and thus nothing to delete.
return nil
}
shards := s.filterShards(byDatabase(database))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()

// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)

return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()

// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
waiter := epochs[sh.id].WaitDelete(newGuard(min, max, nil, nil))
waiter.Wait()
defer waiter.Done()

index, err := sh.Index()
if err != nil {
return err
}
// indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile}

// Find matching series keys for each measurement.
mitr, err := index.MeasurementIterator()
if err != nil {
return err
}
defer mitr.Close()

for {
mm, err := mitr.Next()
if err != nil {
return err
} else if mm == nil {
break
}

if err := func() error {
sitr, err := index.MeasurementSeriesIDIterator(mm)
if err != nil {
return err
} else if sitr == nil {
return nil
}
defer sitr.Close()

itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred))
return sh.DeleteSeriesRange(itr, min, max)
}(); err != nil {
return err
}
}

return nil
})
}

// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
Expand Down

0 comments on commit be5f58d

Please sign in to comment.