diff --git a/pkg/acceptance/localcluster/cluster.go b/pkg/acceptance/localcluster/cluster.go index b65f8c2c662d..42376bb61c8d 100644 --- a/pkg/acceptance/localcluster/cluster.go +++ b/pkg/acceptance/localcluster/cluster.go @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient { return existingClient } - conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) + conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) if err != nil { log.Fatalf(context.Background(), "failed to initialize status client: %s", err) } diff --git a/pkg/cli/interactive_tests/common.tcl b/pkg/cli/interactive_tests/common.tcl index 21df4b6c38cc..063cd11a6600 100644 --- a/pkg/cli/interactive_tests/common.tcl +++ b/pkg/cli/interactive_tests/common.tcl @@ -102,11 +102,7 @@ proc stop_server {argv} { # Trigger a normal shutdown. system "$argv quit" # If after 5 seconds the server hasn't shut down, trigger an error. - - ## Re-enable this once issue #22536 is fixed. - # system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0" - # Until #22536 is fixed use a violent kill. - system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; kill -9 `cat server_pid`; exit 0" + system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0" report "END STOP SERVER" } diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index c779b6afd05e..25b17dd0cbba 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -423,13 +424,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) { return dialOpts, nil } +// onlyOnceDialer implements the grpc.WithDialer interface but only +// allows a single connection attempt. If a reconnection is attempted, +// redialChan is closed to signal a higher-level retry loop. This +// ensures that our initial heartbeat (and its version/clusterID +// validation) occurs on every new connection. +type onlyOnceDialer struct { + syncutil.Mutex + dialed bool + closed bool + redialChan chan struct{} +} + +func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) { + ood.Lock() + defer ood.Unlock() + if !ood.dialed { + ood.dialed = true + dialer := net.Dialer{ + Timeout: timeout, + LocalAddr: sourceAddr, + } + return dialer.Dial("tcp", addr) + } else if !ood.closed { + ood.closed = true + close(ood.redialChan) + } + return nil, grpcutil.ErrCannotReuseClientConn +} + // GRPCDialRaw calls grpc.Dial with options appropriate for the context. // Unlike GRPCDial, it does not start an RPC heartbeat to validate the -// connection. -func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) { +// connection. This connection will not be reconnected automatically; +// the returned channel is closed when a reconnection is attempted. +func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { dialOpts, err := ctx.GRPCDialOptions() if err != nil { - return nil, err + return nil, nil, err } // Add a stats handler to measure client network stats. @@ -440,24 +471,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) { dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize), grpc.WithInitialConnWindowSize(initialConnWindowSize)) - dialOpts = append(dialOpts, ctx.testingDialOpts...) - if sourceAddr != nil { - dialOpts = append(dialOpts, grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { - dialer := net.Dialer{ - Timeout: timeout, - LocalAddr: sourceAddr, - } - return dialer.Dial("tcp", addr) - }, - )) + dialer := onlyOnceDialer{ + redialChan: make(chan struct{}), } + dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial)) + + // add testingDialOpts after our dialer because one of our tests + // uses a custom dialer (this disables the only-one-connection + // behavior and redialChan will never be closed). + dialOpts = append(dialOpts, ctx.testingDialOpts...) if log.V(1) { log.Infof(ctx.masterCtx, "dialing %s", target) } - return grpc.DialContext(ctx.masterCtx, target, dialOpts...) + conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...) + return conn, dialer.redialChan, err } // GRPCDial calls grpc.Dial with options appropriate for the context. @@ -469,7 +498,8 @@ func (ctx *Context) GRPCDial(target string) *Connection { conn := value.(*Connection) conn.initOnce.Do(func() { - conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target) + var redialChan <-chan struct{} + conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target) if ctx.GetLocalInternalServerForAddr(target) != nil { conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true}) conn.setInitialHeartbeatDone() @@ -479,7 +509,7 @@ func (ctx *Context) GRPCDial(target string) *Connection { if err := ctx.Stopper.RunTask( ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) { - err := ctx.runHeartbeat(conn, target) + err := ctx.runHeartbeat(conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) { log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err) } @@ -505,7 +535,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker { } // ErrNotConnected is returned by ConnHealth when there is no connection to the -// host (e.g. GRPCDial was never called for that address). +// host (e.g. GRPCDial was never called for that address, or a connection has +// been closed and not reconnected). var ErrNotConnected = errors.New("not connected") // ErrNotHeartbeated is returned by ConnHealth when we have not yet performed @@ -526,7 +557,9 @@ func (ctx *Context) ConnHealth(target string) error { return ErrNotConnected } -func (ctx *Context) runHeartbeat(conn *Connection, target string) error { +func (ctx *Context) runHeartbeat( + conn *Connection, target string, redialChan <-chan struct{}, +) error { maxOffset := ctx.LocalClock.MaxOffset() clusterID := ctx.ClusterID.Get() @@ -546,6 +579,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error { everSucceeded := false for { select { + case <-redialChan: + return grpcutil.ErrCannotReuseClientConn case <-ctx.Stopper.ShouldStop(): return nil case <-heartbeatTimer.C: diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 02167ff6e5ae..6587fe1ba2d0 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) { } isUnhealthy := func(err error) bool { + // Most of the time, an unhealthy connection will get + // ErrNotConnected, but there are brief periods during which we + // could get ErrNotHeartbeated (while we're trying to start a new + // connection) or one of the grpc errors below (while the old + // connection is in the middle of closing). + if err == ErrNotConnected || err == ErrNotHeartbeated { + return true + } // The expected code here is Unavailable, but at least on OSX you can also get // // rpc error: code = Internal desc = connection error: desc = "transport: authentication @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) { if timeutil.Since(then) > 45*time.Second { t.Fatal(err) } + time.Sleep(10 * time.Millisecond) } close(done) - // Should become healthy again after GRPC reconnects. + // We can reconnect and the connection becomes healthy again. testutils.SucceedsSoon(t, func() error { + if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil { + return err + } return clientCtx.ConnHealth(remoteAddr) }) diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go index 0ef64044c094..d73f45399b31 100644 --- a/pkg/util/grpcutil/grpc_util.go +++ b/pkg/util/grpcutil/grpc_util.go @@ -30,6 +30,11 @@ import ( "google.golang.org/grpc/transport" ) +// ErrCannotReuseClientConn is returned when a failed connection is +// being reused. We require that new connections be created with +// pkg/rpc.GRPCDial instead. +var ErrCannotReuseClientConn = errors.New("cannot reuse client connection") + type localRequestKey struct{} // NewLocalRequestContext returns a Context that can be used for local (in-process) requests. @@ -46,6 +51,9 @@ func IsLocalRequestContext(ctx context.Context) bool { // on closed connections. func IsClosedConnection(err error) bool { err = errors.Cause(err) + if err == ErrCannotReuseClientConn { + return true + } if s, ok := status.FromError(err); ok { if s.Code() == codes.Canceled || s.Code() == codes.Unavailable ||