Commit
fix: Delegator stuck at unserviceable status (milvus-io#37694) (milvus-io#37702)

issue: milvus-io#37679
pr: milvus-io#37694

pr milvus-io#36549 introduced a logic error: the current target was
updated when only some of the channels were ready.

This PR fixes that error and makes the dist handler keep pulling
distribution from the querynode until all delegators become serviceable.

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Nov 15, 2024
1 parent e222289 commit 1bd502b
Showing 3 changed files with 24 additions and 5 deletions.
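
For illustration, a minimal sketch of the pattern the dist_handler.go change adopts: when a delegator is unserviceable, reset lastUpdateTs so the next distribution pull happens immediately, collect the affected collections, and trigger one target-version sync per collection after the loop. The types below (leaderView, updateLeaderViews, and the plain map standing in for typeutil.UniqueSet) are simplified assumptions, not the actual Milvus querycoord structures.

```go
package main

import "fmt"

// leaderView is a simplified stand-in for the querycoord leader view.
type leaderView struct {
	collectionID int64
	serviceable  bool
}

// distHandler is a simplified stand-in for the dist handler touched by this PR.
type distHandler struct {
	lastUpdateTs        int64
	syncTargetVersionFn func(collectionID int64)
}

// updateLeaderViews mirrors the pattern in the diff: keep pulling distribution
// (by resetting lastUpdateTs) while any delegator is unserviceable, and sync
// the target version once per affected collection after the loop.
func (dh *distHandler) updateLeaderViews(views []leaderView) {
	collectionsToSync := make(map[int64]struct{}) // dedup set, standing in for typeutil.UniqueSet
	for _, v := range views {
		if !v.serviceable {
			// force the next distribution pull instead of waiting for the normal interval
			dh.lastUpdateTs = 0
			collectionsToSync[v.collectionID] = struct{}{}
		}
	}
	// trigger the target observer once per collection, outside the per-view loop
	for collectionID := range collectionsToSync {
		dh.syncTargetVersionFn(collectionID)
	}
}

func main() {
	dh := &distHandler{
		lastUpdateTs: 123,
		syncTargetVersionFn: func(collectionID int64) {
			fmt.Println("sync target version for collection", collectionID)
		},
	}
	dh.updateLeaderViews([]leaderView{
		{collectionID: 1, serviceable: false},
		{collectionID: 1, serviceable: false}, // same collection, synced only once
		{collectionID: 2, serviceable: true},
	})
	fmt.Println("lastUpdateTs:", dh.lastUpdateTs) // 0, so the next pull happens immediately
}
```

Collecting the collections in a set and syncing after the loop avoids calling the sync callback once per leader view when several delegators of the same collection are unserviceable.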
14 changes: 12 additions & 2 deletions internal/querycoordv2/dist/dist_handler.go
@@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type TriggerUpdateTargetVersion = func(collectionID int64)
@@ -194,6 +195,8 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
channels := lo.SliceToMap(resp.GetChannels(), func(channel *querypb.ChannelVersionInfo) (string, *querypb.ChannelVersionInfo) {
return channel.GetChannel(), channel
})

collectionsToSync := typeutil.NewUniqueSet()
for _, lview := range resp.GetLeaderViews() {
segments := make(map[int64]*meta.Segment)

@@ -246,9 +249,10 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))

// segment and channel already loaded, trigger target observer to check target version
dh.syncTargetVersionFn(lview.GetCollection())
view.UnServiceableError = err
// make the dist handler pull the next distribution until all delegators are serviceable
dh.lastUpdateTs = 0
collectionsToSync.Insert(lview.Collection)
log.Info("leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
@@ -258,6 +262,12 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
}

dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)

// segment and channel already loaded, trigger target observer to update
collectionsToSync.Range(func(collection int64) bool {
dh.syncTargetVersionFn(collection)
return true
})
}

func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) {
4 changes: 2 additions & 2 deletions internal/querycoordv2/observers/target_observer.go
@@ -374,16 +374,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool {
return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil
})
collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)

// to avoid stuck here in dynamic increase replica case, we just check available delegator number
if int32(len(collectionReadyLeaders)) < replicaNum {
if int32(len(channelReadyLeaders)) < replicaNum {
log.RatedInfo(10, "channel not ready",
zap.Int("readyReplicaNum", len(channelReadyLeaders)),
zap.String("channelName", channel),
)
return false
}
collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
}

var collectionInfo *milvuspb.DescribeCollectionResponse
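
To illustrate the target_observer.go change: the readiness check now compares the number of ready delegators per channel against the replica number before accumulating them, instead of testing the running collection-wide total, which could pass on the strength of earlier channels alone. A minimal sketch with simplified, assumed types (readyLeadersByChannel is not the real Milvus structure):

```go
package main

import "fmt"

// shouldUpdateCurrentTarget is a simplified version of the corrected check:
// every channel must have at least replicaNum ready delegators on its own,
// rather than relying on the accumulated collection-wide total.
func shouldUpdateCurrentTarget(readyLeadersByChannel map[string][]int64, replicaNum int32) bool {
	collectionReadyLeaders := make([]int64, 0)
	for channel, channelReadyLeaders := range readyLeadersByChannel {
		// the fix: compare the per-channel count, not len(collectionReadyLeaders)
		if int32(len(channelReadyLeaders)) < replicaNum {
			fmt.Printf("channel %s not ready: %d/%d ready delegators\n",
				channel, len(channelReadyLeaders), replicaNum)
			return false
		}
		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
	}
	return len(collectionReadyLeaders) > 0
}

func main() {
	// channel "dml_1" has 2 ready delegators, "dml_2" has only 1;
	// with replicaNum = 2 the current target must not be updated yet.
	ready := map[string][]int64{
		"dml_1": {101, 102},
		"dml_2": {103},
	}
	fmt.Println(shouldUpdateCurrentTarget(ready, 2)) // false
}
```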
11 changes: 10 additions & 1 deletion tests/integration/target/target_test.go
@@ -147,7 +147,6 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
s.initCollection(name, 1, 2, 2, 2000)

ctx := context.Background()

info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(),
CollectionName: name,
@@ -156,6 +155,16 @@
s.True(merr.Ok(info.GetStatus()))
collectionID := info.GetCollectionID()

// wait until all shards are ready
// because showCollections only waits until the collection becomes loaded, the proxy relies on retries to block until all shards are ready
s.Eventually(func() bool {
resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
Base: commonpbutil.NewMsgBase(),
CollectionID: collectionID,
})
return err == nil && merr.Ok(resp.GetStatus()) && len(resp.Shards) == 2
}, 60*time.Second, 1*time.Second)

// trigger old coord stop
s.Cluster.StopQueryCoord()

