From d1e3a30659785955f42f74e8b3683196c5883d37 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Mon, 28 Oct 2024 11:41:40 +0800
Subject: [PATCH 1/3] fix: Search/Query may fail while updating delegator cache

Because initializing a query node client is too heavy, updateShardClient was
removed from the leader mutex, which introduced many more concurrency corner
cases. This PR delays the query node client's init operation until `getClient`
is called, and uses the leader mutex to protect the shard client update
process to avoid concurrency issues.

Signed-off-by: Wei Liu
---
 internal/proxy/lb_policy.go    | 20 ++++++++++--
 internal/proxy/meta_cache.go   |  6 ++--
 internal/proxy/shard_client.go | 60 ++++++++++++++++++----------------
 3 files changed, 53 insertions(+), 33 deletions(-)

diff --git a/internal/proxy/lb_policy.go b/internal/proxy/lb_policy.go
index e0bb9794d2eff..91f80ffb14102 100644
--- a/internal/proxy/lb_policy.go
+++ b/internal/proxy/lb_policy.go
@@ -98,6 +98,22 @@ func (lb *LBPolicyImpl) Start(ctx context.Context) {
 	}
 }
 
+func (lb *LBPolicyImpl) GetShardLeaders(ctx context.Context, dbName string, collName string, collectionID int64, withCache bool) (map[string][]nodeInfo, error) {
+	var shardLeaders map[string][]nodeInfo
+	// use retry to handle query coord service not ready
+	err := retry.Handle(ctx, func() (bool, error) {
+		var err error
+		shardLeaders, err = globalMetaCache.GetShards(ctx, withCache, dbName, collName, collectionID)
+		if err != nil {
+			return true, err
+		}
+
+		return false, nil
+	})
+
+	return shardLeaders, err
+}
+
 // try to select the best node from the available nodes
 func (lb *LBPolicyImpl) selectNode(ctx context.Context, balancer LBBalancer, workload ChannelWorkload, excludeNodes typeutil.UniqueSet) (int64, error) {
 	availableNodes := lo.FilterMap(workload.shardLeaders, func(node int64, _ int) (int64, bool) { return node, !excludeNodes.Contain(node) })
@@ -105,7 +121,7 @@ func (lb *LBPolicyImpl) selectNode(ctx context.Context, balancer LBBalancer, wor
 	if err != nil {
 		log := log.Ctx(ctx)
 		globalMetaCache.DeprecateShardCache(workload.db, workload.collectionName)
-		shardLeaders, err := globalMetaCache.GetShards(ctx, false, workload.db, workload.collectionName, workload.collectionID)
+		shardLeaders, err := lb.GetShardLeaders(ctx, workload.db, workload.collectionName, workload.collectionID, false)
 		if err != nil {
 			log.Warn("failed to get shard delegator",
 				zap.Int64("collectionID", workload.collectionID),
@@ -195,7 +211,7 @@ func (lb *LBPolicyImpl) ExecuteWithRetry(ctx context.Context, workload ChannelWo
 
 // Execute will execute collection workload in parallel
 func (lb *LBPolicyImpl) Execute(ctx context.Context, workload CollectionWorkLoad) error {
-	dml2leaders, err := globalMetaCache.GetShards(ctx, true, workload.db, workload.collectionName, workload.collectionID)
+	dml2leaders, err := lb.GetShardLeaders(ctx, workload.db, workload.collectionName, workload.collectionID, true)
 	if err != nil {
 		log.Ctx(ctx).Warn("failed to get shards", zap.Error(err))
 		return err
diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go
index 16d77bce503bb..870b6ce12df02 100644
--- a/internal/proxy/meta_cache.go
+++ b/internal/proxy/meta_cache.go
@@ -1001,9 +1001,7 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
 	if _, ok := m.collLeader[database]; !ok {
 		m.collLeader[database] = make(map[string]*shardLeaders)
 	}
-
 	m.collLeader[database][collectionName] = newShardLeaders
-	m.leaderMut.Unlock()
 
 	iterator := newShardLeaders.GetReader()
 	ret := iterator.Shuffle()
@@ -1013,8 +1011,10 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
 		oldLeaders = cacheShardLeaders.shardLeaders
 	}
 	// update refcnt in shardClientMgr
-	// and create new client for new leaders
+	// updating shard leaders only creates an empty client pool;
+	// the new clients will be initialized lazily in getClient
 	_ = m.shardMgr.UpdateShardLeaders(oldLeaders, ret)
+	m.leaderMut.Unlock()
 
 	metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
 	return ret, nil
diff --git a/internal/proxy/shard_client.go b/internal/proxy/shard_client.go
index 237b3b3e8a0a3..8fbe0f7202b03 100644
--- a/internal/proxy/shard_client.go
+++ b/internal/proxy/shard_client.go
@@ -31,26 +31,35 @@ var errClosed = errors.New("client is closed")
 type shardClient struct {
 	sync.RWMutex
 	info     nodeInfo
-	client   types.QueryNodeClient
 	isClosed bool
 	refCnt   int
 	clients  []types.QueryNodeClient
 	idx      atomic.Int64
 	poolSize int
 	pooling  bool
+
+	creator queryNodeCreatorFunc
 }
 
 func (n *shardClient) getClient(ctx context.Context) (types.QueryNodeClient, error) {
+	if len(n.clients) == 0 {
+		n.Lock()
+		if len(n.clients) == 0 {
+			if err := n.initClients(); err != nil {
+				return nil, err
+			}
+		}
+		n.Unlock()
+	}
+
 	n.RLock()
 	defer n.RUnlock()
 	if n.isClosed {
 		return nil, errClosed
 	}
-	if n.pooling {
-		idx := n.idx.Inc()
-		return n.clients[int(idx)%n.poolSize], nil
-	}
-	return n.client, nil
+
+	idx := n.idx.Inc()
+	return n.clients[int(idx)%n.poolSize], nil
 }
 
 func (n *shardClient) inc() {
@@ -65,12 +74,13 @@ func (n *shardClient) inc() {
 func (n *shardClient) close() {
 	n.isClosed = true
 	n.refCnt = 0
-	if n.client != nil {
-		if err := n.client.Close(); err != nil {
+
+	for _, client := range n.clients {
+		if err := client.Close(); err != nil {
 			log.Warn("close grpc client failed", zap.Error(err))
 		}
-		n.client = nil
 	}
+	n.clients = nil
 }
 
 func (n *shardClient) dec() bool {
@@ -94,41 +104,35 @@ func (n *shardClient) Close() {
 	n.close()
 }
 
-func newShardClient(info *nodeInfo, client types.QueryNodeClient) *shardClient {
-	ret := &shardClient{
+func newPoolingShardClient(info *nodeInfo, creator queryNodeCreatorFunc) (*shardClient, error) {
+	return &shardClient{
 		info: nodeInfo{
 			nodeID:  info.nodeID,
 			address: info.address,
 		},
-		client: client,
-		refCnt: 1,
-	}
-	return ret
+		refCnt:  1,
+		pooling: true,
+		creator: creator,
+	}, nil
 }
 
-func newPoolingShardClient(info *nodeInfo, creator queryNodeCreatorFunc) (*shardClient, error) {
+func (n *shardClient) initClients() error {
 	num := paramtable.Get().ProxyCfg.QueryNodePoolingSize.GetAsInt()
 	if num <= 0 {
 		num = 1
 	}
 	clients := make([]types.QueryNodeClient, 0, num)
 	for i := 0; i < num; i++ {
-		client, err := creator(context.Background(), info.address, info.nodeID)
+		client, err := n.creator(context.Background(), n.info.address, n.info.nodeID)
 		if err != nil {
-			return nil, err
+			return errors.Wrap(err, fmt.Sprintf("create client for node=%d failed", n.info.nodeID))
 		}
 		clients = append(clients, client)
 	}
-	return &shardClient{
-		info: nodeInfo{
-			nodeID:  info.nodeID,
-			address: info.address,
-		},
-		refCnt:   1,
-		pooling:  true,
-		clients:  clients,
-		poolSize: num,
-	}, nil
+
+	n.clients = clients
+	n.poolSize = num
+	return nil
 }
 
 type shardClientMgr interface {

From fee94de628766d70e54cd07492fe3f65ecb2945d Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Mon, 28 Oct 2024 14:03:06 +0800
Subject: [PATCH 2/3] fix data race

Signed-off-by: Wei Liu
---
 internal/proxy/shard_client.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/internal/proxy/shard_client.go b/internal/proxy/shard_client.go
index 8fbe0f7202b03..369a16a69f151 100644
--- a/internal/proxy/shard_client.go
+++ b/internal/proxy/shard_client.go
@@ -38,16 +38,18 @@ type shardClient struct {
 	poolSize int
 	pooling  bool
 
-	creator queryNodeCreatorFunc
+	initialized atomic.Bool
+	creator     queryNodeCreatorFunc
 }
 
 func (n *shardClient) getClient(ctx context.Context) (types.QueryNodeClient, error) {
-	if len(n.clients) == 0 {
+	if !n.initialized.Load() {
 		n.Lock()
-		if len(n.clients) == 0 {
+		if !n.initialized.Load() {
 			if err := n.initClients(); err != nil {
 				return nil, err
 			}
+			n.initialized.Store(true)
 		}
 		n.Unlock()
 	}

From c5262995519f7df0fe49c79bd4f5d32749b8f594 Mon Sep 17 00:00:00 2001
From: Wei Liu
Date: Mon, 28 Oct 2024 16:09:51 +0800
Subject: [PATCH 3/3] fix review comment

Signed-off-by: Wei Liu
---
 internal/proxy/shard_client.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/internal/proxy/shard_client.go b/internal/proxy/shard_client.go
index 369a16a69f151..dd0b36e494062 100644
--- a/internal/proxy/shard_client.go
+++ b/internal/proxy/shard_client.go
@@ -127,6 +127,10 @@ func (n *shardClient) initClients() error {
 	for i := 0; i < num; i++ {
 		client, err := n.creator(context.Background(), n.info.address, n.info.nodeID)
 		if err != nil {
+			// roll back already created clients
+			for _, c := range clients[:i] {
+				c.Close()
+			}
 			return errors.Wrap(err, fmt.Sprintf("create client for node=%d failed", n.info.nodeID))
 		}
 		clients = append(clients, client)
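
Note: the core change across these three patches is that a shardClient no longer creates its gRPC connections when shard leaders are updated, but on the first getClient call, guarded by double-checked locking. The standalone Go sketch below isolates that pattern for reference. It is illustrative only, not the Milvus code: the types are stand-ins (a string slice instead of []types.QueryNodeClient, a plain create callback instead of queryNodeCreatorFunc, the standard library sync/atomic whose counter uses Add rather than Inc), and the sketch releases the mutex explicitly on the error path.

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// lazyPool creates its clients lazily on the first getClient call instead of
// eagerly when shard leaders are updated. All types are stand-ins.
type lazyPool struct {
	mu          sync.RWMutex
	initialized atomic.Bool
	closed      bool
	clients     []string                    // stand-in for []types.QueryNodeClient
	idx         atomic.Int64                // round-robin cursor
	poolSize    int
	create      func(i int) (string, error) // stand-in for queryNodeCreatorFunc
}

func (p *lazyPool) getClient() (string, error) {
	// Double-checked locking: only the first caller pays the creation cost,
	// later callers go straight to the read path.
	if !p.initialized.Load() {
		p.mu.Lock()
		if !p.initialized.Load() {
			if err := p.initClients(); err != nil {
				p.mu.Unlock() // release the lock on the error path as well
				return "", err
			}
			p.initialized.Store(true)
		}
		p.mu.Unlock()
	}

	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return "", errors.New("client is closed")
	}
	// Round-robin over the pool.
	i := int(p.idx.Add(1)) % p.poolSize
	return p.clients[i], nil
}

func (p *lazyPool) initClients() error {
	clients := make([]string, 0, p.poolSize)
	for i := 0; i < p.poolSize; i++ {
		c, err := p.create(i)
		if err != nil {
			// a real pool would close the clients created so far, as PATCH 3/3 does
			return fmt.Errorf("create client %d: %w", i, err)
		}
		clients = append(clients, c)
	}
	p.clients = clients
	return nil
}

func main() {
	p := &lazyPool{
		poolSize: 2,
		create:   func(i int) (string, error) { return fmt.Sprintf("client-%d", i), nil },
	}
	for range [3]struct{}{} {
		c, _ := p.getClient()
		fmt.Println(c)
	}
}

If create fails, getClient returns the wrapped error while the initialized flag stays false, so a later call retries initialization; this mirrors why PATCH 2/3 sets the flag only after initClients succeeds.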