From be5f58db492df720ee57ef824a249315a2c40160 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 2 Dec 2020 09:40:37 -0700 Subject: [PATCH] Implement delete with predicate. --- http/delete_handler.go | 16 ++++++--- http/delete_test.go | 4 +-- predicate/predicate.go | 1 + storage/engine.go | 15 +++++---- tsdb/index.go | 45 ++++++++++++++++++++++++++ tsdb/store.go | 73 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 141 insertions(+), 13 deletions(-) diff --git a/http/delete_handler.go b/http/delete_handler.go index 4d5d150d135..13cd048daf4 100644 --- a/http/delete_handler.go +++ b/http/delete_handler.go @@ -114,16 +114,22 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) { return } - h.HandleHTTPError(r.Context(), &influxdb.Error{ - Code: influxdb.ENotImplemented, - Op: "http/handleDelete", - Msg: "Not implemented", - }, w) + if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil { + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, + Op: "http/handleDelete", + Msg: fmt.Sprintf("unable to delete: %v", err), + Err: err, + }, w) + return + } h.log.Debug("Deleted", zap.String("orgID", fmt.Sprint(dr.Org.ID.String())), zap.String("bucketID", fmt.Sprint(dr.Bucket.ID.String())), ) + + w.WriteHeader(http.StatusNoContent) } func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) { diff --git a/http/delete_test.go b/http/delete_test.go index fca803ef429..4be4f23e1f4 100644 --- a/http/delete_test.go +++ b/http/delete_test.go @@ -228,7 +228,7 @@ func TestDelete(t *testing.T) { }, }, wants: wants{ - statusCode: http.StatusNotImplemented, + statusCode: http.StatusNoContent, body: ``, }, }, @@ -333,7 +333,7 @@ func TestDelete(t *testing.T) { }, }, wants: wants{ - statusCode: http.StatusNotImplemented, + statusCode: http.StatusNoContent, 
body: ``, }, }, diff --git a/predicate/predicate.go b/predicate/predicate.go index 4a7d3f43dd7..ffed5161a56 100644 --- a/predicate/predicate.go +++ b/predicate/predicate.go @@ -16,6 +16,7 @@ func New(n Node) (influxdb.Predicate, error) { if n == nil { return nil, nil } + dt, err := n.ToDataType() if err != nil { return nil, err diff --git a/storage/engine.go b/storage/engine.go index 18ecebaeaa9..44a28fe2dad 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -299,15 +299,18 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) return e.tsdbStore.DeleteDatabase(bucketID.String()) } -// DeleteBucketRange deletes an entire range of data from the storage engine. -func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error { - return ErrNotImplemented -} - // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data // deleted must be in [min, max], and the key must match the predicate if provided. 
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error { - return ErrNotImplemented + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + if e.closing == nil { + return ErrEngineClosed + } + return e.tsdbStore.DeleteSeriesWithPredicate(bucketID.String(), min, max, pred) } func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { diff --git a/tsdb/index.go b/tsdb/index.go index 9a64fe66597..0d7740594f9 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -9,6 +9,7 @@ import ( "sort" "sync" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bytesutil" @@ -157,6 +158,50 @@ func (e *seriesElemAdapter) Tags() models.Tags { return e.tags } func (e *seriesElemAdapter) Deleted() bool { return e.deleted } func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr } +var _ SeriesIDIterator = (*PredicateSeriesIDIterator)(nil) + +type PredicateSeriesIDIterator struct { + itr SeriesIDIterator + sfile *SeriesFile + pred influxdb.Predicate +} + +func NewPredicateSeriesIDIterator(itr SeriesIDIterator, sfile *SeriesFile, pred influxdb.Predicate) SeriesIDIterator { + if pred == nil { + return itr + } + return &PredicateSeriesIDIterator{ + itr: itr, + sfile: sfile, + pred: pred, + } +} + +func (itr *PredicateSeriesIDIterator) Close() error { return itr.itr.Close() } + +func (itr *PredicateSeriesIDIterator) Next() (SeriesIDElem, error) { + for { + elem, err := itr.itr.Next() + if elem.SeriesID == 0 || err != nil { + return elem, err + } + + // Skip if this key has been tombstoned. + seriesKey := itr.sfile.SeriesKey(elem.SeriesID) + if len(seriesKey) == 0 { + continue + } + + name, tags := ParseSeriesKey(seriesKey) + tags = append(models.Tags{{Key: models.MeasurementTagKeyBytes, Value: name}}, tags...) 
+ key := models.MakeKey(name, tags) + if !itr.pred.Matches(key) { + continue + } + return elem, nil + } +} + // SeriesIDElem represents a single series and optional expression. type SeriesIDElem struct { SeriesID uint64 diff --git a/tsdb/store.go b/tsdb/store.go index 646f2c6e178..68481a81ea2 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1278,6 +1278,79 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { return relativePath(s.path, shard.path) } +// DeleteSeriesWithPredicate loops through the database's shards and deletes data in +// [min, max] for every series whose key matches pred; a nil pred matches all series. +func (s *Store) DeleteSeriesWithPredicate(database string, min, max int64, pred influxdb.Predicate) error { + s.mu.RLock() + if s.databases[database].hasMultipleIndexTypes() { + s.mu.RUnlock() + return ErrMultipleIndexTypes + } + sfile := s.sfiles[database] + if sfile == nil { + s.mu.RUnlock() + // No series file means nothing has been written to this DB and thus nothing to delete. + return nil + } + shards := s.filterShards(byDatabase(database)) + epochs := s.epochsForShards(shards) + s.mu.RUnlock() + + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. + limit := limiter.NewFixed(1) + + return s.walkShards(shards, func(sh *Shard) error { + limit.Take() + defer limit.Release() + + // install our guard and wait for any prior deletes to finish. the + // guard ensures future deletes that could conflict wait for us. + waiter := epochs[sh.id].WaitDelete(newGuard(min, max, nil, nil)) + waiter.Wait() + defer waiter.Done() + + index, err := sh.Index() + if err != nil { + return err + } + // indexSet := IndexSet{Indexes: []Index{index}, SeriesFile: sfile} + + // Find matching series keys for each measurement. 
+ mitr, err := index.MeasurementIterator() + if err != nil { + return err + } + defer mitr.Close() + + for { + mm, err := mitr.Next() + if err != nil { + return err + } else if mm == nil { + break + } + + if err := func() error { + sitr, err := index.MeasurementSeriesIDIterator(mm) + if err != nil { + return err + } else if sitr == nil { + return nil + } + defer sitr.Close() + + itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred)) + return sh.DeleteSeriesRange(itr, min, max) + }(); err != nil { + return err + } + } + + return nil + }) +} + // DeleteSeries loops through the local shards and deletes the series data for // the passed in series keys. func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {