Fix sync part behavior and fix unit tests
Signed-off-by: Congqi Xia <[email protected]>
congqixia committed Dec 5, 2024
1 parent 87635d7 commit 019cdfa
Showing 6 changed files with 96 additions and 43 deletions.
2 changes: 2 additions & 0 deletions internal/querycoordv2/job/job_release.go
@@ -208,6 +208,8 @@ func (job *ReleasePartitionJob) Execute() error {
return errors.Wrap(err, msg)
}
job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
+// wait current target updated, so following queries will act as expected
+waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
}
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
17 changes: 1 addition & 16 deletions internal/querycoordv2/job/job_sync.go
@@ -96,20 +96,5 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
return errors.Wrap(err, msg)
}

-// manual trigger update next target
-ready, err := job.targetObserver.UpdateNextTarget(req.GetCollectionID())
-if err != nil {
-log.Warn("failed to update next target for sync partition job", zap.Error(err))
-return err
-}

-// accelerate check
-job.targetObserver.TriggerUpdateCurrentTarget(req.GetCollectionID())
-// wait current target ready
-select {
-case <-ready:
-return nil
-case <-job.ctx.Done():
-return job.ctx.Err()
-}
+return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
}
82 changes: 58 additions & 24 deletions internal/querycoordv2/job/job_test.go
@@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const (
@@ -1116,6 +1117,12 @@ func (suite *JobSuite) TestReleasePartition() {
// Test release partial partitions
suite.releaseAll()
suite.loadAll()
+for _, collectionID := range suite.collections {
+// make collection able to get into loaded state
+suite.updateChannelDist(ctx, collectionID, true)
+suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
+waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
+}
for _, collection := range suite.collections {
req := &querypb.ReleasePartitionsRequest{
CollectionID: collection,
@@ -1133,6 +1140,8 @@
suite.proxyManager,
)
suite.scheduler.Add(job)
+suite.updateChannelDist(ctx, collection, true)
+suite.updateSegmentDist(collection, 3000, suite.partitions[collection][:1]...)
err := job.Wait()
suite.NoError(err)
suite.True(suite.meta.Exist(ctx, collection))
@@ -1189,8 +1198,18 @@ func (suite *JobSuite) TestDynamicRelease() {
// action: release p0
// expect: p0 released, p1, p2 loaded
suite.loadAll()
+for _, collectionID := range suite.collections {
+// make collection able to get into loaded state
+suite.updateChannelDist(ctx, collectionID, true)
+suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)
+waitCurrentTargetUpdated(ctx, suite.targetObserver, collectionID)
+}

job := newReleasePartJob(col0, p0)
suite.scheduler.Add(job)
+// update segments
+suite.updateSegmentDist(col0, 3000, p1, p2)
+suite.updateChannelDist(ctx, col0, true)
err := job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0)
@@ -1201,6 +1220,8 @@
// expect: p1 released, p2 loaded
job = newReleasePartJob(col0, p0, p1)
suite.scheduler.Add(job)
+suite.updateSegmentDist(col0, 3000, p2)
+suite.updateChannelDist(ctx, col0, true)
err = job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0, p1)
@@ -1211,6 +1232,8 @@
// expect: loadType=col: col loaded, p2 released
job = newReleasePartJob(col0, p2)
suite.scheduler.Add(job)
+suite.updateSegmentDist(col0, 3000)
+suite.updateChannelDist(ctx, col0, false)
err = job.Wait()
suite.NoError(err)
suite.assertPartitionReleased(col0, p0, p1, p2)
@@ -1359,8 +1382,8 @@ func (suite *JobSuite) TestSyncNewCreatedPartition() {
suite.loadAll()
collectionID := suite.collections[0]
// make collection able to get into loaded state
-suite.updateChannelDist(ctx, collectionID)
-suite.updateSegmentDist(collectionID, 3000)
+suite.updateChannelDist(ctx, collectionID, true)
+suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)

req := &querypb.SyncNewCreatedPartitionRequest{
CollectionID: collectionID,
@@ -1551,9 +1574,13 @@ func (suite *JobSuite) assertPartitionReleased(collection int64, partitionIDs ..
}
}

-func (suite *JobSuite) updateSegmentDist(collection, node int64) {
+func (suite *JobSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
+partitionSet := typeutil.NewSet(partitions...)
metaSegments := make([]*meta.Segment, 0)
for partition, segments := range suite.segments[collection] {
+if !partitionSet.Contain(partition) {
+continue
+}
for _, segment := range segments {
metaSegments = append(metaSegments,
utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))
@@ -1562,32 +1589,39 @@ func (suite *JobSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
suite.dist.SegmentDistManager.Update(node, metaSegments...)
}

-func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64) {
+func (suite *JobSuite) updateChannelDist(ctx context.Context, collection int64, loaded bool) {
channels := suite.channels[collection]
segments := lo.Flatten(lo.Values(suite.segments[collection]))

replicas := suite.meta.ReplicaManager.GetByCollection(ctx, collection)
for _, replica := range replicas {
-i := 0
-for _, node := range replica.GetNodes() {
-suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
-CollectionID: collection,
-ChannelName: channels[i],
-}))
-suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
-ID: node,
-CollectionID: collection,
-Channel: channels[i],
-Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
-return segment, &querypb.SegmentDist{
-NodeID: node,
-Version: time.Now().Unix(),
-}
-}),
-})
-i++
-if i >= len(channels) {
-break
+if loaded {
+i := 0
+for _, node := range replica.GetNodes() {
+suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
+CollectionID: collection,
+ChannelName: channels[i],
+}))
+suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{
+ID: node,
+CollectionID: collection,
+Channel: channels[i],
+Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) {
+return segment, &querypb.SegmentDist{
+NodeID: node,
+Version: time.Now().Unix(),
+}
+}),
+})
+i++
+if i >= len(channels) {
+break
+}
+}
+} else {
+for _, node := range replica.GetNodes() {
+suite.dist.ChannelDistManager.Update(node)
+suite.dist.LeaderViewManager.Update(node)
}
}
}
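
A minimal usage sketch, not part of the commit, showing how a test combines the two reworked helpers; collectionID is a suite fixture and 3000 is the node ID these tests use:

	// Mark the collection as fully loaded on node 3000: set channel and
	// leader views, and register segments for every partition.
	suite.updateChannelDist(ctx, collectionID, true)
	suite.updateSegmentDist(collectionID, 3000, suite.partitions[collectionID]...)

	// Later, simulate the post-release state: loaded=false clears the views,
	// and an empty partition list drops all segments from the node.
	suite.updateChannelDist(ctx, collectionID, false)
	suite.updateSegmentDist(collectionID, 3000)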
21 changes: 21 additions & 0 deletions internal/querycoordv2/job/utils.go
@@ -17,13 +17,15 @@
package job

import (
"context"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -61,3 +63,22 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c
time.Sleep(200 * time.Millisecond)
}
}

+func waitCurrentTargetUpdated(ctx context.Context, targetObserver *observers.TargetObserver, collection int64) error {
+// manual trigger update next target
+ready, err := targetObserver.UpdateNextTarget(collection)
+if err != nil {
+log.Warn("failed to update next target for sync partition job", zap.Error(err))
+return err
+}

+// accelerate check
+targetObserver.TriggerUpdateCurrentTarget(collection)
+// wait current target ready
+select {
+case <-ready:
+return nil
+case <-ctx.Done():
+return ctx.Err()
+}
+}
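
A minimal usage sketch, not part of the commit, showing how a job calls the new helper; it mirrors the job_release.go and job_sync.go call sites above, and the someJob type with its ctx/targetObserver/req fields is assumed for illustration:

	func (job *someJob) Execute() error {
		// ... job-specific work first: update meta, release partitions, etc. ...

		// Refresh the next target, then block until it is promoted to the
		// current target (or the job context is cancelled), so requests that
		// arrive after this job observe the updated target.
		return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
	}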
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/target_observer.go
@@ -467,7 +467,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
log.Warn("before sync", zap.Any("cluster", ob.cluster), zap.Any("lv", leaderView), zap.Any("req", req))

resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))
Expand Down
15 changes: 13 additions & 2 deletions internal/querycoordv2/services_test.go
@@ -1043,8 +1043,13 @@ func (suite *ServiceSuite) TestReleaseCollection() {
}

func (suite *ServiceSuite) TestReleasePartition() {
-suite.loadAll()
ctx := context.Background()
+suite.loadAll()
+for _, collection := range suite.collections {
+suite.updateChannelDist(ctx, collection)
+suite.updateSegmentDist(collection, suite.nodes[0])
+}

server := suite.server

// Test release all partitions
@@ -1053,6 +1058,8 @@
CollectionID: collection,
PartitionIDs: suite.partitions[collection][0:1],
}
+suite.updateChannelDist(ctx, collection)
+suite.updateSegmentDist(collection, suite.nodes[0], suite.partitions[collection][1:]...)
resp, err := server.ReleasePartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.ErrorCode)
@@ -1973,9 +1980,13 @@ func (suite *ServiceSuite) getAllSegments(collection int64) []int64 {
return allSegments
}

-func (suite *ServiceSuite) updateSegmentDist(collection, node int64) {
+func (suite *ServiceSuite) updateSegmentDist(collection, node int64, partitions ...int64) {
+partitionSet := typeutil.NewSet(partitions...)
metaSegments := make([]*meta.Segment, 0)
for partition, segments := range suite.segments[collection] {
+if partitionSet.Len() > 0 && !partitionSet.Contain(partition) {
+continue
+}
for _, segment := range segments {
metaSegments = append(metaSegments,
utils.CreateTestSegment(collection, partition, segment, node, 1, "test-channel"))
