From d10908ad09390b2f839c42a47723d5114d927c29 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 18 Dec 2024 20:34:03 +0800 Subject: [PATCH 01/12] fix: Fix slow dist handle and slow observe Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target.go | 38 ++++++++++++++----- internal/querycoordv2/meta/target_manager.go | 6 +-- .../observers/collection_observer.go | 32 +++++++++------- internal/querycoordv2/task/action.go | 16 +------- internal/querycoordv2/task/scheduler.go | 11 +++--- internal/querycoordv2/task/task_test.go | 16 -------- internal/querynodev2/services.go | 5 ++- 7 files changed, 59 insertions(+), 65 deletions(-) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 3099d2ab9f8ac..1c8abd1523707 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -30,36 +30,52 @@ import ( // CollectionTarget collection target is immutable, type CollectionTarget struct { - segments map[int64]*datapb.SegmentInfo - dmChannels map[string]*DmChannel - partitions typeutil.Set[int64] // stores target partitions info - version int64 + segments map[int64]*datapb.SegmentInfo + partition2Segments map[int64][]*datapb.SegmentInfo + dmChannels map[string]*DmChannel + partitions typeutil.Set[int64] // stores target partitions info + version int64 } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { + partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs)) + for _, segment := range segments { + partitionID := segment.GetPartitionID() + if _, ok := partition2Segments[partitionID]; !ok { + partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0) + } + partition2Segments[partitionID] = append(partition2Segments[partitionID], segment) + } return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitionIDs...), - version: time.Now().UnixNano(), + segments: segments, + partition2Segments: partition2Segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitionIDs...), + version: time.Now().UnixNano(), } } func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + partition2Segments := make(map[int64][]*datapb.SegmentInfo) var partitions []int64 for _, t := range target.GetChannelTargets() { for _, partition := range t.GetPartitionTargets() { + if _, ok := partition2Segments[partition.GetPartitionID()]; !ok { + partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments())) + } for _, segment := range partition.GetSegments() { - segments[segment.GetID()] = &datapb.SegmentInfo{ + info := &datapb.SegmentInfo{ ID: segment.GetID(), Level: segment.GetLevel(), CollectionID: target.GetCollectionID(), PartitionID: partition.GetPartitionID(), InsertChannel: t.GetChannelName(), } + segments[segment.GetID()] = info + partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info) } partitions = append(partitions, partition.GetPartitionID()) } @@ -139,6 +155,10 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo { return p.segments } +func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo { + return p.partition2Segments[partitionID] +} + func (p *CollectionTarget) GetTargetVersion() int64 { 
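// Editor's aside: a minimal sketch, outside the patch, of what the new
// partition2Segments index buys. CollectionTarget is immutable once built,
// so the index can be computed exactly once in the constructor and every
// per-partition read becomes a single map lookup instead of a full scan:
//
//	byPartition := make(map[int64][]*datapb.SegmentInfo)
//	for _, s := range segments { // segments: map[int64]*datapb.SegmentInfo
//		byPartition[s.GetPartitionID()] = append(byPartition[s.GetPartitionID()], s)
//	}
//	segs := byPartition[partitionID] // O(1), was O(len(segments)) per call
//
// Since append on a nil slice allocates as needed, the explicit
// "if !ok { make }" before each append in the patch is a stylistic choice,
// not a requirement.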
return p.version } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 5b7c0f6567281..aef973d1a1745 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -467,10 +467,8 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { segments := make(map[int64]*datapb.SegmentInfo) - for _, s := range t.GetAllSegments() { - if s.GetPartitionID() == partitionID { - segments[s.GetID()] = s - } + for _, s := range t.GetPartitionSegments(partitionID) { + segments[s.GetID()] = s } if len(segments) > 0 { diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 99f8c2f06a341..8a5c016a94bc8 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -325,11 +325,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti } func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool { - log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With( - zap.Int64("collectionID", partition.GetCollectionID()), - zap.Int64("partitionID", partition.GetPartitionID()), - ) - segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget) targetNum := len(segmentTargets) + channelTargetNum @@ -338,7 +333,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa return false } - log.RatedInfo(10, "partition targets", + log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID()), zap.Int("segmentTargetNum", len(segmentTargets)), zap.Int("channelTargetNum", channelTargetNum), zap.Int("totalTargetNum", targetNum), @@ -356,7 +353,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa loadedCount += len(group) } if loadedCount > 0 { - log.Info("partition load progress", + log.Ctx(ctx).Info("partition load progress", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID()), zap.Int("subChannelCount", subChannelCount), zap.Int("loadSegmentCount", loadedCount-subChannelCount)) } @@ -370,16 +369,22 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { - log.Warn("failed to manual check current target, skip update load status") + log.Ctx(ctx).Warn("failed to manually check current target, skip updating load status", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID())) return false } delete(ob.partitionLoadedCount, partition.GetPartitionID()) } err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage) if err != nil { - log.Warn("failed to update partition load percentage") + log.Ctx(ctx).Warn("failed to update partition load percentage", + zap.Int64("collectionID",
partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID())) } - log.Info("partition load status updated", + log.Ctx(ctx).Info("partition load status updated", + zap.Int64("collectionID", partition.GetCollectionID()), + zap.Int64("partitionID", partition.GetPartitionID()), zap.Int32("partitionLoadPercentage", loadPercentage), ) eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage))) @@ -387,13 +392,12 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa } func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) { - log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) - collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID) if err != nil { - log.Warn("failed to update collection load percentage") + log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID)) } - log.Info("collection load status updated", + log.Ctx(ctx).Info("collection load status updated", + zap.Int64("collectionID", collectionID), zap.Int32("collectionLoadPercentage", collectionPercentage), ) if collectionPercentage == 100 { diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index dfbc4c44ddf52..961840abbee22 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -118,21 +118,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope { func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { if action.Type() == ActionTypeGrow { // rpc finished - if !action.rpcReturned.Load() { - return false - } - - // segment found in leader view - views := distMgr.LeaderViewManager.GetByFilter( - meta.WithChannelName2LeaderView(action.Shard), - meta.WithSegment2LeaderView(action.SegmentID, false)) - if len(views) == 0 { - return false - } - - // segment found in dist - segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID)) - return len(segmentInTargetNode) > 0 + return action.rpcReturned.Load() } else if action.Type() == ActionTypeReduce { // FIXME: Now shard leader's segment view is a map of segment ID to node ID, // loading segment replaces the node ID with the new one, diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index d34a4eeaeb848..c82b6fbffbaa9 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -685,6 +685,8 @@ func (scheduler *taskScheduler) schedule(node int64) { scheduler.remove(task) } + scheduler.updateTaskMetrics() + log.Info("processed tasks", zap.Int("toProcessNum", len(toProcess)), zap.Int32("committedNum", commmittedNum.Load()), @@ -736,10 +738,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { // return true if the task should be executed, // false otherwise func (scheduler *taskScheduler) preProcess(task Task) bool { - log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With( - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("taskID", task.ID()), - ) if task.Status() != TaskStatusStarted { return false } @@ -762,7 +760,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool { } if !ready { - log.RatedInfo(30, "Blocking reduce action in balance channel task") + 
log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task", + zap.Int64("collectionID", task.CollectionID()), + zap.Int64("taskID", task.ID())) break } } @@ -881,7 +881,6 @@ func (scheduler *taskScheduler) remove(task Task) { log = log.With(zap.Int64("segmentID", task.SegmentID())) } - scheduler.updateTaskMetrics() log.Info("task removed") if scheduler.meta.Exist(task.Context(), task.CollectionID()) { diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 6de6a70d49175..81d755effa59c 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1141,22 +1141,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() { suite.dispatchAndWait(targetNode) suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) - // Process tasks done - // Dist contains channels, first task stale - view := &meta.LeaderView{ - ID: targetNode, - CollectionID: suite.collection, - Segments: map[int64]*querypb.SegmentDist{}, - Channel: channel.ChannelName, - } - for _, segment := range suite.loadSegments[1:] { - view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0} - } - distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment { - return meta.SegmentFromInfo(info) - }) - suite.dist.LeaderViewManager.Update(targetNode, view) - suite.dist.SegmentDistManager.Update(targetNode, distSegments...) segments = make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments[1:] { segments = append(segments, &datapb.SegmentInfo{ diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index af46fd1aada91..928237e1341f8 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1210,7 +1210,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get growingSegments[entry.SegmentID] = &msgpb.MsgPosition{} continue } - growingSegments[entry.SegmentID] = segment.StartPosition() + // QueryCoord only requires the timestamp from the position. 
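// Editor's aside (hedged sketch, not part of the patch): a full start
// position carries the channel name, the message ID bytes and the
// timestamp, but QueryCoord only reads the timestamp, so each growing
// segment's entry shrinks to a near-empty message:
//
//	full := segment.StartPosition() // ChannelName + MsgID + Timestamp
//	slim := &msgpb.MsgPosition{Timestamp: full.GetTimestamp()}
//	// proto.Size(slim) stays a few bytes, while proto.Size(full) grows with
//	// the channel name and msgID; across many growing segments this trims
//	// the GetDataDistribution response the dist handler must decode.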
+ growingSegments[entry.SegmentID] = &msgpb.MsgPosition{ + Timestamp: segment.StartPosition().GetTimestamp(), + } numOfGrowingRows += segment.InsertCount() } From f4b2487e2fcadc2092683d3743699de4acf4dd1d Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 19 Dec 2024 11:34:18 +0800 Subject: [PATCH 02/12] print observe and dist handler time Signed-off-by: bigsheeper --- internal/querycoordv2/dist/dist_handler.go | 10 +++++++++- internal/querycoordv2/meta/target.go | 9 +++++---- .../observers/collection_observer.go | 16 +++++++++------- internal/querycoordv2/task/scheduler.go | 9 +++++++++ 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 828cedc6e5ce3..55d3371252d94 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -24,6 +24,7 @@ import ( "github.com/samber/lo" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -37,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -91,7 +93,9 @@ func (dh *distHandler) start(ctx context.Context) { } func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask bool) { + tr := timerecord.NewTimeRecorder("") resp, err := dh.getDistribution(ctx) + d1 := tr.RecordSpan() if err != nil { node := dh.nodeManager.Get(dh.nodeID) *failures = *failures + 1 @@ -100,11 +104,15 @@ func (dh *distHandler) pullDist(ctx context.Context, failures *int, dispatchTask fields = append(fields, zap.Time("lastHeartbeat", node.LastHeartbeat())) } fields = append(fields, zap.Error(err)) - log.RatedWarn(30.0, "failed to get data distribution", fields...) + log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 60). + RatedWarn(30.0, "failed to get data distribution", fields...) } else { *failures = 0 dh.handleDistResp(ctx, resp, dispatchTask) } + log.Ctx(ctx).WithRateGroup("distHandler.pullDist", 1, 120). 
+ RatedInfo(120.0, "pull and handle distribution done", + zap.Int("respSize", proto.Size(resp)), zap.Duration("pullDur", d1), zap.Duration("handleDur", tr.RecordSpan())) } func (dh *distHandler) handleDistResp(ctx context.Context, resp *querypb.GetDataDistributionResponse, dispatchTask bool) { diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 1c8abd1523707..23b76019c13ac 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -92,10 +92,11 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget } return &CollectionTarget{ - segments: segments, - dmChannels: dmChannels, - partitions: typeutil.NewSet(partitions...), - version: target.GetVersion(), + segments: segments, + partition2Segments: partition2Segments, + dmChannels: dmChannels, + partitions: typeutil.NewSet(partitions...), + version: target.GetVersion(), } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 8a5c016a94bc8..90ae9dfb23593 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -240,9 +240,13 @@ func (ob *CollectionObserver) readyToObserve(ctx context.Context, collectionID i func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { loading := false + observeTaskNum := 0 + observeStart := time.Now() ob.loadTasks.Range(func(traceID string, task LoadTask) bool { loading = true + observeTaskNum++ + start := time.Now() collection := ob.meta.CollectionManager.GetCollection(ctx, task.CollectionID) if collection == nil { return true @@ -296,9 +300,12 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { ob.loadTasks.Remove(traceID) } + log.Info("observe collection done", zap.Int64("collectionID", task.CollectionID), zap.Duration("dur", time.Since(start))) return true }) + log.Info("observe all collections done", zap.Int("num", observeTaskNum), zap.Duration("dur", time.Since(observeStart))) + // trigger check logic when loading collections/partitions if loading { ob.checkerController.Check() @@ -352,13 +359,6 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes) loadedCount += len(group) } - if loadedCount > 0 { - log.Ctx(ctx).Info("partition load progress", - zap.Int64("collectionID", partition.GetCollectionID()), - zap.Int64("partitionID", partition.GetPartitionID()), - zap.Int("subChannelCount", subChannelCount), - zap.Int("loadSegmentCount", loadedCount-subChannelCount)) - } loadPercentage = int32(loadedCount * 100 / (targetNum * int(replicaNum))) if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 { @@ -386,6 +386,8 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa zap.Int64("collectionID", partition.GetCollectionID()), zap.Int64("partitionID", partition.GetPartitionID()), zap.Int32("partitionLoadPercentage", loadPercentage), + zap.Int("subChannelCount", subChannelCount), + zap.Int("loadSegmentCount", loadedCount-subChannelCount), ) eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage))) return true diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index c82b6fbffbaa9..7a73205ec8704 
100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/timerecord" . "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -644,11 +645,13 @@ func (scheduler *taskScheduler) schedule(node int64) { return } + tr := timerecord.NewTimeRecorder("") log := log.Ctx(scheduler.ctx).With( zap.Int64("nodeID", node), ) scheduler.tryPromoteAll() + promoteDur := tr.RecordSpan() log.Debug("process tasks related to node", zap.Int("processingTaskNum", scheduler.processQueue.Len()), @@ -670,6 +673,7 @@ func (scheduler *taskScheduler) schedule(node int64) { return true }) + preprocessDur := tr.RecordSpan() // The scheduler doesn't limit the number of tasks, // to commit tasks to executors as soon as possible, to reach higher merge possibility @@ -680,6 +684,7 @@ func (scheduler *taskScheduler) schedule(node int64) { } return nil }, "process") + processDur := tr.RecordSpan() for _, task := range toRemove { scheduler.remove(task) @@ -691,6 +696,10 @@ func (scheduler *taskScheduler) schedule(node int64) { zap.Int("toProcessNum", len(toProcess)), zap.Int32("committedNum", commmittedNum.Load()), zap.Int("toRemoveNum", len(toRemove)), + zap.Duration("promoteDur", promoteDur), + zap.Duration("preprocessDur", preprocessDur), + zap.Duration("processDur", processDur), + zap.Duration("totalDur", tr.ElapseSpan()), ) log.Info("process tasks related to node done", From 609039d1ec0e9386842184d4948906d2d5e5b2e0 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 26 Dec 2024 16:51:22 +0800 Subject: [PATCH 03/12] opt release Signed-off-by: bigsheeper --- internal/querycoordv2/task/action.go | 31 +--------------------------- internal/querycoordv2/utils/util.go | 2 +- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 961840abbee22..2aa5a90d4b2ab 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -24,7 +24,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" - "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -116,35 +115,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope { } func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool { - if action.Type() == ActionTypeGrow { - // rpc finished - return action.rpcReturned.Load() - } else if action.Type() == ActionTypeReduce { - // FIXME: Now shard leader's segment view is a map of segment ID to node ID, - // loading segment replaces the node ID with the new one, - // which confuses the condition of finishing, - // the leader should return a map of segment ID to list of nodes, - // now, we just always commit the release task to executor once.
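// Editor's aside: with the branches above removed, completion of every
// action type is signaled purely by the RPC acknowledgement. A minimal
// sketch of the pattern (illustrative names, not the patch's API):
//
//	type segmentAction struct{ rpcReturned atomic.Bool }
//
//	func (a *segmentAction) markRPCDone() { a.rpcReturned.Store(true) }      // executor side
//	func (a *segmentAction) IsFinished() bool { return a.rpcReturned.Load() } // scheduler side
//
// Correctness then rests on the dist handler continuing to pull each node's
// real distribution: if an acknowledged release did not actually take
// effect, a later checker round sees the mismatch and issues a fresh task.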
- // NOTE: DO NOT create a task containing release action and the action is not the last action - sealed := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node())) - views := distMgr.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(action.Node())) - growing := lo.FlatMap(views, func(view *meta.LeaderView, _ int) []int64 { - return lo.Keys(view.GrowingSegments) - }) - segments := make([]int64, 0, len(sealed)+len(growing)) - for _, segment := range sealed { - segments = append(segments, segment.GetID()) - } - segments = append(segments, growing...) - if !funcutil.SliceContain(segments, action.GetSegmentID()) { - return true - } - return action.rpcReturned.Load() - } else if action.Type() == ActionTypeUpdate { - return action.rpcReturned.Load() - } - - return true + return action.rpcReturned.Load() } func (action *SegmentAction) Desc() string { diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 5e283b926ee60..881873c418f27 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -48,7 +48,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error { log := log.Ctx(context.TODO()). WithRateGroup("utils.CheckLeaderAvailable", 1, 60). - With(zap.Int64("leaderID", leader.ID)) + With(zap.Int64("leaderID", leader.ID), zap.Int64("collectionID", leader.CollectionID)) info := nodeMgr.Get(leader.ID) // Check whether leader is online From 8802f4e168dc2e47c87f3aee69446267a7cea4c3 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 27 Dec 2024 10:37:43 +0800 Subject: [PATCH 04/12] enhance: Add channel index in target, optimize logs Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target.go | 17 +++++ internal/querycoordv2/meta/target_manager.go | 9 +-- internal/querycoordv2/task/scheduler.go | 72 ++++++++------------ 3 files changed, 48 insertions(+), 50 deletions(-) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 6ebc485a76b12..1ed9714b6239f 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -33,6 +33,7 @@ import ( // CollectionTarget collection target is immutable, type CollectionTarget struct { segments map[int64]*datapb.SegmentInfo + channel2Segments map[string][]*datapb.SegmentInfo partition2Segments map[int64][]*datapb.SegmentInfo dmChannels map[string]*DmChannel partitions typeutil.Set[int64] // stores target partitions info @@ -43,8 +44,14 @@ type CollectionTarget struct { } func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget { + channel2Segments := make(map[string][]*datapb.SegmentInfo, len(dmChannels)) partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs)) for _, segment := range segments { + channel := segment.GetInsertChannel() + if _, ok := channel2Segments[channel]; !ok { + channel2Segments[channel] = make([]*datapb.SegmentInfo, 0) + } + channel2Segments[channel] = append(channel2Segments[channel], segment) partitionID := segment.GetPartitionID() if _, ok := partition2Segments[partitionID]; !ok { partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0) @@ -63,11 +70,15 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[ func FromPbCollectionTarget(target *querypb.CollectionTarget) 
*CollectionTarget { segments := make(map[int64]*datapb.SegmentInfo) dmChannels := make(map[string]*DmChannel) + channel2Segments := make(map[string][]*datapb.SegmentInfo) partition2Segments := make(map[int64][]*datapb.SegmentInfo) var partitions []int64 lackSegmentInfo := false for _, t := range target.GetChannelTargets() { + if _, ok := channel2Segments[t.GetChannelName()]; !ok { + channel2Segments[t.GetChannelName()] = make([]*datapb.SegmentInfo, 0) + } for _, partition := range t.GetPartitionTargets() { if _, ok := partition2Segments[partition.GetPartitionID()]; !ok { partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments())) @@ -85,6 +96,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget NumOfRows: segment.GetNumOfRows(), } segments[segment.GetID()] = info + channel2Segments[t.GetChannelName()] = append(channel2Segments[t.GetChannelName()], info) partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info) } partitions = append(partitions, partition.GetPartitionID()) @@ -107,6 +119,7 @@ func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget return &CollectionTarget{ segments: segments, + channel2Segments: channel2Segments, partition2Segments: partition2Segments, dmChannels: dmChannels, partitions: typeutil.NewSet(partitions...), @@ -172,6 +185,10 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo { return p.segments } +func (p *CollectionTarget) GetChannelSegments(channel string) []*datapb.SegmentInfo { + return p.channel2Segments[channel] +} + func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo { return p.partition2Segments[partitionID] } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index af1644c5348cd..aad32e68d9e01 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -426,12 +426,9 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { - ret := make(map[int64]*datapb.SegmentInfo) - for k, v := range t.GetAllSegments() { - if v.GetInsertChannel() == channelName { - ret[k] = v - } - } + ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 { + return s.GetID() + }) if len(ret) > 0 { return ret diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index cc0563f38b2b3..371aa03b1ef3c 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -948,14 +948,18 @@ func (scheduler *taskScheduler) getTaskMetricsLabel(task Task) string { return metrics.UnknownTaskLabel } -func (scheduler *taskScheduler) checkStale(task Task) error { - log := log.Ctx(task.Context()).With( +func WrapTaskLog(task Task, fields ...zap.Field) []zap.Field { + res := []zap.Field{ zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), zap.Int64("replicaID", task.ReplicaID()), zap.String("source", task.Source().String()), - ) + } + res = append(res, fields...) 
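// Editor's aside: WrapTaskLog builds the zap field slice only when a message
// is actually emitted, where the removed pattern derived a logger per task
// via log.Ctx(ctx).With(...) on every checkStale call. Hypothetical call
// site:
//
//	log.Ctx(task.Context()).Warn("task stale due to node offline",
//		WrapTaskLog(task, zap.Int64("segment", segmentID))...)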
+ return res +} +func (scheduler *taskScheduler) checkStale(task Task) error { switch task := task.(type) { case *SegmentTask: if err := scheduler.checkSegmentTaskStale(task); err != nil { @@ -982,7 +986,9 @@ func (scheduler *taskScheduler) checkStale(task Task) error { zap.Int("step", step)) if scheduler.nodeMgr.Get(action.Node()) == nil { - log.Warn("the task is stale, the target node is offline") + log.Warn("the task is stale, the target node is offline", WrapTaskLog(task, + zap.Int64("nodeID", action.Node()), + zap.Int("step", step))...) return merr.WrapErrNodeNotFound(action.Node()) } } @@ -991,38 +997,30 @@ func (scheduler *taskScheduler) checkStale(task Task) error { } func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok { - log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID)) + log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.Int64("segment", task.segmentID))...) return merr.WrapErrNodeOffline(action.Node()) } taskType := GetTaskType(task) segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst) if segment == nil { - log.Warn("task stale due to the segment to load not exists in targets", - zap.Int64("segment", task.segmentID), - zap.String("taskType", taskType.String()), - ) + log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets", + WrapTaskLog(task, zap.Int64("segment", task.segmentID), + zap.String("taskType", taskType.String()))...) return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment") } replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node()) if replica == nil { - log.Warn("task stale due to replica not found") + log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task)...) return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID") } _, ok := scheduler.distMgr.GetShardLeader(replica, segment.GetInsertChannel()) if !ok { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task)...) return merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "failed to get shard delegator") } @@ -1034,23 +1032,16 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { } func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.Node()); ok { - log.Warn("task stale due to node offline", zap.String("channel", task.Channel())) + log.Ctx(task.Context()).Warn("task stale due to node offline", WrapTaskLog(task, zap.String("channel", task.Channel()))...) 
return merr.WrapErrNodeOffline(action.Node()) } if scheduler.targetMgr.GetDmChannel(task.ctx, task.collectionID, task.Channel(), meta.NextTargetFirst) == nil { - log.Warn("the task is stale, the channel to subscribe not exists in targets", - zap.String("channel", task.Channel())) + log.Ctx(task.Context()).Warn("the task is stale, the channel to subscribe not exists in targets", + WrapTaskLog(task, zap.String("channel", task.Channel()))...) return merr.WrapErrChannelReduplicate(task.Channel(), "target doesn't contain this channel") } @@ -1062,48 +1053,41 @@ func (scheduler *taskScheduler) checkChannelTaskStale(task *ChannelTask) error { } func (scheduler *taskScheduler) checkLeaderTaskStale(task *LeaderTask) error { - log := log.Ctx(task.Context()).With( - zap.Int64("taskID", task.ID()), - zap.Int64("collectionID", task.CollectionID()), - zap.Int64("replicaID", task.ReplicaID()), - zap.String("source", task.Source().String()), - zap.Int64("leaderID", task.leaderID), - ) - for _, action := range task.Actions() { switch action.Type() { case ActionTypeGrow: if ok, _ := scheduler.nodeMgr.IsStoppingNode(action.(*LeaderAction).GetLeaderID()); ok { - log.Warn("task stale due to node offline", zap.Int64("segment", task.segmentID)) + log.Ctx(task.Context()).Warn("task stale due to node offline", + WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), zap.Int64("segment", task.segmentID))...) return merr.WrapErrNodeOffline(action.Node()) } taskType := GetTaskType(task) segment := scheduler.targetMgr.GetSealedSegment(task.ctx, task.CollectionID(), task.SegmentID(), meta.CurrentTargetFirst) if segment == nil { - log.Warn("task stale due to the segment to load not exists in targets", - zap.Int64("segment", task.segmentID), - zap.String("taskType", taskType.String()), - ) + log.Ctx(task.Context()).Warn("task stale due to the segment to load not exists in targets", + WrapTaskLog(task, zap.Int64("leaderID", task.leaderID), + zap.Int64("segment", task.segmentID), + zap.String("taskType", taskType.String()))...) return merr.WrapErrSegmentReduplicate(task.SegmentID(), "target doesn't contain this segment") } replica := scheduler.meta.ReplicaManager.GetByCollectionAndNode(task.ctx, task.CollectionID(), action.Node()) if replica == nil { - log.Warn("task stale due to replica not found") + log.Ctx(task.Context()).Warn("task stale due to replica not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) return merr.WrapErrReplicaNotFound(task.CollectionID(), "by collectionID") } view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard()) if view == nil { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator") } case ActionTypeReduce: view := scheduler.distMgr.GetLeaderShardView(task.leaderID, task.Shard()) if view == nil { - log.Warn("task stale due to leader not found") + log.Ctx(task.Context()).Warn("task stale due to leader not found", WrapTaskLog(task, zap.Int64("leaderID", task.leaderID))...) 
return merr.WrapErrChannelNotFound(task.Shard(), "failed to get shard delegator") } } From d59d1e72da91e60c8e9b259241c652e7f49b88e0 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 31 Dec 2024 16:55:04 +0800 Subject: [PATCH 05/12] fix Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 1ed9714b6239f..91006103b4ecf 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -60,6 +60,7 @@ func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[ } return &CollectionTarget{ segments: segments, + channel2Segments: channel2Segments, partition2Segments: partition2Segments, dmChannels: dmChannels, partitions: typeutil.NewSet(partitionIDs...), From 7580181cbced80acb2352bb32aee750726dae601 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 1 Jan 2025 15:29:51 +0800 Subject: [PATCH 06/12] fix ut Signed-off-by: bigsheeper --- internal/querycoordv2/task/task_test.go | 39 +++++++------------------ 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index aebf4b996276e..6247f1f27bc5c 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -771,26 +771,14 @@ func (suite *TaskSuite) TestReleaseGrowingSegmentTask() { suite.NoError(err) } - growings := map[int64]*meta.Segment{} - for _, segment := range suite.releaseSegments[1:] { - growings[segment] = utils.CreateTestSegment(suite.collection, 1, segment, targetNode, 1, "") - } - suite.dist.LeaderViewManager.Update(targetNode, &meta.LeaderView{ - ID: targetNode, - GrowingSegments: growings, - }) - segmentsNum := len(suite.releaseSegments) suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) - // Process tasks + // Process tasks and Release done suite.dispatchAndWait(targetNode) - suite.AssertTaskNum(segmentsNum-1, 0, 0, segmentsNum-1) - - // Release done - suite.dist.LeaderViewManager.Update(targetNode) + suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) - // Process tasks done + // Tasks removed suite.dispatchAndWait(targetNode) suite.AssertTaskNum(0, 0, 0, 0) @@ -1090,7 +1078,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { CollectionID: suite.collection, }, }, nil) - for _, segment := range suite.loadSegments { + for _, segment := range suite.loadSegments[1:] { suite.broker.EXPECT().GetSegmentInfo(mock.Anything, segment).Return([]*datapb.SegmentInfo{ { ID: segment, @@ -1131,15 +1119,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() { err = suite.scheduler.Add(task) suite.NoError(err) } - suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) - suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, partition)) - suite.target.UpdateCollectionNextTarget(ctx, suite.collection) - segmentsNum := len(suite.loadSegments) - suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum) - - // Process tasks - suite.dispatchAndWait(targetNode) - suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum) segments = make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments[1:] { @@ -1149,13 +1128,16 @@ func (suite *TaskSuite) TestSegmentTaskStale() { InsertChannel: channel.GetChannelName(), }) } - bakExpectations := suite.broker.ExpectedCalls - suite.broker.AssertExpectations(suite.T()) - 
suite.broker.ExpectedCalls = suite.broker.ExpectedCalls[:0] suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, suite.collection).Return([]*datapb.VchannelInfo{channel}, segments, nil) suite.meta.CollectionManager.PutPartition(ctx, utils.CreateTestPartition(suite.collection, 2)) suite.target.UpdateCollectionNextTarget(ctx, suite.collection) + + // process done + suite.dispatchAndWait(targetNode) + suite.AssertTaskNum(1, 0, 0, 1) + + // task removed suite.dispatchAndWait(targetNode) suite.AssertTaskNum(0, 0, 0, 0) @@ -1168,7 +1150,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() { suite.NoError(task.Err()) } } - suite.broker.ExpectedCalls = bakExpectations } func (suite *TaskSuite) TestChannelTaskReplace() { From 9e4c85929f7900587aa6be34b9185bc94b3ecca9 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 1 Jan 2025 15:58:47 +0800 Subject: [PATCH 07/12] code format Signed-off-by: bigsheeper --- internal/querycoordv2/task/task_test.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 6247f1f27bc5c..d0612c7f20ae9 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -1099,13 +1099,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { })) suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{})) tasks := []Task{} - segments := make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments { - segments = append(segments, &datapb.SegmentInfo{ - ID: segment, - PartitionID: 1, - InsertChannel: channel.GetChannelName(), - }) task, err := NewSegmentTask( ctx, timeout, @@ -1120,7 +1114,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() { suite.NoError(err) } - segments = make([]*datapb.SegmentInfo, 0) + segments := make([]*datapb.SegmentInfo, 0) for _, segment := range suite.loadSegments[1:] { segments = append(segments, &datapb.SegmentInfo{ ID: segment, From 19c5d33fa6f60b54d26673bb97bb05c1ef2171ec Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Thu, 2 Jan 2025 20:40:25 +0800 Subject: [PATCH 08/12] enhance: Reducing the granularity of locks in the target manager Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target_manager.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index aad32e68d9e01..b808d3f55627b 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -157,8 +157,6 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec return err } - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() partitions := mgr.meta.GetPartitionsByCollection(ctx, collectionID) partitionIDs := lo.Map(partitions, func(partition *Partition, i int) int64 { return partition.PartitionID @@ -188,7 +186,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec } for _, infos := range channelInfos { - merged := mgr.mergeDmChannelInfo(infos) + merged := mergeDmChannelInfo(infos) dmChannels[merged.GetChannelName()] = merged } @@ -198,7 +196,11 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec } allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) + + mgr.rwMutex.Lock() mgr.next.updateCollectionTarget(collectionID, allocatedTarget) + 
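// Editor's aside: the reordering above is about lock scope. The broker RPCs
// and CollectionTarget construction now run before rwMutex is taken, so the
// critical section is just this map write. Sketch of the resulting shape,
// with a hypothetical buildNextTarget standing in for the code above:
//
//	t := buildNextTarget(collectionID) // RPCs + index building, no lock held
//	mu.Lock()
//	next[collectionID] = t // short critical section
//	mu.Unlock()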
mgr.rwMutex.Unlock() + log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs)) @@ -206,7 +208,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec return nil } -func (mgr *TargetManager) mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { +func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { var dmChannel *DmChannel for _, info := range infos { From c99949b60000ffacb6a2602b714f213ec1a9b347 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Fri, 3 Jan 2025 11:00:41 +0800 Subject: [PATCH 09/12] enhance: Remove scheduler and target manager mutex Signed-off-by: bigsheeper --- internal/querycoordv2/meta/target.go | 26 ++- internal/querycoordv2/meta/target_manager.go | 69 +----- .../querycoordv2/meta/target_manager_test.go | 35 +-- internal/querycoordv2/task/scheduler.go | 209 +++++++++--------- internal/querycoordv2/task/task_test.go | 13 +- pkg/metrics/querycoord_metrics.go | 1 + 6 files changed, 162 insertions(+), 191 deletions(-) diff --git a/internal/querycoordv2/meta/target.go b/internal/querycoordv2/meta/target.go index 91006103b4ecf..1b357af21de9a 100644 --- a/internal/querycoordv2/meta/target.go +++ b/internal/querycoordv2/meta/target.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/util/metrics" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -220,34 +221,40 @@ func (p *CollectionTarget) Ready() bool { } type target struct { + keyLock *lock.KeyLock[int64] // guards updateCollectionTarget // just maintain target at collection level - collectionTargetMap map[int64]*CollectionTarget + collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget] } func newTarget() *target { return &target{ - collectionTargetMap: make(map[int64]*CollectionTarget), + keyLock: lock.NewKeyLock[int64](), + collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](), } } func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) { - if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() { + t.keyLock.Lock(collectionID) + defer t.keyLock.Unlock(collectionID) + if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() { return } - t.collectionTargetMap[collectionID] = target + t.collectionTargetMap.Insert(collectionID, target) } func (t *target) removeCollectionTarget(collectionID int64) { - delete(t.collectionTargetMap, collectionID) + t.collectionTargetMap.Remove(collectionID) } func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget { - return t.collectionTargetMap[collectionID] + ret, _ := t.collectionTargetMap.Get(collectionID) + return ret } func (t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget { - return lo.MapToSlice(t.collectionTargetMap, func(k int64, v *CollectionTarget) *metricsinfo.QueryCoordTarget { + targets := make([]*metricsinfo.QueryCoordTarget, 0, t.collectionTargetMap.Len()) + t.collectionTargetMap.Range(func(k int64, v *CollectionTarget) bool { segments := lo.MapToSlice(v.GetAllSegments(), func(k int64, s *datapb.SegmentInfo) *metricsinfo.Segment { return metrics.NewSegmentFrom(s) }) @@ -256,10 +263,13 @@ func 
(t *target) toQueryCoordCollectionTargets() []*metricsinfo.QueryCoordTarget return metrics.NewDMChannelFrom(ch.VchannelInfo) }) - return &metricsinfo.QueryCoordTarget{ + qct := &metricsinfo.QueryCoordTarget{ CollectionID: k, Segments: segments, DMChannels: dmChannels, } + targets = append(targets, qct) + return true }) + return targets } diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index b808d3f55627b..b82032023bc7d 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -76,9 +76,8 @@ type TargetManagerInterface interface { } type TargetManager struct { - rwMutex sync.RWMutex - broker Broker - meta *Meta + broker Broker + meta *Meta // all read segment/channel operation happens on current -> only current target are visible to outer // all add segment/channel operation happens on next -> changes can only happen on next target @@ -100,8 +99,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager { // WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update, // which may make the current target not available func (mgr *TargetManager) UpdateCollectionCurrentTarget(ctx context.Context, collectionID int64) bool { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() log := log.With(zap.Int64("collectionID", collectionID)) log.Debug("start to update current target for collection") @@ -197,9 +194,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(ctx context.Context, collec allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs) - mgr.rwMutex.Lock() mgr.next.updateCollectionTarget(collectionID, allocatedTarget) - mgr.rwMutex.Unlock() log.Debug("finish to update next targets for collection", zap.Int64("collectionID", collectionID), @@ -230,8 +225,6 @@ func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel { // RemoveCollection removes all channels and segments in the given collection func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int64) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() log.Info("remove collection from targets", zap.Int64("collectionID", collectionID)) @@ -252,9 +245,6 @@ func (mgr *TargetManager) RemoveCollection(ctx context.Context, collectionID int // RemovePartition removes all segment in the given partition, // NOTE: this doesn't remove any channel even the given one is the only partition func (mgr *TargetManager) RemovePartition(ctx context.Context, collectionID int64, partitionIDs ...int64) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() - log := log.With(zap.Int64("collectionID", collectionID), zap.Int64s("PartitionIDs", partitionIDs)) @@ -361,9 +351,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in func (mgr *TargetManager) GetGrowingSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope, ) typeutil.UniqueSet { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -384,9 +371,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx context.Context, colle channelName string, scope TargetScope, ) typeutil.UniqueSet { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { segments := typeutil.NewUniqueSet() @@ -407,9 +391,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(ctx 
context.Context, colle func (mgr *TargetManager) GetSealedSegmentsByCollection(ctx context.Context, collectionID int64, scope TargetScope, ) map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -423,9 +404,6 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(ctx context.Context, collec channelName string, scope TargetScope, ) map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 { @@ -444,9 +422,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(ctx context.Context, colle channelName string, scope TargetScope, ) []int64 { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if channel, ok := t.dmChannels[channelName]; ok { @@ -461,9 +436,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll partitionID int64, scope TargetScope, ) map[int64]*datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { segments := make(map[int64]*datapb.SegmentInfo) @@ -480,9 +452,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll } func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collectionID int64, scope TargetScope) map[string]*DmChannel { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { @@ -493,9 +462,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(ctx context.Context, collect } func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, channel string, scope TargetScope) *DmChannel { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if ch, ok := t.GetAllDmChannels()[channel]; ok { @@ -506,9 +472,6 @@ func (mgr *TargetManager) GetDmChannel(ctx context.Context, collectionID int64, } func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if s, ok := t.GetAllSegments()[id]; ok { @@ -520,9 +483,6 @@ func (mgr *TargetManager) GetSealedSegment(ctx context.Context, collectionID int } func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collectionID int64, scope TargetScope) int64 { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(scope, collectionID) for _, t := range targets { if t.GetTargetVersion() > 0 { @@ -534,9 +494,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(ctx context.Context, collec } func (mgr *TargetManager) IsCurrentTargetExist(ctx context.Context, collectionID int64, partitionID int64) bool { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - targets := mgr.getCollectionTarget(CurrentTarget, collectionID) return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0 @@ -549,8 +506,6 @@ func (mgr *TargetManager) 
IsNextTargetExist(ctx context.Context, collectionID in } func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metastore.QueryCoordCatalog) { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() if mgr.current != nil { // use pool here to control maximal writer used by save target pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2) @@ -574,13 +529,14 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto }) } tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) - for id, target := range mgr.current.collectionTargetMap { + mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool { tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg())) if len(tasks) >= batchSize { submit(tasks) tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize) } - } + return true + }) if len(tasks) > 0 { submit(tasks) } @@ -589,9 +545,6 @@ func (mgr *TargetManager) SaveCurrentTarget(ctx context.Context, catalog metasto } func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCoordCatalog) error { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() - targets, err := catalog.GetCollectionTargets(ctx) if err != nil { log.Warn("failed to recover collection target from etcd", zap.Error(err)) @@ -620,8 +573,6 @@ func (mgr *TargetManager) Recover(ctx context.Context, catalog metastore.QueryCo // if segment isn't l0 segment, and exist in current/next target, then it can be moved func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, segmentID int64) bool { - mgr.rwMutex.Lock() - defer mgr.rwMutex.Unlock() current := mgr.current.getCollectionTarget(collectionID) if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 { return true @@ -636,9 +587,6 @@ func (mgr *TargetManager) CanSegmentBeMoved(ctx context.Context, collectionID, s } func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) string { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - ret := mgr.getTarget(scope) if ret == nil { return "" @@ -653,9 +601,6 @@ func (mgr *TargetManager) GetTargetJSON(ctx context.Context, scope TargetScope) } func (mgr *TargetManager) GetPartitions(ctx context.Context, collectionID int64, scope TargetScope) ([]int64, error) { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - ret := mgr.getCollectionTarget(scope, collectionID) if len(ret) == 0 { return nil, merr.WrapErrCollectionNotLoaded(collectionID) @@ -673,9 +618,7 @@ func (mgr *TargetManager) getTarget(scope TargetScope) *target { } func (mgr *TargetManager) IsCurrentTargetReady(ctx context.Context, collectionID int64) bool { - mgr.rwMutex.RLock() - defer mgr.rwMutex.RUnlock() - target, ok := mgr.current.collectionTargetMap[collectionID] + target, ok := mgr.current.collectionTargetMap.Get(collectionID) if !ok { return false } diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 34bf64136a2e2..95510a6cce895 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -425,33 +425,38 @@ func (suite *TargetManagerSuite) TestGetTarget() { current := &CollectionTarget{} next := &CollectionTarget{} + t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]() + t4 := 
diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go
index 34bf64136a2e2..95510a6cce895 100644
--- a/internal/querycoordv2/meta/target_manager_test.go
+++ b/internal/querycoordv2/meta/target_manager_test.go
@@ -425,33 +425,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
 	current := &CollectionTarget{}
 	next := &CollectionTarget{}
 
+	t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
+	t1.Insert(1000, current)
+	t2.Insert(1000, next)
+	t3.Insert(1000, current)
+	t4.Insert(1000, current)
+
 	bothMgr := &TargetManager{
 		current: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t1,
 		},
 		next: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: next,
-			},
+			collectionTargetMap: t2,
 		},
 	}
 	currentMgr := &TargetManager{
 		current: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t3,
+		},
+		next: &target{
+			collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 		},
-		next: &target{},
 	}
 	nextMgr := &TargetManager{
 		next: &target{
-			collectionTargetMap: map[int64]*CollectionTarget{
-				1000: current,
-			},
+			collectionTargetMap: t4,
+		},
+		current: &target{
+			collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
 		},
-		current: &target{},
 	}
 
 	cases := []testCase{
@@ -720,7 +725,7 @@ func BenchmarkTargetManager(b *testing.B) {
 	collectionNum := 10000
 	for i := 0; i < collectionNum; i++ {
-		mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
+		mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
 	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 371aa03b1ef3c..8d97a48b82d23 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
+	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/timerecord"
 	. "github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -92,6 +93,7 @@ type replicaChannelIndex struct {
 }
 
 type taskQueue struct {
+	mu sync.RWMutex
 	// TaskPriority -> TaskID -> Task
 	buckets []map[int64]Task
 }
@@ -107,6 +109,8 @@ func newTaskQueue() *taskQueue {
 }
 
 func (queue *taskQueue) Len() int {
+	queue.mu.RLock()
+	defer queue.mu.RUnlock()
 	taskNum := 0
 	for _, tasks := range queue.buckets {
 		taskNum += len(tasks)
@@ -116,17 +120,23 @@ func (queue *taskQueue) Len() int {
 }
 
 func (queue *taskQueue) Add(task Task) {
+	queue.mu.Lock()
+	defer queue.mu.Unlock()
 	bucket := queue.buckets[task.Priority()]
 	bucket[task.ID()] = task
 }
 
 func (queue *taskQueue) Remove(task Task) {
+	queue.mu.Lock()
+	defer queue.mu.Unlock()
 	bucket := queue.buckets[task.Priority()]
 	delete(bucket, task.ID())
 }
 
 // Range iterates all tasks in the queue ordered by priority from high to low
 func (queue *taskQueue) Range(fn func(task Task) bool) {
+	queue.mu.RLock()
+	defer queue.mu.RUnlock()
 	for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
 		for _, task := range queue.buckets[priority] {
 			if !fn(task) {
@@ -154,9 +164,8 @@ type Scheduler interface {
 }
 
 type taskScheduler struct {
-	rwmutex sync.RWMutex
 	ctx     context.Context
-	executors   map[int64]*Executor // NodeID -> Executor
+	executors   *ConcurrentMap[int64, *Executor] // NodeID -> Executor
 	idAllocator func() UniqueID
 
 	distMgr   *meta.DistributionManager
@@ -166,9 +175,10 @@ type taskScheduler struct {
 	cluster   session.Cluster
 	nodeMgr   *session.NodeManager
 
-	tasks        UniqueSet
-	segmentTasks map[replicaSegmentIndex]Task
-	channelTasks map[replicaChannelIndex]Task
+	collKeyLock  *lock.KeyLock[int64] // guards Add()
+	tasks        *ConcurrentMap[UniqueID, struct{}]
+	segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
+	channelTasks *ConcurrentMap[replicaChannelIndex, Task]
 	processQueue *taskQueue
 	waitQueue    *taskQueue
 	taskStats    *expirable.LRU[UniqueID, Task]
@@ -185,7 +195,7 @@ func NewScheduler(ctx context.Context,
 	id := time.Now().UnixMilli()
 	return &taskScheduler{
 		ctx:       ctx,
-		executors: make(map[int64]*Executor),
+		executors: NewConcurrentMap[int64, *Executor](),
 		idAllocator: func() UniqueID {
 			id++
 			return id
@@ -198,9 +208,10 @@ func NewScheduler(ctx context.Context,
 		cluster:   cluster,
 		nodeMgr:   nodeMgr,
 
-		tasks:        make(UniqueSet),
-		segmentTasks: make(map[replicaSegmentIndex]Task),
-		channelTasks: make(map[replicaChannelIndex]Task),
+		collKeyLock:  lock.NewKeyLock[int64](),
+		tasks:        NewConcurrentMap[UniqueID, struct{}](),
+		segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](),
+		channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
 		processQueue: newTaskQueue(),
 		waitQueue:    newTaskQueue(),
 		taskStats:    expirable.NewLRU[UniqueID, Task](64, nil, time.Minute*15),
@@ -210,30 +221,22 @@ func NewScheduler(ctx context.Context,
 func (scheduler *taskScheduler) Start() {}
 
 func (scheduler *taskScheduler) Stop() {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	for nodeID, executor := range scheduler.executors {
+	scheduler.executors.Range(func(nodeID int64, executor *Executor) bool {
 		executor.Stop()
-		delete(scheduler.executors, nodeID)
-	}
+		return true
+	})
 
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		scheduler.remove(task)
-	}
-	for _, task := range scheduler.channelTasks {
+		return true
+	})
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		scheduler.remove(task)
-	}
+		return true
+	})
 }
 
 func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	if _, exist := scheduler.executors[nodeID]; exist {
-		return
-	}
-
 	executor := NewExecutor(scheduler.meta,
 		scheduler.distMgr,
 		scheduler.broker,
@@ -241,27 +244,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
 		scheduler.cluster,
 		scheduler.nodeMgr)
 
-	scheduler.executors[nodeID] = executor
+	if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
+		return
+	}
 	executor.Start(scheduler.ctx)
 	log.Ctx(scheduler.ctx).Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
 }
 
 func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	executor, ok := scheduler.executors[nodeID]
+	executor, ok := scheduler.executors.GetAndRemove(nodeID)
 	if ok {
 		executor.Stop()
-		delete(scheduler.executors, nodeID)
 		log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
 	}
 }
 
 func (scheduler *taskScheduler) Add(task Task) error {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
+	scheduler.collKeyLock.Lock(task.CollectionID())
+	defer scheduler.collKeyLock.Unlock(task.CollectionID())
 	err := scheduler.preAdd(task)
 	if err != nil {
 		task.Cancel(err)
@@ -270,19 +270,19 @@ func (scheduler *taskScheduler) Add(task Task) error {
 
 	task.SetID(scheduler.idAllocator())
 	scheduler.waitQueue.Add(task)
-	scheduler.tasks.Insert(task.ID())
+	scheduler.tasks.Insert(task.ID(), struct{}{})
 
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		scheduler.segmentTasks[index] = task
+		scheduler.segmentTasks.Insert(index, task)
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		scheduler.channelTasks[index] = task
+		scheduler.channelTasks.Insert(index, task)
 
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		scheduler.segmentTasks[index] = task
+		scheduler.segmentTasks.Insert(index, task)
 	}
 
 	scheduler.taskStats.Add(task.ID(), task)
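Add() now locks only the task's collection rather than the whole scheduler: lock.KeyLock hands out one mutex per key, so adds for different collections proceed in parallel while preAdd's duplicate checks within one collection stay serialized. A simplified sketch of the idea (the real pkg/util/lock implementation also reference-counts entries and reclaims idle mutexes; this toy version never frees them):

    package keylock

    import "sync"

    // KeyLock provides one mutex per key. This illustrative version keeps
    // every mutex forever; a production version garbage-collects unused keys.
    type KeyLock[K comparable] struct {
    	mu    sync.Mutex
    	locks map[K]*sync.Mutex
    }

    func NewKeyLock[K comparable]() *KeyLock[K] {
    	return &KeyLock[K]{locks: make(map[K]*sync.Mutex)}
    }

    func (kl *KeyLock[K]) Lock(key K) {
    	// Fetch or create the per-key mutex under the registry lock,
    	// then block on the per-key mutex outside it.
    	kl.mu.Lock()
    	l, ok := kl.locks[key]
    	if !ok {
    		l = &sync.Mutex{}
    		kl.locks[key] = l
    	}
    	kl.mu.Unlock()
    	l.Lock()
    }

    func (kl *KeyLock[K]) Unlock(key K) {
    	// Assumes Unlock is only called after a matching Lock for key.
    	kl.mu.Lock()
    	l := kl.locks[key]
    	kl.mu.Unlock()
    	l.Unlock()
    }

With Add() serialized per collection, the ConcurrentMap inserts inside it need no additional locking of their own.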
@@ -293,21 +293,39 @@ func (scheduler *taskScheduler) Add(task Task) error {
 }
 
 func (scheduler *taskScheduler) updateTaskMetrics() {
-	segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
+	segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
+	leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
 	channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
-	for _, task := range scheduler.segmentTasks {
-		taskType := GetTaskType(task)
-		switch taskType {
-		case TaskTypeGrow:
-			segmentGrowNum++
-		case TaskTypeReduce:
-			segmentReduceNum++
-		case TaskTypeMove:
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
+		switch {
+		case len(task.Actions()) > 1:
 			segmentMoveNum++
+		case task.Actions()[0].Type() == ActionTypeGrow:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentGrowNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderGrowNum++
+			}
+		case task.Actions()[0].Type() == ActionTypeReduce:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentReduceNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderReduceNum++
+			}
+		case task.Actions()[0].Type() == ActionTypeUpdate:
+			if _, ok := task.Actions()[0].(*SegmentAction); ok {
+				segmentUpdateNum++
+			}
+			if _, ok := task.Actions()[0].(*LeaderAction); ok {
+				leaderUpdateNum++
+			}
 		}
-	}
+		return true
+	})
 
-	for _, task := range scheduler.channelTasks {
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		taskType := GetTaskType(task)
 		switch taskType {
 		case TaskTypeGrow:
@@ -317,11 +335,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
 		case TaskTypeMove:
 			channelMoveNum++
 		}
-	}
+		return true
+	})
 
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
+
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
+	metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
+
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
 	metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@@ -333,7 +358,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		if old, ok := scheduler.segmentTasks[index]; ok {
+		if old, ok := scheduler.segmentTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -366,7 +391,7 @@
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		if old, ok := scheduler.channelTasks[index]; ok {
+		if old, ok := scheduler.channelTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -399,7 +424,7 @@
 		}
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		if old, ok := scheduler.segmentTasks[index]; ok {
+		if old, ok := scheduler.segmentTasks.Get(index); ok {
 			if task.Priority() > old.Priority() {
 				log.Ctx(scheduler.ctx).Info("replace old task, the new one with higher priority",
 					zap.Int64("oldID", old.ID()),
@@ -478,46 +503,40 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
 		log.Ctx(scheduler.ctx).Info("scheduler stopped")
 
 	default:
-		scheduler.rwmutex.Lock()
-		defer scheduler.rwmutex.Unlock()
 		scheduler.schedule(node)
 	}
 }
 
 func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	targetActions := make(map[int64][]Action)
-	for _, task := range scheduler.segmentTasks { // Map key: replicaSegmentIndex
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		taskCollID := task.CollectionID()
 		if collectionID != -1 && collectionID != taskCollID {
-			continue
+			return true
 		}
 		actions := filterActions(task.Actions(), nodeID)
 		if len(actions) > 0 {
 			targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
 		}
-	}
+		return true
+	})
 
 	return scheduler.calculateTaskDelta(targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	targetActions := make(map[int64][]Action)
-	for _, task := range scheduler.channelTasks { // Map key: replicaChannelIndex
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		taskCollID := task.CollectionID()
 		if collectionID != -1 && collectionID != taskCollID {
-			continue
+			return true
 		}
 		actions := filterActions(task.Actions(), nodeID)
 		if len(actions) > 0 {
 			targetActions[taskCollID] = append(targetActions[taskCollID], actions...)
 		}
-	}
+		return true
+	})
 
 	return scheduler.calculateTaskDelta(targetActions)
 }
@@ -562,10 +581,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(targetActions map[int64][]Act
 }
 
 func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
-	executor, ok := scheduler.executors[nodeID]
+	executor, ok := scheduler.executors.Get(nodeID)
 	if !ok {
 		return nil
 	}
@@ -588,16 +604,13 @@ func WithTaskTypeFilter(taskType Type) TaskFilter {
 }
 
 func (scheduler *taskScheduler) GetChannelTaskNum(filters ...TaskFilter) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	if len(filters) == 0 {
-		return len(scheduler.channelTasks)
+		return scheduler.channelTasks.Len()
 	}
 
 	// rewrite this with for loop
 	counter := 0
-	for _, task := range scheduler.channelTasks {
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		allMatch := true
 		for _, filter := range filters {
 			if !filter(task) {
@@ -608,21 +621,19 @@
 		if allMatch {
 			counter++
 		}
-	}
+		return true
+	})
 
 	return counter
 }
 
 func (scheduler *taskScheduler) GetSegmentTaskNum(filters ...TaskFilter) int {
-	scheduler.rwmutex.RLock()
-	defer scheduler.rwmutex.RUnlock()
-
 	if len(filters) == 0 {
-		return len(scheduler.segmentTasks)
+		return scheduler.segmentTasks.Len()
 	}
 
 	// rewrite this with for loop
 	counter := 0
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		allMatch := true
 		for _, filter := range filters {
 			if !filter(task) {
@@ -633,7 +644,8 @@
 		if allMatch {
 			counter++
 		}
-	}
+		return true
+	})
 
 	return counter
 }
@@ -669,8 +681,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
 	log.Debug("process tasks related to node",
 		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
 		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+		zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+		zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
 	)
 
 	// Process tasks
@@ -718,8 +730,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
 	log.Info("process tasks related to node done",
 		zap.Int("processingTaskNum", scheduler.processQueue.Len()),
 		zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
-		zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
-		zap.Int("channelTaskNum", len(scheduler.channelTasks)),
+		zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
+		zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
 	)
 }
 
@@ -815,7 +827,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
 	)
 	actions, step := task.Actions(), task.Step()
-	executor, ok := scheduler.executors[actions[step].Node()]
+	executor, ok := scheduler.executors.Get(actions[step].Node())
 	if !ok {
 		log.Warn("no executor for QueryNode",
 			zap.Int("step", step),
@@ -836,19 +848,18 @@ func (scheduler *taskScheduler) check(task Task) error {
 }
 
 func (scheduler *taskScheduler) RemoveByNode(node int64) {
-	scheduler.rwmutex.Lock()
-	defer scheduler.rwmutex.Unlock()
-
-	for _, task := range scheduler.segmentTasks {
+	scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
 		if scheduler.isRelated(task, node) {
 			scheduler.remove(task)
 		}
-	}
-	for _, task := range scheduler.channelTasks {
+		return true
+	})
+	scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
 		if scheduler.isRelated(task, node) {
 			scheduler.remove(task)
 		}
-	}
+		return true
+	})
 }
 
 func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@@ -884,7 +895,7 @@ func (scheduler *taskScheduler) remove(task Task) {
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
-		delete(scheduler.segmentTasks, index)
+		scheduler.segmentTasks.Remove(index)
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 
 		if task.Status() == TaskStatusFailed &&
 			task.Err() != nil &&
@@ -894,12 +905,12 @@ func (scheduler *taskScheduler) remove(task Task) {
 
 	case *ChannelTask:
 		index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
-		delete(scheduler.channelTasks, index)
+		scheduler.channelTasks.Remove(index)
 		log = log.With(zap.String("channel", task.Channel()))
 
 	case *LeaderTask:
 		index := NewReplicaLeaderIndex(task)
-		delete(scheduler.segmentTasks, index)
+		scheduler.segmentTasks.Remove(index)
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index d0612c7f20ae9..c2ae2ff208abd 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1456,10 +1456,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
 	suite.Equal(process, scheduler.processQueue.Len())
 	suite.Equal(wait, scheduler.waitQueue.Len())
-	suite.Len(scheduler.segmentTasks, segment)
-	suite.Len(scheduler.channelTasks, channel)
-	suite.Equal(len(scheduler.tasks), process+wait)
-	suite.Equal(len(scheduler.tasks), segment+channel)
+	suite.Equal(scheduler.segmentTasks.Len(), segment)
+	suite.Equal(scheduler.channelTasks.Len(), channel)
+	suite.Equal(scheduler.tasks.Len(), process+wait)
+	suite.Equal(scheduler.tasks.Len(), segment+channel)
 }
 
 func (suite *TaskSuite) dispatchAndWait(node int64) {
@@ -1471,13 +1471,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
 		count = 0
 		keys = make([]any, 0)
 
-		for _, executor := range suite.scheduler.executors {
+		suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
 			executor.executingTasks.Range(func(taskIndex string) bool {
 				keys = append(keys, taskIndex)
 				count++
 				return true
 			})
-		}
+			return true
+		})
 
 		if count == 0 {
 			return
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index 67aa181421d72..d66abea2cd129 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -36,6 +36,7 @@ const (
 
 	LeaderGrowTaskLabel   = "leader_grow"
 	LeaderReduceTaskLabel = "leader_reduce"
+	LeaderUpdateTaskLabel = "leader_update"
 
 	UnknownTaskLabel = "unknown"
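Before the follow-up patch below re-serializes schedule() with a dedicated mutex, it helps to see the bottleneck this patch removes: Dispatch used to take the scheduler-wide write lock around a potentially long schedule() pass, so every reader (task-delta queries, task counts, metrics) stalled behind it. A self-contained illustration, with a hypothetical 200ms schedule pass standing in for the real work:

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    func main() {
    	var mu sync.RWMutex

    	go func() {
    		// Plays the old Dispatch: hold the scheduler-wide write lock
    		// for the entire schedule() pass.
    		mu.Lock()
    		time.Sleep(200 * time.Millisecond)
    		mu.Unlock()
    	}()

    	time.Sleep(10 * time.Millisecond) // let the writer win the lock first

    	start := time.Now()
    	mu.RLock() // plays a reader such as GetSegmentTaskDelta
    	mu.RUnlock()
    	// Prints roughly 190ms: the reader sat idle for the writer's whole pass.
    	fmt.Printf("reader blocked for %v\n", time.Since(start))
    }

With the task maps converted to ConcurrentMaps, readers no longer touch the writer's lock at all; the next patch only has to keep concurrent schedule() passes from interleaving with each other.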
From c71657b745b4a0cca8141999c4329e066dc898a3 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Fri, 3 Jan 2025 12:45:12 +0800
Subject: [PATCH 10/12] fix

Signed-off-by: bigsheeper

---
 internal/querycoordv2/task/scheduler.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 8d97a48b82d23..fe432b4bccbbc 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -175,6 +175,7 @@ type taskScheduler struct {
 	cluster   session.Cluster
 	nodeMgr   *session.NodeManager
 
+	scheduleMu   sync.Mutex           // guards schedule()
 	collKeyLock  *lock.KeyLock[int64] // guards Add()
 	tasks        *ConcurrentMap[UniqueID, struct{}]
 	segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
@@ -503,6 +504,8 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
 		log.Ctx(scheduler.ctx).Info("scheduler stopped")
 
 	default:
+		scheduler.scheduleMu.Lock()
+		defer scheduler.scheduleMu.Unlock()
 		scheduler.schedule(node)
 	}
 }

From 9d4ee7ffa4510816452a5bbe3a19903773a54986 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Tue, 14 Jan 2025 16:23:06 +0800
Subject: [PATCH 11/12] update

Signed-off-by: bigsheeper

---
 internal/querynodev2/services.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go
index 76ae886a942aa..3f5af3f7d0745 100644
--- a/internal/querynodev2/services.go
+++ b/internal/querynodev2/services.go
@@ -1208,10 +1208,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 			growingSegments[entry.SegmentID] = &msgpb.MsgPosition{}
 			continue
 		}
-		// QueryCoord only requires the timestamp from the position.
-		growingSegments[entry.SegmentID] = &msgpb.MsgPosition{
-			Timestamp: segment.StartPosition().GetTimestamp(),
-		}
+		growingSegments[entry.SegmentID] = segment.StartPosition()
 		numOfGrowingRows += segment.InsertCount()
 	}

From e76bcabab9aa04c72ac66c4612027d43cf977ed0 Mon Sep 17 00:00:00 2001
From: bigsheeper
Date: Wed, 15 Jan 2025 11:13:13 +0800
Subject: [PATCH 12/12] fix data race

Signed-off-by: bigsheeper

---
 internal/querycoordv2/task/task.go | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go
index 25e64d5be1392..c5fa610c20619 100644
--- a/internal/querycoordv2/task/task.go
+++ b/internal/querycoordv2/task/task.go
@@ -134,12 +134,14 @@ type baseTask struct {
 	name string
 
 	// startTs
-	startTs time.Time
+	startTs atomic.Time
 }
 
 func newBaseTask(ctx context.Context, source Source, collectionID typeutil.UniqueID, replica *meta.Replica, shard string, taskTag string) *baseTask {
 	ctx, cancel := context.WithCancel(ctx)
 	ctx, span := otel.Tracer(typeutil.QueryCoordRole).Start(ctx, taskTag)
+	startTs := atomic.Time{}
+	startTs.Store(time.Now())
 
 	return &baseTask{
 		source:       source,
@@ -154,7 +156,7 @@ func newBaseTask(ctx context.Context, source Source, collectionID typeutil.Uniqu
 		doneCh:   make(chan struct{}),
 		canceled: atomic.NewBool(false),
 		span:     span,
-		startTs:  time.Now(),
+		startTs:  startTs,
 	}
 }
 
@@ -217,11 +219,11 @@ func (task *baseTask) Index() string {
 }
 
 func (task *baseTask) RecordStartTs() {
-	task.startTs = time.Now()
+	task.startTs.Store(time.Now())
}
 
 func (task *baseTask) GetTaskLatency() int64 {
-	return time.Since(task.startTs).Milliseconds()
+	return time.Since(task.startTs.Load()).Milliseconds()
 }
 
 func (task *baseTask) Err() error {
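This last patch fixes a classic time.Time data race: RecordStartTs stores startTs from one goroutine while GetTaskLatency reads it from another, and a multi-word time.Time assignment is not atomic. Switching to go.uber.org/atomic's Time (the same package baseTask already uses for canceled) makes both sides atomic. A stripped-down reproduction of the fixed shape, which runs clean under `go run -race` (swap the field back to a bare time.Time to watch the race detector fire):

    package main

    import (
    	"fmt"
    	"time"

    	"go.uber.org/atomic"
    )

    // task mirrors baseTask after the fix: startTs is an atomic.Time,
    // so concurrent Store/Load pairs are race-free.
    type task struct {
    	startTs atomic.Time
    }

    func (t *task) RecordStartTs()        { t.startTs.Store(time.Now()) }
    func (t *task) GetTaskLatency() int64 { return time.Since(t.startTs.Load()).Milliseconds() }

    func main() {
    	t := &task{}
    	t.startTs.Store(time.Now())

    	done := make(chan struct{})
    	go func() { // plays the scheduler goroutine: restarts the clock
    		for i := 0; i < 1000; i++ {
    			t.RecordStartTs()
    		}
    		close(done)
    	}()
    	for i := 0; i < 1000; i++ { // plays the metrics goroutine: reads the latency
    		_ = t.GetTaskLatency()
    	}
    	<-done
    	fmt.Println("latency(ms):", t.GetTaskLatency())
    }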