Skip to content

Commit

Permalink
rpc: mark heartbeat RPCs as fail-fast
Browse files Browse the repository at this point in the history
We want heartbeat RPCs to be fail-fast so that we get notified of
transport failures and close the connection. Failure to do this left the
gRPC client connections permanently open and trying to reconnect to down
nodes, fouling up the circuit breaker expectations in Raft transport.

Changed client.NewSender to create a separate gRPC connection which will
not be heartbeat by the rpc.Context. We don't want the heartbeat service
and we don't want these connections closed.

Fixes cockroachdb#8130
  • Loading branch information
petermattis committed Aug 1, 2016
1 parent 5c39e1a commit a79b1b9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
13 changes: 12 additions & 1 deletion internal/client/rpc_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/rpc"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/pkg/errors"
)

Expand All @@ -33,10 +34,20 @@ type sender struct {
// database provided by a Cockroach cluster by connecting via RPC to a
// Cockroach node.
func NewSender(ctx *rpc.Context, target string) (Sender, error) {
conn, err := ctx.GRPCDial(target)
// We don't use ctx.GRPCDial because this is an external client connection
// and we don't want to run the heartbeat service which will close the
// connection if the transport fails.
dialOpt, err := ctx.GRPCDialOption()
if err != nil {
return nil, err
}
conn, err := grpc.Dial(target, dialOpt)
if err != nil {
return nil, err
}
ctx.Stopper.AddCloser(stop.CloserFn(func() {
_ = conn.Close() // we're closing, ignore the error
}))
return sender{roachpb.NewExternalClient(conn)}, nil
}

Expand Down
31 changes: 21 additions & 10 deletions rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,9 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
return meta.conn, nil
}

var dialOpt grpc.DialOption
if ctx.Insecure {
dialOpt = grpc.WithInsecure()
} else {
tlsConfig, err := ctx.GetClientTLSConfig()
if err != nil {
return nil, err
}
dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
dialOpt, err := ctx.GRPCDialOption()
if err != nil {
return nil, err
}

dialOpts := make([]grpc.DialOption, 0, 1+len(opts))
Expand Down Expand Up @@ -184,6 +178,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
return conn, err
}

// GRPCDialOption returns the GRPC dialing option appropriate for the context.
func (ctx *Context) GRPCDialOption() (grpc.DialOption, error) {
var dialOpt grpc.DialOption
if ctx.Insecure {
dialOpt = grpc.WithInsecure()
} else {
tlsConfig, err := ctx.GetClientTLSConfig()
if err != nil {
return dialOpt, err
}
dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))
}
return dialOpt, nil
}

// setConnHealthy sets the health status of the connection.
func (ctx *Context) setConnHealthy(remoteAddr string, healthy bool) {
ctx.conns.Lock()
Expand Down Expand Up @@ -268,5 +277,7 @@ func (ctx *Context) runHeartbeat(cc *grpc.ClientConn, remoteAddr string) error {
func (ctx *Context) heartbeat(heartbeatClient HeartbeatClient, request PingRequest) (*PingResponse, error) {
goCtx, cancel := context.WithTimeout(context.Background(), ctx.HeartbeatTimeout)
defer cancel()
return heartbeatClient.Ping(goCtx, &request, grpc.FailFast(false))
// NB: We want the request to fail-fast (the default), otherwise we won't be
// notified of transport failures.
return heartbeatClient.Ping(goCtx, &request)
}

0 comments on commit a79b1b9

Please sign in to comment.