From 9b20d8aa15356b047ab9364b01de24363b063a17 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 12 Feb 2018 18:45:03 -0500 Subject: [PATCH] Revert "rpc: Perform initial-heartbeat validation on GRPC reconnections" This reverts commit bae2cda3a5db6cbc69a34587556a44a9c8c87475. --- pkg/acceptance/localcluster/cluster.go | 2 +- pkg/rpc/context.go | 75 +++++++------------------- pkg/rpc/context_test.go | 14 +---- 3 files changed, 21 insertions(+), 70 deletions(-) diff --git a/pkg/acceptance/localcluster/cluster.go b/pkg/acceptance/localcluster/cluster.go index 42376bb61c8d..b65f8c2c662d 100644 --- a/pkg/acceptance/localcluster/cluster.go +++ b/pkg/acceptance/localcluster/cluster.go @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient { return existingClient } - conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) + conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr()) if err != nil { log.Fatalf(context.Background(), "failed to initialize status client: %s", err) } diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index 5a2711e11986..c779b6afd05e 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -66,8 +65,6 @@ const ( initialConnWindowSize = initialWindowSize * 16 // for a connection ) -var errRedialing = errors.New("redialing") - // sourceAddr is the environment-provided local address for outgoing // connections. var sourceAddr = func() net.Addr { @@ -426,43 +423,13 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) { return dialOpts, nil } -// onlyOnceDialer implements the grpc.WithDialer interface but only -// allows a single connection attempt. If a reconnection is attempted, -// redialChan is closed to signal a higher-level retry loop. This -// ensures that our initial heartbeat (and its version/clusterID -// validation) occurs on every new connection. -type onlyOnceDialer struct { - syncutil.Mutex - dialed bool - closed bool - redialChan chan struct{} -} - -func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) { - ood.Lock() - defer ood.Unlock() - if !ood.dialed { - ood.dialed = true - dialer := net.Dialer{ - Timeout: timeout, - LocalAddr: sourceAddr, - } - return dialer.Dial("tcp", addr) - } else if !ood.closed { - ood.closed = true - close(ood.redialChan) - } - return nil, errRedialing -} - // GRPCDialRaw calls grpc.Dial with options appropriate for the context. // Unlike GRPCDial, it does not start an RPC heartbeat to validate the -// connection. This connection will not be reconnected automatically; -// the returned channel is closed when a reconnection is attempted. -func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) { +// connection. +func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) { dialOpts, err := ctx.GRPCDialOptions() if err != nil { - return nil, nil, err + return nil, err } // Add a stats handler to measure client network stats. @@ -473,22 +440,24 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{ dialOpts = append(dialOpts, grpc.WithInitialWindowSize(initialWindowSize), grpc.WithInitialConnWindowSize(initialConnWindowSize)) + dialOpts = append(dialOpts, ctx.testingDialOpts...) - dialer := onlyOnceDialer{ - redialChan: make(chan struct{}), + if sourceAddr != nil { + dialOpts = append(dialOpts, grpc.WithDialer( + func(addr string, timeout time.Duration) (net.Conn, error) { + dialer := net.Dialer{ + Timeout: timeout, + LocalAddr: sourceAddr, + } + return dialer.Dial("tcp", addr) + }, + )) } - dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial)) - - // add testingDialOpts after our dialer because one of our tests - // uses a custom dialer (this disables the only-one-connection - // behavior and redialChan will never be closed). - dialOpts = append(dialOpts, ctx.testingDialOpts...) if log.V(1) { log.Infof(ctx.masterCtx, "dialing %s", target) } - conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...) - return conn, dialer.redialChan, err + return grpc.DialContext(ctx.masterCtx, target, dialOpts...) } // GRPCDial calls grpc.Dial with options appropriate for the context. @@ -500,8 +469,7 @@ func (ctx *Context) GRPCDial(target string) *Connection { conn := value.(*Connection) conn.initOnce.Do(func() { - var redialChan <-chan struct{} - conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target) + conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target) if ctx.GetLocalInternalServerForAddr(target) != nil { conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true}) conn.setInitialHeartbeatDone() @@ -511,7 +479,7 @@ func (ctx *Context) GRPCDial(target string) *Connection { if err := ctx.Stopper.RunTask( ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) { ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) { - err := ctx.runHeartbeat(conn, target, redialChan) + err := ctx.runHeartbeat(conn, target) if err != nil && !grpcutil.IsClosedConnection(err) { log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err) } @@ -537,8 +505,7 @@ func (ctx *Context) NewBreaker() *circuit.Breaker { } // ErrNotConnected is returned by ConnHealth when there is no connection to the -// host (e.g. GRPCDial was never called for that address, or a connection has -// been closed and not reconnected). +// host (e.g. GRPCDial was never called for that address). var ErrNotConnected = errors.New("not connected") // ErrNotHeartbeated is returned by ConnHealth when we have not yet performed @@ -559,9 +526,7 @@ func (ctx *Context) ConnHealth(target string) error { return ErrNotConnected } -func (ctx *Context) runHeartbeat( - conn *Connection, target string, redialChan <-chan struct{}, -) error { +func (ctx *Context) runHeartbeat(conn *Connection, target string) error { maxOffset := ctx.LocalClock.MaxOffset() clusterID := ctx.ClusterID.Get() @@ -581,8 +546,6 @@ func (ctx *Context) runHeartbeat( everSucceeded := false for { select { - case <-redialChan: - return errRedialing case <-ctx.Stopper.ShouldStop(): return nil case <-heartbeatTimer.C: diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 6587fe1ba2d0..02167ff6e5ae 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -377,14 +377,6 @@ func TestHeartbeatHealthTransport(t *testing.T) { } isUnhealthy := func(err error) bool { - // Most of the time, an unhealthy connection will get - // ErrNotConnected, but there are brief periods during which we - // could get ErrNotHeartbeated (while we're trying to start a new - // connection) or one of the grpc errors below (while the old - // connection is in the middle of closing). - if err == ErrNotConnected || err == ErrNotHeartbeated { - return true - } // The expected code here is Unavailable, but at least on OSX you can also get // // rpc error: code = Internal desc = connection error: desc = "transport: authentication @@ -425,16 +417,12 @@ func TestHeartbeatHealthTransport(t *testing.T) { if timeutil.Since(then) > 45*time.Second { t.Fatal(err) } - time.Sleep(10 * time.Millisecond) } close(done) - // We can reconnect and the connection becomes healthy again. + // Should become healthy again after GRPC reconnects. testutils.SucceedsSoon(t, func() error { - if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil { - return err - } return clientCtx.ConnHealth(remoteAddr) })