Skip to content

Commit

Permalink
[bugfix] gRPC pool connection health check for DNS Addr may fail duri…
Browse files Browse the repository at this point in the history
…ng VIP member disconnection (#2277)

[bugfix] gRPC pool connection health check for DNS Addr may fail during VIP member disconnection

Signed-off-by: kpango <[email protected]>
  • Loading branch information
kpango authored Dec 22, 2023
1 parent d0588fe commit 8dc67ee
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
5 changes: 3 additions & 2 deletions internal/net/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,11 @@ func (g *gRPCClient) StartConnectionMonitor(ctx context.Context) (<-chan error,
disconnectTargets = append(disconnectTargets, addr)
return true
}
// for health check we don't need to reconnect when pool is healthy
if p.IsHealthy(ctx) {
// for health check we don't need to reconnect when ip connection pool is healthy
if p.IsHealthy(ctx) && p.IsIPConn() {
return true
}
// if connection is not ip direct or unhealthy let's re-connect
var err error
// if not healthy we should try reconnect
p, err = p.Reconnect(ctx, false)
Expand Down
48 changes: 42 additions & 6 deletions internal/net/grpc/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,10 @@ func (p *pool) Connect(ctx context.Context) (c Conn, err error) {
if err != nil {
return p.singleTargetConnect(ctx)
}
return p.connect(ctx, ips...)
}

func (p *pool) connect(ctx context.Context, ips ...string) (c Conn, err error) {
if uint64(len(ips)) > p.Size() {
p.grow(uint64(len(ips)))
}
Expand Down Expand Up @@ -385,6 +388,15 @@ func (p *pool) Reconnect(ctx context.Context, force bool) (c Conn, err error) {

healthy := p.IsHealthy(ctx)
if healthy {
if !p.isIP && p.resolveDNS && hash != nil && *hash != "" {
ips, err := p.lookupIPAddr(ctx)
if err != nil {
return p, nil
}
if *hash != strings.Join(ips, "-") {
return p.connect(ctx, ips...)
}
}
return p, nil
}

Expand Down Expand Up @@ -499,24 +511,42 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
if p == nil || p.closing.Load() {
return false
}
var cnt int
var cnt, unhealthy int
pl := p.len()
unhealthy := pl
err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
if pc == nil || !isHealthy(pc.conn) {
if p.isIP {
if pc != nil && pc.addr != "" {
err := p.refreshConn(ctx, idx, pc, pc.addr)
if err != nil {
// target addr cannot re-connect so, connection is unhealthy
unhealthy++
return false
}
return true
}
return false
}
addr := p.addr
if pc != nil {
addr = pc.addr
}
// re-connect to last connected addr
err := p.refreshConn(ctx, idx, pc, addr)
if err != nil {
return true
if addr == p.addr {
unhealthy++
return true
}
// last connect addr is not dns and cannot connect then try dns
err = p.refreshConn(ctx, idx, pc, p.addr)
// dns addr cannot connect so, connection is unhealthy
if err != nil {
unhealthy = pl - cnt
return false
}
}
}
unhealthy--
cnt++
return true
})
Expand All @@ -525,9 +555,15 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
}
if cnt == 0 {
log.Debugf("no connection pool %d/%d found for %s,\thealthy %d/%d", cnt, pl, p.addr, pl-unhealthy, pl)
return cnt != 0 && unhealthy == 0
return false
}
return unhealthy == 0
if p.isIP {
// if ip pool connection, each connection target should be healthy
return unhealthy == 0
}

// some pool target may unhealthy but pool client is healthy when unhealthy is less than pool length
return unhealthy < pl
}

func (p *pool) Do(ctx context.Context, f func(conn *ClientConn) error) error {
Expand Down

0 comments on commit 8dc67ee

Please sign in to comment.