Skip to content

Commit

Permalink
Merge pull request #22658 from bdarnell/grpc-2
Browse files Browse the repository at this point in the history
rpc: Perform initial-heartbeat validation on GRPC reconnections (take 2)
  • Loading branch information
bdarnell authored Feb 13, 2018
2 parents 61ac7fe + 8e0a505 commit 0c433fe
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 26 deletions.
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/cli/interactive_tests/common.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,7 @@ proc stop_server {argv} {
# Trigger a normal shutdown.
system "$argv quit"
# If after 5 seconds the server hasn't shut down, trigger an error.

## Re-enable this once issue #22536 is fixed.
# system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0"
# Until #22536 is fixed use a violent kill.
system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; kill -9 `cat server_pid`; exit 0"
system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0"

report "END STOP SERVER"
}
Expand Down
73 changes: 54 additions & 19 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand Down Expand Up @@ -423,13 +424,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
return dialOpts, nil
}

// onlyOnceDialer implements the grpc.WithDialer interface but only
// allows a single connection attempt. If a reconnection is attempted,
// redialChan is closed to signal a higher-level retry loop. This
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
syncutil.Mutex
dialed bool
closed bool
redialChan chan struct{}
}

func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
ood.Lock()
defer ood.Unlock()
if !ood.dialed {
ood.dialed = true
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
} else if !ood.closed {
ood.closed = true
close(ood.redialChan)
}
return nil, grpcutil.ErrCannotReuseClientConn
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDial, it does not start an RPC heartbeat to validate the
// connection.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
dialOpts, err := ctx.GRPCDialOptions()
if err != nil {
return nil, err
return nil, nil, err
}

// Add a stats handler to measure client network stats.
Expand All @@ -440,24 +471,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if sourceAddr != nil {
dialOpts = append(dialOpts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
},
))
dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if log.V(1) {
log.Infof(ctx.masterCtx, "dialing %s", target)
}
return grpc.DialContext(ctx.masterCtx, target, dialOpts...)
conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...)
return conn, dialer.redialChan, err
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
Expand All @@ -469,7 +498,8 @@ func (ctx *Context) GRPCDial(target string) *Connection {

conn := value.(*Connection)
conn.initOnce.Do(func() {
conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target)
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
if ctx.GetLocalInternalServerForAddr(target) != nil {
conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true})
conn.setInitialHeartbeatDone()
Expand All @@ -479,7 +509,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {
if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target)
err := ctx.runHeartbeat(conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
Expand All @@ -505,7 +535,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker {
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
// host (e.g. GRPCDial was never called for that address).
// host (e.g. GRPCDial was never called for that address, or a connection has
// been closed and not reconnected).
var ErrNotConnected = errors.New("not connected")

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand All @@ -526,7 +557,9 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand All @@ -546,6 +579,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
everSucceeded := false
for {
select {
case <-redialChan:
return grpcutil.ErrCannotReuseClientConn
case <-ctx.Stopper.ShouldStop():
return nil
case <-heartbeatTimer.C:
Expand Down
14 changes: 13 additions & 1 deletion pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}

isUnhealthy := func(err error) bool {
// Most of the time, an unhealthy connection will get
// ErrNotConnected, but there are brief periods during which we
// could get ErrNotHeartbeated (while we're trying to start a new
// connection) or one of the grpc errors below (while the old
// connection is in the middle of closing).
if err == ErrNotConnected || err == ErrNotHeartbeated {
return true
}
// The expected code here is Unavailable, but at least on OSX you can also get
//
// rpc error: code = Internal desc = connection error: desc = "transport: authentication
Expand Down Expand Up @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
if timeutil.Since(then) > 45*time.Second {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}

close(done)

// Should become healthy again after GRPC reconnects.
// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil {
return err
}
return clientCtx.ConnHealth(remoteAddr)
})

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
"google.golang.org/grpc/transport"
)

// ErrCannotReuseClientConn is returned when a failed connection is
// being reused. We require that new connections be created with
// pkg/rpc.GRPCDial instead.
var ErrCannotReuseClientConn = errors.New("cannot reuse client connection")

type localRequestKey struct{}

// NewLocalRequestContext returns a Context that can be used for local (in-process) requests.
Expand All @@ -46,6 +51,9 @@ func IsLocalRequestContext(ctx context.Context) bool {
// on closed connections.
func IsClosedConnection(err error) bool {
err = errors.Cause(err)
if err == ErrCannotReuseClientConn {
return true
}
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Canceled ||
s.Code() == codes.Unavailable ||
Expand Down

0 comments on commit 0c433fe

Please sign in to comment.