diff --git a/internal/net/grpc/client.go b/internal/net/grpc/client.go index df9f84ddd0..dc0da88de5 100644 --- a/internal/net/grpc/client.go +++ b/internal/net/grpc/client.go @@ -259,10 +259,11 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error, disconnectTargets = append(disconnectTargets, addr) return true } - // for health check we don't need to reconnect when pool is healthy - if p.IsHealthy(ctx) { + // for health check we don't need to reconnect when ip connection pool is healthy + if p.IsHealthy(ctx) && p.IsIPConn() { return true } + // if connection is not ip direct or unhealthy let's re-connect var err error // if not healthy we should try reconnect p, err = p.Reconnect(ctx, false) diff --git a/internal/net/grpc/pool/pool.go b/internal/net/grpc/pool/pool.go index 16980f4341..e1576aaf65 100644 --- a/internal/net/grpc/pool/pool.go +++ b/internal/net/grpc/pool/pool.go @@ -343,7 +343,10 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) { if err != nil { return p.singleTargetConnect(ctx) } + return p.connect(ctx, ips...) +} +func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) { if uint64(len(ips)) > p.Size() { p.grow(uint64(len(ips))) } @@ -385,6 +388,15 @@ func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) { healthy := p.IsHealthy(ctx) if healthy { + if !p.isIP && p.resolveDNS && hash != nil && *hash != "" { + ips, err := p.lookupIPAddr(ctx) + if err != nil { + return p, nil + } + if *hash != strings.Join(ips, "-") { + return p.connect(ctx, ips...) + } + } return p, nil } @@ -499,24 +511,42 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) { if p == nil || p.closing.Load() { return false } - var cnt int + var cnt, unhealthy int pl := p.len() - unhealthy := pl err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool { if pc == nil || !isHealthy(pc.conn) { if p.isIP { + if pc != nil && pc.addr != "" { + err := p.refreshConn(ctx, idx, pc, pc.addr) + if err != nil { + // target addr cannot re-connect so, connection is unhealthy + unhealthy++ + return false + } + return true + } return false } addr := p.addr if pc != nil { addr = pc.addr } + // re-connect to last connected addr err := p.refreshConn(ctx, idx, pc, addr) if err != nil { - return true + if addr == p.addr { + unhealthy++ + return true + } + // last connect addr is not dns and cannot connect then try dns + err = p.refreshConn(ctx, idx, pc, p.addr) + // dns addr cannot connect so, connection is unhealthy + if err != nil { + unhealthy = pl - cnt + return false + } } } - unhealthy-- cnt++ return true }) @@ -525,9 +555,15 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) { } if cnt == 0 { log.Debugf("no connection pool %d/%d found for %s,\thealthy %d/%d", cnt, pl, p.addr, pl-unhealthy, pl) - return cnt != 0 && unhealthy == 0 + return false } - return unhealthy == 0 + if p.isIP { + // if ip pool connection, each connection target should be healthy + return unhealthy == 0 + } + + // some pool target may unhealthy but pool client is healthy when unhealthy is less than pool length + return unhealthy < pl } func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) error {