Skip to content

Commit

Permalink
rpc: Perform initial-heartbeat validation on GRPC reconnections
Browse files Browse the repository at this point in the history
GRPC will transparently reconnect when a connection fails, but if the
next process to use that port is not a part of the same cluster, this
leads to confusing errors and potential data corruption. (This is most
common in tests, but it can also occur in other situations.)

This change disables grpc's automatic reconnections so that in the
event of a failed connection, we go through our full dialing process
including an initial heartbeat that validates certain parameters.

Fixes #20537

Release note (bug fix): Implement additional safeguards against RPC
connections between nodes that belong to different clusters.
  • Loading branch information
bdarnell committed Feb 8, 2018
1 parent 35a94fc commit bae2cda
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
75 changes: 56 additions & 19 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand All @@ -65,6 +66,8 @@ const (
initialConnWindowSize = initialWindowSize * 16 // for a connection
)

// errRedialing is returned by onlyOnceDialer.dial for every dial
// attempt after the first, and by runHeartbeat once its redialChan
// has been closed.
var errRedialing = errors.New("redialing")

// sourceAddr is the environment-provided local address for outgoing
// connections.
var sourceAddr = func() net.Addr {
Expand Down Expand Up @@ -423,13 +426,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
return dialOpts, nil
}

// onlyOnceDialer supplies a dial function for grpc.WithDialer that
// allows only a single connection attempt. If a reconnection is
// attempted, redialChan is closed to signal a higher-level retry loop.
// This ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
// Embedded mutex; dial holds it for its whole duration, which
// serializes access to the fields below.
syncutil.Mutex
// dialed is set on the first call to dial, whether or not that
// dial succeeds.
dialed bool
// closed records that redialChan has already been closed, so that
// it is closed at most once.
closed bool
// redialChan is closed by dial the first time it is called again
// after the initial attempt.
redialChan chan struct{}
}

// dial opens a TCP connection to addr on its first invocation only,
// using the given timeout and the package-level sourceAddr as the local
// address. Every later invocation is treated as a reconnection attempt:
// redialChan is closed (once) and errRedialing is returned without
// touching the network. The mutex is held for the whole call, including
// the blocking dial.
func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
	ood.Lock()
	defer ood.Unlock()

	if ood.dialed {
		// Not the first attempt: notify the listener exactly once,
		// then refuse to reconnect.
		if !ood.closed {
			ood.closed = true
			close(ood.redialChan)
		}
		return nil, errRedialing
	}

	// Mark the attempt as used before dialing so that a failed first
	// dial still counts as the one permitted attempt.
	ood.dialed = true
	d := net.Dialer{
		Timeout:   timeout,
		LocalAddr: sourceAddr,
	}
	return d.Dial("tcp", addr)
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDial, it does not start an RPC heartbeat to validate the
// connection.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
dialOpts, err := ctx.GRPCDialOptions()
if err != nil {
return nil, err
return nil, nil, err
}

// Add a stats handler to measure client network stats.
Expand All @@ -440,24 +473,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if sourceAddr != nil {
dialOpts = append(dialOpts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
},
))
dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if log.V(1) {
log.Infof(ctx.masterCtx, "dialing %s", target)
}
return grpc.DialContext(ctx.masterCtx, target, dialOpts...)
conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...)
return conn, dialer.redialChan, err
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
Expand All @@ -469,7 +500,8 @@ func (ctx *Context) GRPCDial(target string) *Connection {

conn := value.(*Connection)
conn.initOnce.Do(func() {
conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target)
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
if ctx.GetLocalInternalServerForAddr(target) != nil {
conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true})
conn.setInitialHeartbeatDone()
Expand All @@ -479,7 +511,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {
if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target)
err := ctx.runHeartbeat(conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
Expand All @@ -505,7 +537,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker {
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
// host (e.g. GRPCDial was never called for that address).
// host (e.g. GRPCDial was never called for that address, or a connection has
// been closed and not reconnected).
var ErrNotConnected = errors.New("not connected")

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand All @@ -526,7 +559,9 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand All @@ -546,6 +581,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
everSucceeded := false
for {
select {
case <-redialChan:
return errRedialing
case <-ctx.Stopper.ShouldStop():
return nil
case <-heartbeatTimer.C:
Expand Down
14 changes: 13 additions & 1 deletion pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}

isUnhealthy := func(err error) bool {
// Most of the time, an unhealthy connection will get
// ErrNotConnected, but there are brief periods during which we
// could get ErrNotHeartbeated (while we're trying to start a new
// connection) or one of the grpc errors below (while the old
// connection is in the middle of closing).
if err == ErrNotConnected || err == ErrNotHeartbeated {
return true
}
// The expected code here is Unavailable, but at least on OSX you can also get
//
// rpc error: code = Internal desc = connection error: desc = "transport: authentication
Expand Down Expand Up @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
if timeutil.Since(then) > 45*time.Second {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}

close(done)

// Should become healthy again after GRPC reconnects.
// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil {
return err
}
return clientCtx.ConnHealth(remoteAddr)
})

Expand Down

0 comments on commit bae2cda

Please sign in to comment.