From e9785d63477325d69b00ac792597dfe3b6ff239c Mon Sep 17 00:00:00 2001
From: Congqi Xia
Date: Wed, 7 Aug 2024 16:05:48 +0800
Subject: [PATCH] enhance: Add forward delete max batch size

Separate delete data into batches if the accumulated delete data size
exceeds the configured max batch size

Related to #35303

Signed-off-by: Congqi Xia
---
 configs/milvus.yaml                           |   1 +
 .../querynodev2/delegator/delegator_data.go   | 121 ++++++++++++------
 pkg/util/paramtable/component_param.go        |  14 +-
 3 files changed, 95 insertions(+), 41 deletions(-)

diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index e3e7ed5e9f23d..55fba2ae1a172 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -384,6 +384,7 @@ queryNode:
       taskQueueExpire: 60 # Control how long (many seconds) that queue retains since queue is empty
       enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)
       maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
+  forwardDeleteMaxBatchSize: 16777216 # the max batch size (in bytes) for the delegator to forward streaming deletes, default 16MB
   dataSync:
     flowGraph:
       maxQueueLength: 16 # Maximum length of task queue in flowgraph
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index ebefbc5631be0..e9cf5e008757c 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -347,16 +347,46 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
 			return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
 		}

-		err := worker.Delete(ctx, &querypb.DeleteRequest{
-			Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
-			CollectionId: sd.collectionID,
-			PartitionId:  segmentEntry.PartitionID,
-			VchannelName: sd.vchannelName,
-			SegmentId:    segmentEntry.SegmentID,
-			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
-			Timestamps:   delRecord.Timestamps,
-			Scope:        scope,
-		})
+		forwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
+			log.Info("forward pipeline delete to worker...",
+				zap.Int("deleteRowNum", len(pks)),
+			)
+			return worker.Delete(ctx, &querypb.DeleteRequest{
+				Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
+				CollectionId: sd.collectionID,
+				PartitionId:  segmentEntry.PartitionID,
+				VchannelName: sd.vchannelName,
+				SegmentId:    segmentEntry.SegmentID,
+				PrimaryKeys:  storage.ParsePrimaryKeys2IDs(pks),
+				Timestamps:   tss,
+				Scope:        scope,
+			})
+		}
+
+		// split the delete data into batches if it exceeds the configured max batch size
+		batchingForwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
+			start := 0
+			size := int64(0)
+			maxSize := paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()
+			for i, pk := range pks {
+				size += pk.Size() + 4 // pk size + timestamp size
+				if size >= maxSize {
+					err := forwardDelete(pks[start:i+1], tss[start:i+1])
+					if err != nil {
+						log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
+						return err
+					}
+					size = 0
+					start = i + 1
+				}
+			}
+			// remaining data
+			if start < len(pks) {
+				return forwardDelete(pks[start:], tss[start:])
+			}
+			return nil
+		}
+		err := batchingForwardDelete(delRecord.PrimaryKeys, delRecord.Timestamps)
 		if errors.Is(err, merr.ErrNodeNotFound) {
 			log.Warn("try to delete data on non-exist node")
 			// cancel other request
@@ -633,6 +663,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 		log := log.With(
 			zap.Int64("segmentID", info.GetSegmentID()),
 		)
+		candidate := idCandidates[info.GetSegmentID()]
 		position := info.GetDeltaPosition()
 		if position == nil {
 			// for compatibility of rolling upgrade from 2.2.x to 2.3
@@ -654,29 +685,50 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 			deleteScope = querypb.DataScope_Streaming
 		}

-		deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
-		deleteData := &storage.DeleteData{}
-		deleteData.AppendBatch(deletedPks, deletedTss)
-		if deleteData.RowCount > 0 {
-			log.Info("forward L0 delete to worker...",
-				zap.Int64("deleteRowNum", deleteData.RowCount),
+		forwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
+			log.Info("forward streaming delete to worker...",
+				zap.Int("deleteRowNum", len(pks)),
 			)
-			err := worker.Delete(ctx, &querypb.DeleteRequest{
+			return worker.Delete(ctx, &querypb.DeleteRequest{
 				Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
 				CollectionId: info.GetCollectionID(),
 				PartitionId:  info.GetPartitionID(),
 				SegmentId:    info.GetSegmentID(),
-				PrimaryKeys:  storage.ParsePrimaryKeys2IDs(deleteData.Pks),
-				Timestamps:   deleteData.Tss,
+				PrimaryKeys:  storage.ParsePrimaryKeys2IDs(pks),
+				Timestamps:   tss,
 				Scope:        deleteScope,
 			})
-			if err != nil {
-				log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
-				return err
+		}
+
+		// split the delete data into batches if it exceeds the configured max batch size
+		batchingForwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
+			start := 0
+			size := int64(0)
+			maxSize := paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()
+			for i, pk := range pks {
+				size += pk.Size() + 4 // pk size + timestamp size
+				if size >= maxSize {
+					err := forwardDelete(pks[start:i+1], tss[start:i+1])
+					if err != nil {
+						log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
+						return err
+					}
+					size = 0
+					start = i + 1
+				}
+			}
+			// remaining data
+			if start < len(pks) {
+				return forwardDelete(pks[start:], tss[start:])
 			}
+			return nil
+		}
+
+		deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
+		if err := batchingForwardDelete(deletedPks, deletedTss); err != nil {
+			return err
 		}
-		deleteData = &storage.DeleteData{}

 		// start position is dml position for segment
 		// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
 		if position.GetTimestamp() < sd.deleteBuffer.SafeTs() {
@@ -687,10 +739,14 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 				return err
 			}

-			deleteData.Merge(streamDeleteData)
+			if err := batchingForwardDelete(streamDeleteData.Pks, streamDeleteData.Tss); err != nil {
+				return err
+			}
+
 			log.Info("load delete from stream done")
 		}

+		deleteData := &storage.DeleteData{}
 		// list buffered delete
 		deleteRecords := sd.deleteBuffer.ListAfter(position.GetTimestamp())
 		for _, entry := range deleteRecords {
@@ -717,21 +773,8 @@
 			}
 		}
 		// if delete count not empty, apply
-		if deleteData.RowCount > 0 {
-			log.Info("forward delete to worker...", zap.Int64("deleteRowNum", deleteData.RowCount))
-			err := worker.Delete(ctx, &querypb.DeleteRequest{
-				Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
-				CollectionId: info.GetCollectionID(),
-				PartitionId:  info.GetPartitionID(),
-				SegmentId:    info.GetSegmentID(),
-				PrimaryKeys:  storage.ParsePrimaryKeys2IDs(deleteData.Pks),
-				Timestamps:   deleteData.Tss,
-				Scope:        deleteScope,
-			})
-			if err != nil {
-				log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
-				return err
-			}
+		if err := batchingForwardDelete(deleteData.Pks, deleteData.Tss); err != nil {
+			return err
 		}
 	}

diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index 1354f8705c5cf..f5950964f5657 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2282,8 +2282,9 @@ type queryNodeConfig struct {
 	GracefulStopTimeout ParamItem `refreshable:"false"`

 	// delete buffer
-	MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
-	DeleteBufferBlockSize  ParamItem `refreshable:"false"`
+	MaxSegmentDeleteBuffer    ParamItem `refreshable:"false"`
+	DeleteBufferBlockSize     ParamItem `refreshable:"false"`
+	ForwardDeleteMaxBatchSize ParamItem `refreshable:"true"`

 	// loader
 	IoPoolSize ParamItem `refreshable:"false"`
@@ -2818,6 +2819,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
 	}
 	p.DeleteBufferBlockSize.Init(base.mgr)

+	p.ForwardDeleteMaxBatchSize = ParamItem{
+		Key:          "queryNode.forwardDeleteMaxBatchSize",
+		Version:      "2.4.7",
+		Doc:          "the max batch size (in bytes) for the delegator to forward streaming deletes, default 16MB",
+		Export:       true,
+		DefaultValue: "16777216", // 16MB
+	}
+	p.ForwardDeleteMaxBatchSize.Init(base.mgr)
+
 	p.IoPoolSize = ParamItem{
 		Key:     "queryNode.ioPoolSize",
 		Version: "2.3.0",
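
Reviewer note (not part of the patch): both batchingForwardDelete closures above implement the same accumulate/flush/reset loop. The following is a minimal, self-contained Go sketch of that size-based batching so it can be read and run in isolation; primaryKey, batchForward, the forward callback, and the hard-coded sizes are simplified stand-ins for illustration, not the Milvus types or APIs.

// batching_sketch.go - illustrative only; simplified stand-ins for the
// storage.PrimaryKey type and the worker.Delete call used in the patch.
package main

import "fmt"

type primaryKey struct {
	data []byte // stand-in for a serialized primary key
}

func (pk primaryKey) Size() int64 { return int64(len(pk.data)) }

// batchForward splits (pks, tss) into size-bounded chunks and invokes
// forward once per chunk, mirroring the accumulate/flush/reset loop of
// batchingForwardDelete above.
func batchForward(pks []primaryKey, tss []uint64, maxSize int64,
	forward func(pks []primaryKey, tss []uint64) error,
) error {
	start := 0
	size := int64(0)
	for i, pk := range pks {
		size += pk.Size() + 4 // same rough per-entry estimate as the patch
		if size >= maxSize {
			if err := forward(pks[start:i+1], tss[start:i+1]); err != nil {
				return err
			}
			size = 0
			start = i + 1
		}
	}
	// forward whatever is left after the loop as a final, smaller batch
	if start < len(pks) {
		return forward(pks[start:], tss[start:])
	}
	return nil
}

func main() {
	pks := make([]primaryKey, 10)
	tss := make([]uint64, 10)
	for i := range pks {
		pks[i] = primaryKey{data: make([]byte, 8)} // 8-byte keys
		tss[i] = uint64(i)
	}
	// with maxSize=30 each estimated entry costs 12 bytes, so batches of 3, 3, 3, 1
	_ = batchForward(pks, tss, 30, func(p []primaryKey, t []uint64) error {
		fmt.Printf("forwarding batch of %d deletes\n", len(p))
		return nil
	})
}

A flush fires as soon as the running size estimate reaches maxSize, and the tail that never reaches the threshold is forwarded as one final smaller request, which is the same shape the patch applies to pipeline, L0, streaming, and buffered deletes.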