From 7f0191e7457a052b51b2a5276f419e10e75c4eb3 Mon Sep 17 00:00:00 2001
From: Congqi Xia
Date: Thu, 5 Dec 2024 09:12:29 +0800
Subject: [PATCH] Fix sync partition behavior and fix unit tests

Signed-off-by: Congqi Xia
---
 internal/querycoordv2/job/job_release.go      |  2 +
 internal/querycoordv2/job/job_sync.go         | 17 +---
 internal/querycoordv2/job/job_test.go         | 82 +++++++++++++------
 internal/querycoordv2/job/utils.go            | 21 +++++
 .../querycoordv2/observers/target_observer.go |  2 +-
 internal/querycoordv2/services_test.go        | 15 +++-
 6 files changed, 96 insertions(+), 43 deletions(-)

diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go
index 204fd7a14ac66..fa1f3bf47abc0 100644
--- a/internal/querycoordv2/job/job_release.go
+++ b/internal/querycoordv2/job/job_release.go
@@ -208,6 +208,8 @@ func (job *ReleasePartitionJob) Execute() error {
 			return errors.Wrap(err, msg)
 		}
 		job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
+		// wait until the current target is updated, so that subsequent queries behave as expected
+		waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
 		waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
 	}
 	metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
diff --git a/internal/querycoordv2/job/job_sync.go b/internal/querycoordv2/job/job_sync.go
index 52dc4d47c8e93..899cbe56495fc 100644
--- a/internal/querycoordv2/job/job_sync.go
+++ b/internal/querycoordv2/job/job_sync.go
@@ -96,20 +96,5 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
 		return errors.Wrap(err, msg)
 	}
 
-	// manual trigger update next target
-	ready, err := job.targetObserver.UpdateNextTarget(req.GetCollectionID())
-	if err != nil {
-		log.Warn("failed to update next target for sync partition job", zap.Error(err))
-		return err
-	}
-
-	// accelerate check
-	job.targetObserver.TriggerUpdateCurrentTarget(req.GetCollectionID())
-	// wait current target ready
-	select {
-	case <-ready:
-		return nil
-	case <-job.ctx.Done():
-		return job.ctx.Err()
-	}
+	return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
 }
diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go
index b254e5a101c62..7c7e188c9d56e 100644
--- a/internal/querycoordv2/job/job_test.go
+++ b/internal/querycoordv2/job/job_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/etcd"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 const (
@@ -1116,6 +1117,12 @@ func (suite *JobSuite) TestReleasePartition() {
 	// Test release partial partitions
 	suite.releaseAll()
 	suite.loadAll()
+	for _, collectionID := range suite.collections {
+		// make collection able to get into loaded state
+		suite.updateChannelDist(ctx, collectionID, true)
+		suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
+		waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
+	}
 	for _, collection := range suite.collections {
 		req := &querypb.ReleasePartitionsRequest{
 			CollectionID: collection,
@@ -1133,6 +1140,8 @@
 			suite.proxyManager,
 		)
 		suite.scheduler.Add(job)
+		suite.updateChannelDist(ctx, collection, true)
+		suite.updateSegmentDist(collection, 3000, suite.partitions[collection][:1]...)
 		err := job.Wait()
 		suite.NoError(err)
 		suite.True(suite.meta.Exist(ctx, collection))
@@ -1189,8 +1198,18 @@ func (suite *JobSuite) TestDynamicRelease() {
 	// action: release p0
 	// expect: p0 released, p1, p2 loaded
 	suite.loadAll()
+	for _, collectionID := range suite.collections {
+		// make collection able to get into loaded state
+		suite.updateChannelDist(ctx, collectionID, true)
+		suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
+		waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
+	}
+
 	job := newReleasePartJob(col0, p0)
 	suite.scheduler.Add(job)
+	// update segments
+	suite.updateSegmentDist(col0, 3000, p1, p2)
+	suite.updateChannelDist(ctx, col0, true)
 	err := job.Wait()
 	suite.NoError(err)
 	suite.assertPartitionReleased(col0, p0)
@@ -1201,6 +1220,8 @@
 	// expect: p1 released, p2 loaded
 	job = newReleasePartJob(col0, p0, p1)
 	suite.scheduler.Add(job)
+	suite.updateSegmentDist(col0, 3000, p2)
+	suite.updateChannelDist(ctx, col0, true)
 	err = job.Wait()
 	suite.NoError(err)
 	suite.assertPartitionReleased(col0, p0, p1)
@@ -1211,6 +1232,8 @@
 	// expect: loadType=col: col loaded, p2 released
 	job = newReleasePartJob(col0, p2)
 	suite.scheduler.Add(job)
+	suite.updateSegmentDist(col0, 3000)
+	suite.updateChannelDist(ctx, col0, false)
 	err = job.Wait()
 	suite.NoError(err)
 	suite.assertPartitionReleased(col0, p0, p1, p2)
@@ -1359,8 +1382,8 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
 	suite.loadAll()
 	collectionID := suite.collections[0]
 	// make collection able to get into loaded state
-	suite.updateChannelDist(ctx, collectionID)
-	suite.updateSegmentDist(collectionID, 3000)
+	suite.updateChannelDist(ctx, collectionID, true)
+	suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
 
 	req := &querypb.SyncNewCreatedPartitionRequest{
 		CollectionID: collectionID,
@@ -1551,9 +1574,13 @@ func (suite *JobSuite) assertPartitionReleased(collection int64, partitionIDs ..
 	}
 }
 
-func (suite *JobSuite) updateSegmentDist(collection, node int64) {
+func (suite *JobSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
+	partitionSet := typeutil.NewSet(partitions...)
 	metaSegments := make([]*meta.Segment, 0)
 	for partition, segments := range suite.segments[collection] {
+		if !partitionSet.Contain(partition) {
+			continue
+		}
 		for _, segment := range segments {
 			metaSegments = append(metaSegments,
 				utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))
@@ -1562,32 +1589,39 @@
 	suite.dist.SegmentDistManager.Update(node, metaSegments...)
 }
 
-func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64) {
+func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64, loaded bool) {
 	channels := suite.channels[collection]
 	segments := lo.Flatten(lo.Values(suite.segments[collection]))
 
 	replicas := suite.meta.ReplicaManager.GetByCollection(ctx, collection)
 	for _, replica := range replicas {
-		i := 0
-		for _, node := range replica.GetNodes() {
-			suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
-				CollectionID: collection,
-				ChannelName:  channels[i],
-			}))
-			suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
-				ID:           node,
-				CollectionID: collection,
-				Channel:      channels[i],
-				Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
-					return segment, &querypb.SegmentDist{
-						NodeID:  node,
-						Version: time.Now().Unix(),
-					}
-				}),
-			})
-			i++
-			if i >= len(channels) {
-				break
+		if loaded {
+			i := 0
+			for _, node := range replica.GetNodes() {
+				suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
+					CollectionID: collection,
+					ChannelName:  channels[i],
+				}))
+				suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
+					ID:           node,
+					CollectionID: collection,
+					Channel:      channels[i],
+					Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
+						return segment, &querypb.SegmentDist{
+							NodeID:  node,
+							Version: time.Now().Unix(),
+						}
+					}),
+				})
+				i++
+				if i >= len(channels) {
+					break
+				}
+			}
+		} else {
+			for _, node := range replica.GetNodes() {
+				suite.dist.ChannelDistManager.Update(node)
+				suite.dist.LeaderViewManager.Update(node)
 			}
 		}
 	}
diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go
index 99e995273aa58..a98abfbe8f8be 100644
--- a/internal/querycoordv2/job/utils.go
+++ b/internal/querycoordv2/job/utils.go
@@ -17,6 +17,7 @@
 package job
 
 import (
+	"context"
 	"time"
 
 	"github.com/samber/lo"
@@ -24,6 +25,7 @@
 
 	"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
+	"github.com/milvus-io/milvus/internal/querycoordv2/observers"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -61,3 +63,22 @@
 		time.Sleep(200 * time.Millisecond)
 	}
 }
+
+func waitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
+	// manually trigger an update of the next target
+	ready, err := targetObserver.UpdateNextTarget(collection)
+	if err != nil {
+		log.Warn("failed to update next target for collection", zap.Error(err))
+		return err
+	}
+
+	// accelerate the check
+	targetObserver.TriggerUpdateCurrentTarget(collection)
+	// wait until the current target is ready
+	select {
+	case <-ready:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index 528728c572612..ac636c40f31f4 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -467,7 +467,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
 	}
 	ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
 	defer cancel()
-	log.Warn("before sync", zap.Any("cluster", ob.cluster), zap.Any("lv", leaderView), zap.Any("req", req))
+
 	resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
 	if err != nil {
 		log.Warn("failed to sync distribution", zap.Error(err))
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index f2e4e2f328061..1381bc2a23b51 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -1043,8 +1043,13 @@ func (suite *ServiceSuite) TestReleaseCollection() {
 }
 
 func (suite *ServiceSuite) TestReleasePartition() {
-	suite.loadAll()
 	ctx := context.Background()
+	suite.loadAll()
+	for _, collection := range suite.collections {
+		suite.updateChannelDist(ctx, collection)
+		suite.updateSegmentDist(collection, suite.nodes[0])
+	}
+
 	server := suite.server
 
 	// Test release all partitions
@@ -1053,6 +1058,8 @@
 			CollectionID: collection,
 			PartitionIDs: suite.partitions[collection][0:1],
 		}
+		suite.updateChannelDist(ctx, collection)
+		suite.updateSegmentDist(collection, suite.nodes[0], suite.partitions[collection][1:]...)
 		resp, err := server.ReleasePartitions(ctx, req)
 		suite.NoError(err)
 		suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
@@ -1973,9 +1980,13 @@ func (suite *ServiceSuite) getAllSegments(collection int64) []int64 {
 	return allSegments
 }
 
-func (suite *ServiceSuite) updateSegmentDist(collection, node int64) {
+func (suite *ServiceSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
+	partitionSet := typeutil.NewSet(partitions...)
 	metaSegments := make([]*meta.Segment, 0)
 	for partition, segments := range suite.segments[collection] {
+		if partitionSet.Len() > 0 && !partitionSet.Contain(partition) {
+			continue
+		}
 		for _, segment := range segments {
 			metaSegments = append(metaSegments,
 				utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))