Skip to content

Commit

Permalink
Revert "rpc: Perform initial-heartbeat validation on GRPC reconnections"
Browse files Browse the repository at this point in the history
This reverts commit bae2cda.
  • Loading branch information
bdarnell committed Feb 12, 2018
1 parent 45ab257 commit 9b20d8a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
75 changes: 19 additions & 56 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand All @@ -66,8 +65,6 @@ const (
initialConnWindowSize = initialWindowSize * 16 // for a connection
)

var errRedialing = errors.New("redialing")

// sourceAddr is the environment-provided local address for outgoing
// connections.
var sourceAddr = func() net.Addr {
Expand Down Expand Up @@ -426,43 +423,13 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
return dialOpts, nil
}

// onlyOnceDialer implements the grpc.WithDialer interface but only
// allows a single connection attempt. If a reconnection is attempted,
// redialChan is closed to signal a higher-level retry loop. This
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
syncutil.Mutex
dialed bool
closed bool
redialChan chan struct{}
}

func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
ood.Lock()
defer ood.Unlock()
if !ood.dialed {
ood.dialed = true
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
} else if !ood.closed {
ood.closed = true
close(ood.redialChan)
}
return nil, errRedialing
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDial, it does not start an RPC heartbeat to validate the
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
// connection.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
dialOpts, err := ctx.GRPCDialOptions()
if err != nil {
return nil, nil, err
return nil, err
}

// Add a stats handler to measure client network stats.
Expand All @@ -473,22 +440,24 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, ctx.testingDialOpts...)

dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
if sourceAddr != nil {
dialOpts = append(dialOpts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
},
))
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if log.V(1) {
log.Infof(ctx.masterCtx, "dialing %s", target)
}
conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...)
return conn, dialer.redialChan, err
return grpc.DialContext(ctx.masterCtx, target, dialOpts...)
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
Expand All @@ -500,8 +469,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {

conn := value.(*Connection)
conn.initOnce.Do(func() {
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target)
if ctx.GetLocalInternalServerForAddr(target) != nil {
conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true})
conn.setInitialHeartbeatDone()
Expand All @@ -511,7 +479,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {
if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target, redialChan)
err := ctx.runHeartbeat(conn, target)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
Expand All @@ -537,8 +505,7 @@ func (ctx *Context) NewBreaker() *circuit.Breaker {
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
// host (e.g. GRPCDial was never called for that address, or a connection has
// been closed and not reconnected).
// host (e.g. GRPCDial was never called for that address).
var ErrNotConnected = errors.New("not connected")

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand All @@ -559,9 +526,7 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand All @@ -581,8 +546,6 @@ func (ctx *Context) runHeartbeat(
everSucceeded := false
for {
select {
case <-redialChan:
return errRedialing
case <-ctx.Stopper.ShouldStop():
return nil
case <-heartbeatTimer.C:
Expand Down
14 changes: 1 addition & 13 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,14 +377,6 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}

isUnhealthy := func(err error) bool {
// Most of the time, an unhealthy connection will get
// ErrNotConnected, but there are brief periods during which we
// could get ErrNotHeartbeated (while we're trying to start a new
// connection) or one of the grpc errors below (while the old
// connection is in the middle of closing).
if err == ErrNotConnected || err == ErrNotHeartbeated {
return true
}
// The expected code here is Unavailable, but at least on OSX you can also get
//
// rpc error: code = Internal desc = connection error: desc = "transport: authentication
Expand Down Expand Up @@ -425,16 +417,12 @@ func TestHeartbeatHealthTransport(t *testing.T) {
if timeutil.Since(then) > 45*time.Second {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}

close(done)

// We can reconnect and the connection becomes healthy again.
// Should become healthy again after GRPC reconnects.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil {
return err
}
return clientCtx.ConnHealth(remoteAddr)
})

Expand Down

0 comments on commit 9b20d8a

Please sign in to comment.