Skip to content

Commit

Permalink
fix: Wrong behavior of CurrentTargetFirst/NextTargetFirst in target m…
Browse files Browse the repository at this point in the history
…anager (milvus-io#31378)

issue: milvus-io#31162
pr: milvus-io#31379

when give scope CurrentTargetFirst/NextTargetFirst, it's expected to
scan both current and next target.

This PR fixed wrong behavior of CurrentTargetFirst/NextTargetFirst in
target manager, which may cause unexpected task generated, and load
collection may stuck forever due to dirty leader view.

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Mar 19, 2024
1 parent 8946aa1 commit f4449d4
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 98 deletions.
195 changes: 117 additions & 78 deletions internal/querycoordv2/meta/target_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,22 +330,49 @@ func (mgr *TargetManager) removePartitionFromCollectionTarget(oldTarget *Collect
return NewCollectionTarget(segments, channels)
}

func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) *CollectionTarget {
func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID int64) []*CollectionTarget {
switch scope {
case CurrentTarget:
return mgr.current.getCollectionTarget(collectionID)

ret := make([]*CollectionTarget, 0, 1)
current := mgr.current.getCollectionTarget(collectionID)
if current != nil {
ret = append(ret, current)
}
return ret
case NextTarget:
return mgr.next.getCollectionTarget(collectionID)
ret := make([]*CollectionTarget, 0, 1)
next := mgr.next.getCollectionTarget(collectionID)
if next != nil {
ret = append(ret, next)
}
return ret
case CurrentTargetFirst:
if current := mgr.current.getCollectionTarget(collectionID); current != nil {
return current
ret := make([]*CollectionTarget, 0, 2)
current := mgr.current.getCollectionTarget(collectionID)
if current != nil {
ret = append(ret, current)
}

next := mgr.next.getCollectionTarget(collectionID)
if next != nil {
ret = append(ret, next)
}
return mgr.next.getCollectionTarget(collectionID)

return ret
case NextTargetFirst:
if next := mgr.next.getCollectionTarget(collectionID); next != nil {
return next
ret := make([]*CollectionTarget, 0, 2)
next := mgr.next.getCollectionTarget(collectionID)
if next != nil {
ret = append(ret, next)
}

current := mgr.current.getCollectionTarget(collectionID)
if current != nil {
ret = append(ret, current)
}
return mgr.current.getCollectionTarget(collectionID)

return ret
}
return nil
}
Expand All @@ -356,18 +383,20 @@ func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)
targets := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
}
for _, t := range targets {
segments := typeutil.NewUniqueSet()
for _, channel := range t.GetAllDmChannels() {
segments.Insert(channel.GetUnflushedSegmentIds()...)
}

segments := typeutil.NewUniqueSet()
for _, channel := range collectionTarget.GetAllDmChannels() {
segments.Insert(channel.GetUnflushedSegmentIds()...)
if len(segments) > 0 {
return segments
}
}

return segments
return nil
}

func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
Expand All @@ -377,20 +406,21 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
}
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := typeutil.NewUniqueSet()
for _, channel := range t.GetAllDmChannels() {
if channel.ChannelName == channelName {
segments.Insert(channel.GetUnflushedSegmentIds()...)
}
}

segments := typeutil.NewUniqueSet()
for _, channel := range collectionTarget.GetAllDmChannels() {
if channel.ChannelName == channelName {
segments.Insert(channel.GetUnflushedSegmentIds()...)
if len(segments) > 0 {
return segments
}
}

return segments
return nil
}

func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
Expand All @@ -399,12 +429,13 @@ func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)
targets := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
for _, t := range targets {
return t.GetAllSegments()
}
return collectionTarget.GetAllSegments()

return nil
}

func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
Expand All @@ -414,19 +445,21 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)
if collectionTarget == nil {
return nil
}
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
ret := make(map[int64]*datapb.SegmentInfo)
for k, v := range t.GetAllSegments() {
if v.GetInsertChannel() == channelName {
ret[k] = v
}
}

ret := make(map[int64]*datapb.SegmentInfo)
for k, v := range collectionTarget.GetAllSegments() {
if v.GetInsertChannel() == channelName {
ret[k] = v
if len(ret) > 0 {
return ret
}
}

return ret
return nil
}

func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
Expand All @@ -436,86 +469,92 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
}

channel := collectionTarget.dmChannels[channelName]
if channel == nil {
return nil
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if channel, ok := t.dmChannels[channelName]; ok {
return channel.GetDroppedSegmentIds()
}
}

return channel.GetDroppedSegmentIds()
return nil
}

func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
partitionID int64, scope TargetScope,
partitionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
}
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := make(map[int64]*datapb.SegmentInfo)
for _, s := range t.GetAllSegments() {
if s.GetPartitionID() == partitionID {
segments[s.GetID()] = s
}
}

segments := make(map[int64]*datapb.SegmentInfo)
for _, s := range collectionTarget.GetAllSegments() {
if s.GetPartitionID() == partitionID {
segments[s.GetID()] = s
if len(segments) > 0 {
return segments
}
}

return segments
return nil
}

func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)
targets := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
for _, t := range targets {
return t.GetAllDmChannels()
}
return collectionTarget.GetAllDmChannels()

return nil
}

func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()

collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if ch, ok := t.GetAllDmChannels()[channel]; ok {
return ch
}
}
return collectionTarget.GetAllDmChannels()[channel]
return nil
}

func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return nil
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if s, ok := t.GetAllSegments()[id]; ok {
return s
}
}
return collectionTarget.GetAllSegments()[id]

return nil
}

func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
collectionTarget := mgr.getCollectionTarget(scope, collectionID)

if collectionTarget == nil {
return 0
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if t.GetTargetVersion() > 0 {
return t.GetTargetVersion()
}
}
return collectionTarget.GetTargetVersion()

return 0
}

func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64) bool {
Expand Down
Loading

0 comments on commit f4449d4

Please sign in to comment.