From a79b1b9406e83e7a1293cffff24c9918b46084fc Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sun, 31 Jul 2016 16:35:56 -0400 Subject: [PATCH] rpc: mark heartbeat RPCs as fail-fast We want heartbeat RPCs to be fail-fast so that we get notified of transport failures and close the connection. Failure to do this left the gRPC client connections permanently open and trying to reconnect to down nodes, fouling up the circuit breaker expectations in Raft transport. Changed client.NewSender to create a separate gRPC connection which will not be heartbeat by the rpc.Context. We don't want the heartbeat service and we don't want these connections closed. Fixes #8130 --- internal/client/rpc_sender.go | 13 ++++++++++++- rpc/context.go | 31 +++++++++++++++++++++---------- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/internal/client/rpc_sender.go b/internal/client/rpc_sender.go index dedd1cd3103b..22345931e74d 100644 --- a/internal/client/rpc_sender.go +++ b/internal/client/rpc_sender.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/roachpb" "github.com/cockroachdb/cockroach/rpc" + "github.com/cockroachdb/cockroach/util/stop" "github.com/pkg/errors" ) @@ -33,10 +34,20 @@ type sender struct { // database provided by a Cockroach cluster by connecting via RPC to a // Cockroach node. func NewSender(ctx *rpc.Context, target string) (Sender, error) { - conn, err := ctx.GRPCDial(target) + // We don't use ctx.GRPCDial because this is an external client connection + // and we don't want to run the heartbeat service which will close the + // connection if the transport fails. + dialOpt, err := ctx.GRPCDialOption() if err != nil { return nil, err } + conn, err := grpc.Dial(target, dialOpt) + if err != nil { + return nil, err + } + ctx.Stopper.AddCloser(stop.CloserFn(func() { + _ = conn.Close() // we're closing, ignore the error + })) return sender{roachpb.NewExternalClient(conn)}, nil } diff --git a/rpc/context.go b/rpc/context.go index 0c6d0a56700d..dabf2149bdda 100644 --- a/rpc/context.go +++ b/rpc/context.go @@ -146,15 +146,9 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie return meta.conn, nil } - var dialOpt grpc.DialOption - if ctx.Insecure { - dialOpt = grpc.WithInsecure() - } else { - tlsConfig, err := ctx.GetClientTLSConfig() - if err != nil { - return nil, err - } - dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + dialOpt, err := ctx.GRPCDialOption() + if err != nil { + return nil, err } dialOpts := make([]grpc.DialOption, 0, 1+len(opts)) @@ -184,6 +178,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie return conn, err } +// GRPCDialOption returns the GRPC dialing option appropriate for the context. +func (ctx *Context) GRPCDialOption() (grpc.DialOption, error) { + var dialOpt grpc.DialOption + if ctx.Insecure { + dialOpt = grpc.WithInsecure() + } else { + tlsConfig, err := ctx.GetClientTLSConfig() + if err != nil { + return dialOpt, err + } + dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + } + return dialOpt, nil +} + // setConnHealthy sets the health status of the connection. func (ctx *Context) setConnHealthy(remoteAddr string, healthy bool) { ctx.conns.Lock() @@ -268,5 +277,7 @@ func (ctx *Context) runHeartbeat(cc *grpc.ClientConn, remoteAddr string) error { func (ctx *Context) heartbeat(heartbeatClient HeartbeatClient, request PingRequest) (*PingResponse, error) { goCtx, cancel := context.WithTimeout(context.Background(), ctx.HeartbeatTimeout) defer cancel() - return heartbeatClient.Ping(goCtx, &request, grpc.FailFast(false)) + // NB: We want the request to fail-fast (the default), otherwise we won't be + // notified of transport failures. + return heartbeatClient.Ping(goCtx, &request) }