From bae2cda3a5db6cbc69a34587556a44a9c8c87475 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Thu, 8 Feb 2018 13:42:47 -0500 Subject: [PATCH] rpc: Perform initial-heartbeat validation on GRPC reconnections GRPC will transparently reconnect when a connection fails, but if the next process to use that port is not a part of the same cluster, this leads to confusing errors and potential data corruption. (this is most common in tests, but it can also occur in other situations). This change disables grpc's automatic reconnections so that in the event of a failed connection, we go through our full dialing process including an initial heartbeat that validates certain parameters. Fixes #20537 Release note (bug fix): Implement additional safeguards against RPC connections between nodes that belong to different clusters. --- pkg/acceptance/localcluster/cluster.go | 2 +- pkg/rpc/context.go | 75 +++++++++++++++++++------- pkg/rpc/context_test.go | 14 ++++- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/pkg/acceptance/localcluster/cluster.go b/pkg/acceptance/localcluster/cluster.go index c989e8b3b1b3..009a72214873 100644 --- a/pkg/acceptance/localcluster/cluster.go +++ b/pkg/acceptance/localcluster/cluster.go @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient { return existingClient } - conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) + conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) if err != nil { log.Fatalf(context.Background(), "failed to initialize status client: %s", err) } diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index c779b6afd05e..5a2711e11986 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -65,6 +66,8 @@ const ( initialConnWindowSize = initialWindowSize * 16 // for a connection ) +var errRedialing = errors.New("redialing") + // sourceAddr is the environment-provided local address for outgoing // connections. var sourceAddr = func() net.Addr { @@ -423,13 +426,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) { return dialOpts, nil } +// onlyOnceDialer implements the grpc.WithDialer interface but only +// allows a single connection attempt. If a reconnection is attempted, +// redialChan is closed to signal a higher-level retry loop. This +// ensures that our initial heartbeat (and its version/clusterID +// validation) occurs on every new connection. +type onlyOnceDialer struct { + syncutil.Mutex + dialed bool + closed bool + redialChan chan struct{} +} + +func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) { + ood.Lock() + defer ood.Unlock() + if !ood.dialed { + ood.dialed = true + dialer := net.Dialer{ + Timeout: timeout, + LocalAddr: sourceAddr, + } + return dialer.Dial("tcp", addr) + } else if !ood.closed { + ood.closed = true + close(ood.redialChan) + } + return nil, errRedialing +} + // GRPCDialRaw calls grpc.Dial with options appropriate for the context. // Unlike GRPCDial, it does not start an RPC heartbeat to validate the -// connection. -func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) { +// connection. This connection will not be reconnected automatically; +// the returned channel is closed when a reconnection is attempted. +func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { dialOpts, err := ctx.GRPCDialOptions() if err != nil { - return nil, err + return nil, nil, err } // Add a stats handler to measure client network stats. @@ -440,24 +473,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) { dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize), grpc.WithInitialConnWindowSize(initialConnWindowSize)) - dialOpts = append(dialOpts, ctx.testingDialOpts...) - if sourceAddr != nil { - dialOpts = append(dialOpts, grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { - dialer := net.Dialer{ - Timeout: timeout, - LocalAddr: sourceAddr, - } - return dialer.Dial("tcp", addr) - }, - )) + dialer := onlyOnceDialer{ + redialChan: make(chan struct{}), } + dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial)) + + // add testingDialOpts after our dialer because one of our tests + // uses a custom dialer (this disables the only-one-connection + // behavior and redialChan will never be closed). + dialOpts = append(dialOpts, ctx.testingDialOpts...) if log.V(1) { log.Infof(ctx.masterCtx, "dialing %s", target) } - return grpc.DialContext(ctx.masterCtx, target, dialOpts...) + conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...) + return conn, dialer.redialChan, err } // GRPCDial calls grpc.Dial with options appropriate for the context. @@ -469,7 +500,8 @@ func (ctx *Context) GRPCDial(target string) *Connection { conn := value.(*Connection) conn.initOnce.Do(func() { - conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target) + var redialChan <-chan struct{} + conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target) if ctx.GetLocalInternalServerForAddr(target) != nil { conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true}) conn.setInitialHeartbeatDone() @@ -479,7 +511,7 @@ func (ctx *Context) GRPCDial(target string) *Connection { if err := ctx.Stopper.RunTask( ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) { - err := ctx.runHeartbeat(conn, target) + err := ctx.runHeartbeat(conn, target, redialChan) if err != nil && !grpcutil.IsClosedConnection(err) { log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err) } @@ -505,7 +537,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker { } // ErrNotConnected is returned by ConnHealth when there is no connection to the -// host (e.g. GRPCDial was never called for that address). +// host (e.g. GRPCDial was never called for that address, or a connection has +// been closed and not reconnected). var ErrNotConnected = errors.New("not connected") // ErrNotHeartbeated is returned by ConnHealth when we have not yet performed @@ -526,7 +559,9 @@ func (ctx *Context) ConnHealth(target string) error { return ErrNotConnected } -func (ctx *Context) runHeartbeat(conn *Connection, target string) error { +func (ctx *Context) runHeartbeat( + conn *Connection, target string, redialChan <-chan struct{}, +) error { maxOffset := ctx.LocalClock.MaxOffset() clusterID := ctx.ClusterID.Get() @@ -546,6 +581,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error { everSucceeded := false for { select { + case <-redialChan: + return errRedialing case <-ctx.Stopper.ShouldStop(): return nil case <-heartbeatTimer.C: diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 02167ff6e5ae..6587fe1ba2d0 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) { } isUnhealthy := func(err error) bool { + // Most of the time, an unhealthy connection will get + // ErrNotConnected, but there are brief periods during which we + // could get ErrNotHeartbeated (while we're trying to start a new + // connection) or one of the grpc errors below (while the old + // connection is in the middle of closing). + if err == ErrNotConnected || err == ErrNotHeartbeated { + return true + } // The expected code here is Unavailable, but at least on OSX you can also get // // rpc error: code = Internal desc = connection error: desc = "transport: authentication @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) { if timeutil.Since(then) > 45*time.Second { t.Fatal(err) } + time.Sleep(10 * time.Millisecond) } close(done) - // Should become healthy again after GRPC reconnects. + // We can reconnect and the connection becomes healthy again. testutils.SucceedsSoon(t, func() error { + if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil { + return err + } return clientCtx.ConnHealth(remoteAddr) })