Skip to content

Commit

Permalink
fix: Delegator may becomes unserviceable after querycoord restart (mi…
Browse files Browse the repository at this point in the history
…lvus-io#37055)

issue: milvus-io#37054
after querycoord restart, segment_checker may release segment by mistake
due to next target isn't ready yet.

This PR requires release segment must happens after next target is
ready.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Oct 24, 2024
1 parent 0d376f1 commit 39a91eb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,14 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64,
continue
}

nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID)
nextTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.CurrentTarget)
currentTargetChannelMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget)

// get segment which exist on leader view, but not on current target and next target
for _, segment := range view.GrowingSegments {
if !currentTargetSegmentIDs.Contain(segment.GetID()) && !nextTargetSegmentIDs.Contain(segment.GetID()) {
if !currentTargetSegmentIDs.Contain(segment.GetID()) && nextTargetExist && !nextTargetSegmentIDs.Contain(segment.GetID()) {
if channel, ok := currentTargetChannelMap[segment.InsertChannel]; ok {
timestampInSegment := segment.GetStartPosition().GetTimestamp()
timestampInTarget := channel.GetSeekPosition().GetTimestamp()
Expand Down Expand Up @@ -270,6 +271,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
return !existInDist
}

nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID)
nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget)
currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget)

Expand Down Expand Up @@ -298,7 +300,7 @@ func (c *SegmentChecker) getSealedSegmentDiff(
_, existOnNext := nextTargetMap[segment.GetID()]

// l0 segment should be release with channel together
if !existOnNext && !existOnCurrent {
if !existOnNext && nextTargetExist && !existOnCurrent {
toRelease = append(toRelease, segment)
}
}
Expand Down
4 changes: 4 additions & 0 deletions internal/querycoordv2/checkers/segment_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
channels, nil, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
checker.targetMgr.UpdateCollectionNextTarget(int64(1))

tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
Expand Down Expand Up @@ -532,6 +533,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(collectionID)
checker.targetMgr.UpdateCollectionCurrentTarget(collectionID)
checker.targetMgr.UpdateCollectionNextTarget(collectionID)
readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)

// test less target version exist on leader,meet segment doesn't exit in target, segment should be released
Expand Down Expand Up @@ -602,6 +604,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
checker.targetMgr.UpdateCollectionNextTarget(int64(1))

growingSegments := make(map[int64]*meta.Segment)
growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel")
Expand Down Expand Up @@ -661,6 +664,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseGrowingSegments() {
channels, segments, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))
checker.targetMgr.UpdateCollectionNextTarget(int64(1))

growingSegments := make(map[int64]*meta.Segment)
growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel")
Expand Down

0 comments on commit 39a91eb

Please sign in to comment.