diff --git a/internal/querycoordv2/meta/target_manager.go b/internal/querycoordv2/meta/target_manager.go index 2553129570ed6..e448dbb9d62a6 100644 --- a/internal/querycoordv2/meta/target_manager.go +++ b/internal/querycoordv2/meta/target_manager.go @@ -330,22 +330,49 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect return NewCollectionTarget(segments, channels) } -func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget { +func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget { switch scope { case CurrentTarget: - return mgr.current.getCollectionTarget(collectionID) + + ret := make([]*CollectionTarget, 0, 1) + current := mgr.current.getCollectionTarget(collectionID) + if current != nil { + ret = append(ret, current) + } + return ret case NextTarget: - return mgr.next.getCollectionTarget(collectionID) + ret := make([]*CollectionTarget, 0, 1) + next := mgr.next.getCollectionTarget(collectionID) + if next != nil { + ret = append(ret, next) + } + return ret case CurrentTargetFirst: - if current := mgr.current.getCollectionTarget(collectionID); current != nil { - return current + ret := make([]*CollectionTarget, 0, 2) + current := mgr.current.getCollectionTarget(collectionID) + if current != nil { + ret = append(ret, current) + } + + next := mgr.next.getCollectionTarget(collectionID) + if next != nil { + ret = append(ret, next) } - return mgr.next.getCollectionTarget(collectionID) + + return ret case NextTargetFirst: - if next := mgr.next.getCollectionTarget(collectionID); next != nil { - return next + ret := make([]*CollectionTarget, 0, 2) + next := mgr.next.getCollectionTarget(collectionID) + if next != nil { + ret = append(ret, next) + } + + current := mgr.current.getCollectionTarget(collectionID) + if current != nil { + ret = append(ret, current) } - return mgr.current.getCollectionTarget(collectionID) + + return ret } return nil } @@ -356,18 +383,20 @@ func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) + targets := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return nil - } + for _, t := range targets { + segments := typeutil.NewUniqueSet() + for _, channel := range t.GetAllDmChannels() { + segments.Insert(channel.GetUnflushedSegmentIds()...) + } - segments := typeutil.NewUniqueSet() - for _, channel := range collectionTarget.GetAllDmChannels() { - segments.Insert(channel.GetUnflushedSegmentIds()...) + if len(segments) > 0 { + return segments + } } - return segments + return nil } func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64, @@ -377,20 +406,21 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - - if collectionTarget == nil { - return nil - } + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + segments := typeutil.NewUniqueSet() + for _, channel := range t.GetAllDmChannels() { + if channel.ChannelName == channelName { + segments.Insert(channel.GetUnflushedSegmentIds()...) + } + } - segments := typeutil.NewUniqueSet() - for _, channel := range collectionTarget.GetAllDmChannels() { - if channel.ChannelName == channelName { - segments.Insert(channel.GetUnflushedSegmentIds()...) + if len(segments) > 0 { + return segments } } - return segments + return nil } func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64, @@ -399,12 +429,13 @@ func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) + targets := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return nil + for _, t := range targets { + return t.GetAllSegments() } - return collectionTarget.GetAllSegments() + + return nil } func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64, @@ -414,19 +445,21 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return nil - } + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + ret := make(map[int64]*datapb.SegmentInfo) + for k, v := range t.GetAllSegments() { + if v.GetInsertChannel() == channelName { + ret[k] = v + } + } - ret := make(map[int64]*datapb.SegmentInfo) - for k, v := range collectionTarget.GetAllSegments() { - if v.GetInsertChannel() == channelName { - ret[k] = v + if len(ret) > 0 { + return ret } } - return ret + return nil } func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64, @@ -436,86 +469,92 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64, mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - - if collectionTarget == nil { - return nil - } - - channel := collectionTarget.dmChannels[channelName] - if channel == nil { - return nil + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + if channel, ok := t.dmChannels[channelName]; ok { + return channel.GetDroppedSegmentIds() + } } - return channel.GetDroppedSegmentIds() + return nil } func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64, - partitionID int64, scope TargetScope, + partitionID int64, + scope TargetScope, ) map[int64]*datapb.SegmentInfo { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - - if collectionTarget == nil { - return nil - } + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + segments := make(map[int64]*datapb.SegmentInfo) + for _, s := range t.GetAllSegments() { + if s.GetPartitionID() == partitionID { + segments[s.GetID()] = s + } + } - segments := make(map[int64]*datapb.SegmentInfo) - for _, s := range collectionTarget.GetAllSegments() { - if s.GetPartitionID() == partitionID { - segments[s.GetID()] = s + if len(segments) > 0 { + return segments } } - return segments + return nil } func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) + targets := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return nil + for _, t := range targets { + return t.GetAllDmChannels() } - return collectionTarget.GetAllDmChannels() + + return nil } func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - - if collectionTarget == nil { - return nil + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + if ch, ok := t.GetAllDmChannels()[channel]; ok { + return ch + } } - return collectionTarget.GetAllDmChannels()[channel] + return nil } func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return nil + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + if s, ok := t.GetAllSegments()[id]; ok { + return s + } } - return collectionTarget.GetAllSegments()[id] + + return nil } func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 { mgr.rwMutex.RLock() defer mgr.rwMutex.RUnlock() - collectionTarget := mgr.getCollectionTarget(scope, collectionID) - if collectionTarget == nil { - return 0 + targets := mgr.getCollectionTarget(scope, collectionID) + for _, t := range targets { + if t.GetTargetVersion() > 0 { + return t.GetTargetVersion() + } } - return collectionTarget.GetTargetVersion() + + return 0 } func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool { diff --git a/internal/querycoordv2/meta/target_manager_test.go b/internal/querycoordv2/meta/target_manager_test.go index 5c8a152b3314e..daba49b4398be 100644 --- a/internal/querycoordv2/meta/target_manager_test.go +++ b/internal/querycoordv2/meta/target_manager_test.go @@ -418,6 +418,10 @@ func (suite *TargetManagerSuite) TestGetSegmentByChannel() { suite.Len(suite.mgr.GetGrowingSegmentsByChannel(collectionID, "channel-1", NextTarget), 4) suite.Len(suite.mgr.GetGrowingSegmentsByChannel(collectionID, "channel-2", NextTarget), 1) suite.Len(suite.mgr.GetDroppedSegmentsByChannel(collectionID, "channel-1", NextTarget), 3) + suite.Len(suite.mgr.GetGrowingSegmentsByCollection(collectionID, NextTarget), 5) + suite.Len(suite.mgr.GetSealedSegmentsByPartition(collectionID, 1, NextTarget), 2) + suite.NotNil(suite.mgr.GetSealedSegment(collectionID, 11, NextTarget)) + suite.NotNil(suite.mgr.GetDmChannel(collectionID, "channel-1", NextTarget)) } func (suite *TargetManagerSuite) TestGetTarget() { @@ -425,7 +429,7 @@ func (suite *TargetManagerSuite) TestGetTarget() { tag string mgr *TargetManager scope TargetScope - expectTarget *CollectionTarget + expectTarget int } current := &CollectionTarget{} @@ -439,7 +443,7 @@ func (suite *TargetManagerSuite) TestGetTarget() { }, next: &target{ collectionTargetMap: map[int64]*CollectionTarget{ - 1000: current, + 1000: next, }, }, } @@ -462,89 +466,90 @@ func (suite *TargetManagerSuite) TestGetTarget() { cases := []testCase{ { - tag: "both_scope_unknown", - mgr: bothMgr, - scope: -1, - expectTarget: nil, + tag: "both_scope_unknown", + mgr: bothMgr, + scope: -1, + + expectTarget: 0, }, { tag: "both_scope_current", mgr: bothMgr, scope: CurrentTarget, - expectTarget: current, + expectTarget: 1, }, { tag: "both_scope_next", mgr: bothMgr, scope: NextTarget, - expectTarget: next, + expectTarget: 1, }, { tag: "both_scope_current_first", mgr: bothMgr, scope: CurrentTargetFirst, - expectTarget: current, + expectTarget: 2, }, { tag: "both_scope_next_first", mgr: bothMgr, scope: NextTargetFirst, - expectTarget: next, + expectTarget: 2, }, { tag: "next_scope_current", mgr: nextMgr, scope: CurrentTarget, - expectTarget: nil, + expectTarget: 0, }, { tag: "next_scope_next", mgr: nextMgr, scope: NextTarget, - expectTarget: next, + expectTarget: 1, }, { tag: "next_scope_current_first", mgr: nextMgr, scope: CurrentTargetFirst, - expectTarget: next, + expectTarget: 1, }, { tag: "next_scope_next_first", mgr: nextMgr, scope: NextTargetFirst, - expectTarget: next, + expectTarget: 1, }, { tag: "current_scope_current", mgr: currentMgr, scope: CurrentTarget, - expectTarget: current, + expectTarget: 1, }, { tag: "current_scope_next", mgr: currentMgr, scope: NextTarget, - expectTarget: nil, + expectTarget: 0, }, { tag: "current_scope_current_first", mgr: currentMgr, scope: CurrentTargetFirst, - expectTarget: current, + expectTarget: 1, }, { tag: "current_scope_next_first", mgr: currentMgr, scope: NextTargetFirst, - expectTarget: current, + expectTarget: 1, }, } for _, tc := range cases { suite.Run(tc.tag, func() { - target := tc.mgr.getCollectionTarget(tc.scope, 1000) - suite.Equal(tc.expectTarget, target) + targets := tc.mgr.getCollectionTarget(tc.scope, 1000) + suite.Equal(tc.expectTarget, len(targets)) }) } }