Skip to content

Commit

Permalink
enhance: Add streaming forward policy switch for delegator (milvus-io…
Browse files Browse the repository at this point in the history
…#36330)

Related to milvus-io#35303

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Sep 23, 2024
1 parent 701f3bf commit 1833913
Show file tree
Hide file tree
Showing 7 changed files with 623 additions and 226 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ queryNode:
enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"]
streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"]
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.
Expand Down
87 changes: 9 additions & 78 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,85 +203,10 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
Data: cacheItems,
})

start := time.Now()
retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool())
// segment => delete data
delRecords := make(map[int64]DeleteData)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits

pks := deleteData[value.DeleteDataIdx].PrimaryKeys
tss := deleteData[value.DeleteDataIdx].Timestamps

for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i])
delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
return true
})
bfCost := time.Since(start)

offlineSegments := typeutil.NewConcurrentSet[int64]()

sealed, growing, version := sd.distribution.PinOnlineSegments()

start = time.Now()
eg, ctx := errgroup.WithContext(context.Background())
for _, entry := range sealed {
entry := entry
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
if err != nil {
log.Warn("failed to get worker",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// skip if node down
// delete will be processed after loaded again
return nil
}
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...)
return nil
})
}
if len(growing) > 0 {
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
if err != nil {
log.Error("failed to get worker(local)",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// panic here, local worker shall not have error
panic(err)
}
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...)
return nil
})
}
// not error return in apply delete
_ = eg.Wait()
forwardDeleteCost := time.Since(start)

sd.distribution.Unpin(version)
offlineSegIDs := offlineSegments.Collect()
if len(offlineSegIDs) > 0 {
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
sd.markSegmentOffline(offlineSegIDs...)
}
sd.forwardStreamingDeletion(context.Background(), deleteData)

metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds()))
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
}

type BatchApplyRet = struct {
Expand Down Expand Up @@ -327,7 +252,13 @@ func (sd *shardDelegator) applyBFInParallel(deleteDatas []*DeleteData, pool *con
}

// applyDelete handles delete record and apply them to corresponding workers.
func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
func (sd *shardDelegator) applyDelete(ctx context.Context,
nodeID int64,
worker cluster.Worker,
delRecords func(segmentID int64) (DeleteData, bool),
entries []SegmentEntry,
scope querypb.DataScope,
) []int64 {
offlineSegments := typeutil.NewConcurrentSet[int64]()
log := sd.getLogger(ctx)

Expand All @@ -340,7 +271,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
var futures []*conc.Future[struct{}]
for _, segmentEntry := range entries {
segmentEntry := segmentEntry
delRecord, ok := delRecords[segmentEntry.SegmentID]
delRecord, ok := delRecords(segmentEntry.SegmentID)
log := log.With(
zap.Int64("segmentID", segmentEntry.SegmentID),
zap.Int64("workerID", nodeID),
Expand Down
146 changes: 0 additions & 146 deletions internal/querynodev2/delegator/delegator_delta_forward.go

This file was deleted.

Loading

0 comments on commit 1833913

Please sign in to comment.