
enhance: Add forward delete max batch size
Separate delete data into batches if any delete data size exceeds the
configured max batch size

Related to milvus-io#35303

Signed-off-by: Congqi Xia <[email protected]>
congqixia committed Aug 7, 2024
1 parent c6253f9 commit d2de474
Showing 3 changed files with 95 additions and 41 deletions.
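At its core the change is a size-based splitter: the delegator accumulates delete records until their estimated byte size reaches the configured cap, forwards that batch to the worker, and flushes whatever is left at the end. The standalone Go sketch below illustrates only that pattern; the names (record, splitBySize) and the per-record size estimate are illustrative and are not part of the Milvus code.

package main

import "fmt"

// record stands in for a primary key plus its delete timestamp.
type record struct {
	pk []byte
	ts uint64
}

// splitBySize forwards records in batches whose estimated byte size stays
// close to maxSize, mirroring the batching closure added in the diff below:
// the record that crosses the threshold is still included in the current
// batch, and any remainder is forwarded once the loop finishes.
func splitBySize(records []record, maxSize int64, forward func([]record) error) error {
	start := 0
	size := int64(0)
	for i, r := range records {
		size += int64(len(r.pk)) + 8 // rough estimate: pk bytes + uint64 timestamp
		if size >= maxSize {
			if err := forward(records[start : i+1]); err != nil {
				return err
			}
			size = 0
			start = i + 1
		}
	}
	if start < len(records) { // remaining records that never hit the cap
		return forward(records[start:])
	}
	return nil
}

func main() {
	recs := make([]record, 10)
	for i := range recs {
		recs[i] = record{pk: []byte("key-0123456789"), ts: uint64(i)}
	}
	// With a small cap, expect several full batches plus a final remainder batch.
	_ = splitBySize(recs, 64, func(batch []record) error {
		fmt.Println("forwarding batch of", len(batch), "records")
		return nil
	})
}

Because the record that pushes the running size past the cap is still included in the batch being flushed, an individual request can exceed forwardDeleteMaxBatchSize by roughly one record's size; the setting bounds batch sizes rather than hard-limiting them.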
1 change: 1 addition & 0 deletions configs/milvus.yaml
@@ -447,6 +447,7 @@ queryNode:
taskQueueExpire: 60 # Control how long (many seconds) that queue retains since queue is empty
enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
forwardDeleteMaxBatchSize: 16777216 # the max batch size (in bytes) for the delegator to forward streaming deletes; default 16MB
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.
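The new knob sits under the existing queryNode section, so overriding it in a deployment's milvus.yaml follows the same pattern as the neighbouring scheduler settings; the 4MB value below is only an illustration of the override, not a recommendation.

queryNode:
  # Lower the forward-delete batch cap from the 16MB default to 4MB.
  forwardDeleteMaxBatchSize: 4194304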
121 changes: 82 additions & 39 deletions internal/querynodev2/delegator/delegator_data.go
@@ -349,16 +349,46 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
}

err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
CollectionId: sd.collectionID,
PartitionId: segmentEntry.PartitionID,
VchannelName: sd.vchannelName,
SegmentId: segmentEntry.SegmentID,
PrimaryKeys: storage.ParsePrimaryKeys2IDs(delRecord.PrimaryKeys),
Timestamps: delRecord.Timestamps,
Scope: scope,
})
forwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
log.Info("forward pipeline delete to worker...",
zap.Int("deleteRowNum", len(pks)),
)
return worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
CollectionId: sd.collectionID,
PartitionId: segmentEntry.PartitionID,
VchannelName: sd.vchannelName,
SegmentId: segmentEntry.SegmentID,
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: tss,
Scope: scope,
})
}

// split pipeline delete data into batches if it exceeds the configured max batch size
batchingForwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
start := 0
size := int64(0)
maxSize := paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()
for i, pk := range pks {
size += pk.Size() + 4 // pk size + timestamp size
if size >= maxSize {
err := forwardDelete(pks[start:i+1], tss[start:i+1])
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
size = 0
start = i + 1
}
}
// remaining data
if start < len(pks) {
return forwardDelete(pks[start:], tss[start:])
}
return nil
}
err := batchingForwardDelete(delRecord.PrimaryKeys, delRecord.Timestamps)
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("try to delete data on non-exist node")
// cancel other request
@@ -635,6 +665,7 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
log := log.With(
zap.Int64("segmentID", info.GetSegmentID()),
)

candidate := idCandidates[info.GetSegmentID()]
position := info.GetDeltaPosition()
if position == nil { // for compatibility of rolling upgrade from 2.2.x to 2.3
@@ -656,29 +687,50 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
deleteScope = querypb.DataScope_Streaming
}

deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
forwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
log.Info("forward streaming delete to worker...",
zap.Int("deleteRowNum", len(pks)),
)
err := worker.Delete(ctx, &querypb.DeleteRequest{
return worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
PrimaryKeys: storage.ParsePrimaryKeys2IDs(pks),
Timestamps: tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}

// split delete data into batches if it exceeds the configured max batch size
batchingForwardDelete := func(pks []storage.PrimaryKey, tss []Timestamp) error {
start := 0
size := int64(0)
maxSize := paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()
for i, pk := range pks {
size += pk.Size() + 4 // pk size + timestamp size
if size >= maxSize {
err := forwardDelete(pks[start:i+1], tss[start:i+1])
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
size = 0
start = i + 1
}
}
// remaining data
if start < len(pks) {
return forwardDelete(pks[start:], tss[start:])
}
return nil
}

deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
if err := batchingForwardDelete(deletedPks, deletedTss); err != nil {
return err
}

deleteData = &storage.DeleteData{}
// start position is dml position for segment
// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
if position.GetTimestamp() < sd.deleteBuffer.SafeTs() {
@@ -689,10 +741,14 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
return err
}

deleteData.Merge(streamDeleteData)
if err := batchingForwardDelete(streamDeleteData.Pks, streamDeleteData.Tss); err != nil {
return err
}

log.Info("load delete from stream done")
}

deleteData := &storage.DeleteData{}
// list buffered delete
deleteRecords := sd.deleteBuffer.ListAfter(position.GetTimestamp())
for _, entry := range deleteRecords {
@@ -719,21 +775,8 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
}
}
// if delete count not empty, apply
if deleteData.RowCount > 0 {
log.Info("forward delete to worker...", zap.Int64("deleteRowNum", deleteData.RowCount))
err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
if err := batchingForwardDelete(deleteData.Pks, deleteData.Tss); err != nil {
return err
}
}

14 changes: 12 additions & 2 deletions pkg/util/paramtable/component_param.go
@@ -2327,8 +2327,9 @@ type queryNodeConfig struct {
GracefulStopTimeout ParamItem `refreshable:"false"`

// delete buffer
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
DeleteBufferBlockSize ParamItem `refreshable:"false"`
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
DeleteBufferBlockSize ParamItem `refreshable:"false"`
ForwardDeleteMaxBatchSize ParamItem `refreshable:"true"`

// loader
IoPoolSize ParamItem `refreshable:"false"`
@@ -2864,6 +2865,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
}
p.DeleteBufferBlockSize.Init(base.mgr)

p.ForwardDeleteMaxBatchSize = ParamItem{
Key: "queryNode.forwardDeleteMaxBatchSize",
Version: "2.4.7",
Doc: "the max batch size (in bytes) for the delegator to forward streaming deletes; default 16MB",
Export: true,
DefaultValue: "16777216", // 16MB
}
p.ForwardDeleteMaxBatchSize.Init(base.mgr)

p.IoPoolSize = ParamItem{
Key: "queryNode.ioPoolSize",
Version: "2.3.0",
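For completeness, the item defined above is read on the query node through the usual paramtable accessor, exactly as the delegator code in this commit does (paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()); since the accessor is called on every batching pass and the item is declared refreshable:"true", an updated value should take effect without restarting the node. The snippet below is a minimal illustration, assuming the pkg module's import path and the paramtable.Init helper as used elsewhere in the repository.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	// Initialize the param table (loads defaults plus any milvus.yaml overrides).
	paramtable.Init()

	// Same accessor the delegator uses when batching forwarded deletes.
	maxSize := paramtable.Get().QueryNodeCfg.ForwardDeleteMaxBatchSize.GetAsInt64()
	fmt.Println("forward delete max batch size (bytes):", maxSize) // 16777216 unless overridden
}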
