From 057bfbe678e640c5f9faf0b9025fc6681e6eb07d Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 25 Oct 2024 14:55:31 +0800 Subject: [PATCH] fix: Delegator may becomes unserviceable after querycoord restart (#37055) (#37100) issue: #37054 pr: #37055 after querycoord restart, segment_checker may release segment by mistake due to next target isn't ready yet. This PR requires release segment must happens after next target is ready. Signed-off-by: Wei Liu --- internal/querycoordv2/checkers/segment_checker.go | 6 ++++-- internal/querycoordv2/checkers/segment_checker_test.go | 4 ++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/querycoordv2/checkers/segment_checker.go b/internal/querycoordv2/checkers/segment_checker.go index 184e562c58aa6..659c940378450 100644 --- a/internal/querycoordv2/checkers/segment_checker.go +++ b/internal/querycoordv2/checkers/segment_checker.go @@ -192,13 +192,14 @@ func (c *SegmentChecker) getGrowingSegmentDiff(collectionID int64, continue } + nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID) nextTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.NextTarget) currentTargetSegmentIDs := c.targetMgr.GetGrowingSegmentsByCollection(collectionID, meta.CurrentTarget) currentTargetChannelMap := c.targetMgr.GetDmChannelsByCollection(collectionID, meta.CurrentTarget) // get segment which exist on leader view, but not on current target and next target for _, segment := range view.GrowingSegments { - if !currentTargetSegmentIDs.Contain(segment.GetID()) && !nextTargetSegmentIDs.Contain(segment.GetID()) { + if !currentTargetSegmentIDs.Contain(segment.GetID()) && nextTargetExist && !nextTargetSegmentIDs.Contain(segment.GetID()) { if channel, ok := currentTargetChannelMap[segment.InsertChannel]; ok { timestampInSegment := segment.GetStartPosition().GetTimestamp() timestampInTarget := channel.GetSeekPosition().GetTimestamp() @@ -270,6 +271,7 @@ func (c *SegmentChecker) getSealedSegmentDiff( return !existInDist } + nextTargetExist := c.targetMgr.IsNextTargetExist(collectionID) nextTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget) currentTargetMap := c.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget) @@ -298,7 +300,7 @@ func (c *SegmentChecker) getSealedSegmentDiff( _, existOnNext := nextTargetMap[segment.GetID()] // l0 segment should be release with channel together - if !existOnNext && !existOnCurrent { + if !existOnNext && nextTargetExist && !existOnCurrent { toRelease = append(toRelease, segment) } } diff --git a/internal/querycoordv2/checkers/segment_checker_test.go b/internal/querycoordv2/checkers/segment_checker_test.go index c87b7acd42816..d4f74826489e4 100644 --- a/internal/querycoordv2/checkers/segment_checker_test.go +++ b/internal/querycoordv2/checkers/segment_checker_test.go @@ -317,6 +317,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() { channels, nil, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) checker.targetMgr.UpdateCollectionCurrentTarget(int64(1)) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) tasks = checker.Check(context.TODO()) suite.Len(tasks, 1) @@ -532,6 +533,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() { channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(collectionID) checker.targetMgr.UpdateCollectionCurrentTarget(collectionID) + checker.targetMgr.UpdateCollectionNextTarget(collectionID) readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget) // test less target version exist on leader,meet segment doesn't exit in target, segment should be released @@ -602,6 +604,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() { channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) checker.targetMgr.UpdateCollectionCurrentTarget(int64(1)) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) growingSegments := make(map[int64]*meta.Segment) growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel") @@ -661,6 +664,7 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseGrowingSegments() { channels, segments, nil) checker.targetMgr.UpdateCollectionNextTarget(int64(1)) checker.targetMgr.UpdateCollectionCurrentTarget(int64(1)) + checker.targetMgr.UpdateCollectionNextTarget(int64(1)) growingSegments := make(map[int64]*meta.Segment) growingSegments[2] = utils.CreateTestSegment(1, 1, 2, 2, 0, "test-insert-channel")