diff --git a/internal/net/grpc/pool/pool.go b/internal/net/grpc/pool/pool.go index d54fcf0e32..34c17bca02 100644 --- a/internal/net/grpc/pool/pool.go +++ b/internal/net/grpc/pool/pool.go @@ -284,7 +284,7 @@ func (p *pool) flush() { } func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr string) (err error) { - if pc != nil && pc.addr == addr && isHealthy(pc.conn) { + if pc != nil && pc.addr == addr && isHealthy(ctx, pc.conn) { return nil } if pc != nil { @@ -295,7 +295,7 @@ func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr stri conn, err := p.dial(ctx, addr) if err != nil { if pc != nil { - if isHealthy(pc.conn) { + if isHealthy(ctx, pc.conn) { log.Debugf("dialing new connection to %s failed,\terror: %v,\tbut existing connection to %s is healthy will keep existing connection", addr, err, pc.addr) return nil } @@ -478,7 +478,7 @@ func (p *pool) dial(ctx context.Context, addr string) (conn *ClientConn, err err log.Debugf("failed to dial gRPC connection to %s,\terror: %v", addr, err) return nil, err } - if !isHealthy(conn) { + if !isHealthy(ctx, conn) { if conn != nil { err = conn.Close() if err != nil && !errors.Is(err, grpc.ErrClientConnClosing) { @@ -514,7 +514,7 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) { var cnt, unhealthy int pl := p.len() err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool { - if pc == nil || !isHealthy(pc.conn) { + if pc == nil || !isHealthy(ctx, pc.conn) { if p.isIP { if pc != nil && pc.addr != "" { err := p.refreshConn(ctx, idx, pc, pc.addr) @@ -603,11 +603,11 @@ func (p *pool) getHealthyConn(ctx context.Context, cnt, retry uint64) (*ClientCo if pl > 0 { idx = int(p.current.Add(1) % pl) } - if pc := p.load(idx); pc != nil && isHealthy(pc.conn) { + if pc := p.load(idx); pc != nil && isHealthy(ctx, pc.conn) { return pc.conn, true } conn, err := p.dial(ctx, p.addr) - if err == nil && conn != nil && isHealthy(conn) { + if err == nil && conn != nil && isHealthy(ctx, conn) { p.store(idx, &poolConn{ conn: conn, addr: p.addr, @@ -619,7 +619,7 @@ func (p *pool) getHealthyConn(ctx context.Context, cnt, retry uint64) (*ClientCo } if pl > 0 { - if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(pc.conn) { + if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(ctx, pc.conn) { return pc.conn, true } } @@ -699,7 +699,7 @@ func (p *pool) scanGRPCPort(ctx context.Context) (port uint16, err error) { net.JoinHostPort(p.host, port), append(p.dopts, grpc.WithBlock())...) - if err == nil && isHealthy(conn) && conn.Close() == nil { + if err == nil && isHealthy(ctx, conn) && conn.Close() == nil { // if no error and healthy the port is ready for gRPC return port, nil } @@ -784,7 +784,7 @@ func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error { } } -func isHealthy(conn *ClientConn) bool { +func isHealthy(ctx context.Context, conn *ClientConn) bool { if conn == nil { log.Warn("gRPC target connection is nil") return false @@ -797,7 +797,15 @@ func isHealthy(conn *ClientConn) bool { log.Debugf("gRPC target %s's connection status will be Ready soon\tstatus: %s", conn.Target(), state.String()) return true case connectivity.Idle: - log.Warnf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String()) + log.Debugf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String()) + conn.Connect() + if conn.WaitForStateChange(ctx, state) { + state = conn.GetState() + if state == connectivity.Ready || state == connectivity.Connecting { + log.Debugf("gRPC target %s's connection status enabled for target\tstatus: %s", conn.Target(), state.String()) + return true + } + } return false case connectivity.Shutdown, connectivity.TransientFailure: log.Errorf("gRPC target %s's connection status is unhealthy\tstatus: %s", conn.Target(), state.String())