Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#7443
Browse files Browse the repository at this point in the history
close tikv#7416

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CabinfeverB authored and ti-chi-bot committed Nov 29, 2023
1 parent b092996 commit de73256
Show file tree
Hide file tree
Showing 6 changed files with 3,499 additions and 1 deletion.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,16 +488,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) {
if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
healthCli := healthpb.NewHealthClient(cc.(*grpc.ClientConn))
resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
rpcErr, ok := status.FromError(err)
failpoint.Inject("unreachableNetwork1", func() {
resp = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
rpcErr, ok := status.FromError(err)
if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
}
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
}
}

Expand Down
44 changes: 44 additions & 0 deletions client/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"crypto/tls"
"net/url"

<<<<<<< HEAD
=======
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
>>>>>>> 54bf70e45 (client: update the leader even if the connection creation fails (#7443))
"github.com/tikv/pd/client/errs"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -64,3 +70,41 @@ func BuildForwardContext(ctx context.Context, addr string) context.Context {
md := metadata.Pairs(ForwardMetadataKey, addr)
return metadata.NewOutgoingContext(ctx, md)
}
<<<<<<< HEAD
=======

// GetOrCreateGRPCConn returns the corresponding grpc client connection of the given addr.
// Returns the old one if's already existed in the clientConns; otherwise creates a new one and returns it.
func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string, tlsCfg *tlsutil.TLSConfig, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
conn, ok := clientConns.Load(addr)
if ok {
// TODO: check the connection state.
return conn.(*grpc.ClientConn), nil
}
tlsConfig, err := tlsCfg.ToTLSConfig()
if err != nil {
return nil, err
}
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsConfig, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
conn, loaded := clientConns.LoadOrStore(addr, cc)
if !loaded {
// Successfully stored the connection.
return cc, nil
}
cc.Close()
cc = conn.(*grpc.ClientConn)
log.Debug("use existing connection", zap.String("target", cc.Target()), zap.String("state", cc.GetState().String()))
return cc, nil
}
>>>>>>> 54bf70e45 (client: update the leader even if the connection creation fails (#7443))
Loading

0 comments on commit de73256

Please sign in to comment.