Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Perform initial-heartbeat validation on GRPC reconnections (take 2) #22658

Merged
merged 2 commits into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
conn, _, err := n.rpcCtx.GRPCDialRaw(n.RPCAddr())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/cli/interactive_tests/common.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,7 @@ proc stop_server {argv} {
# Trigger a normal shutdown.
system "$argv quit"
# If after 5 seconds the server hasn't shut down, trigger an error.

## Re-enable this once issue #22536 is fixed.
# system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0"
# Until #22536 is fixed use a violent kill.
system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; kill -9 `cat server_pid`; exit 0"
system "for i in `seq 1 5`; do kill -CONT `cat server_pid` 2>/dev/null || exit 0; echo still waiting; sleep 1; done; echo 'server still running?'; exit 0"

report "END STOP SERVER"
}
Expand Down
73 changes: 54 additions & 19 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand Down Expand Up @@ -423,13 +424,43 @@ func (ctx *Context) GRPCDialOptions() ([]grpc.DialOption, error) {
return dialOpts, nil
}

// onlyOnceDialer implements the grpc.WithDialer interface but only
// allows a single connection attempt. If a reconnection is attempted,
// redialChan is closed to signal a higher-level retry loop. This
// ensures that our initial heartbeat (and its version/clusterID
// validation) occurs on every new connection.
type onlyOnceDialer struct {
// The embedded mutex is held for the full duration of every dial call
// and guards dialed and closed.
syncutil.Mutex
// dialed is set by the first call to dial; every later call is refused.
dialed bool
// closed records that redialChan has already been closed, so that it
// is closed at most once.
closed bool
// redialChan is closed by the first dial call that follows the initial
// one.
redialChan chan struct{}
}

// dial is the dialer function handed to grpc.WithDialer. The first
// invocation opens a TCP connection to addr, bounded by timeout and
// bound to the package-level sourceAddr. Every subsequent invocation is
// refused with grpcutil.ErrCannotReuseClientConn; the first such
// refusal also closes redialChan so that whoever is watching it learns
// that a reconnection was attempted.
func (ood *onlyOnceDialer) dial(addr string, timeout time.Duration) (net.Conn, error) {
	ood.Lock()
	defer ood.Unlock()

	if ood.dialed {
		// This is a redial. Signal it exactly once, then refuse.
		if !ood.closed {
			ood.closed = true
			close(ood.redialChan)
		}
		return nil, grpcutil.ErrCannotReuseClientConn
	}

	// First (and only permitted) connection attempt.
	ood.dialed = true
	d := net.Dialer{Timeout: timeout, LocalAddr: sourceAddr}
	return d.Dial("tcp", addr)
}

// GRPCDialRaw calls grpc.Dial with options appropriate for the context.
// Unlike GRPCDial, it does not start an RPC heartbeat to validate the
// connection.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
// connection. This connection will not be reconnected automatically;
// the returned channel is closed when a reconnection is attempted.
func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, <-chan struct{}, error) {
dialOpts, err := ctx.GRPCDialOptions()
if err != nil {
return nil, err
return nil, nil, err
}

// Add a stats handler to measure client network stats.
Expand All @@ -440,24 +471,22 @@ func (ctx *Context) GRPCDialRaw(target string) (*grpc.ClientConn, error) {
dialOpts = append(dialOpts,
grpc.WithInitialWindowSize(initialWindowSize),
grpc.WithInitialConnWindowSize(initialConnWindowSize))
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if sourceAddr != nil {
dialOpts = append(dialOpts, grpc.WithDialer(
func(addr string, timeout time.Duration) (net.Conn, error) {
dialer := net.Dialer{
Timeout: timeout,
LocalAddr: sourceAddr,
}
return dialer.Dial("tcp", addr)
},
))
dialer := onlyOnceDialer{
redialChan: make(chan struct{}),
}
dialOpts = append(dialOpts, grpc.WithDialer(dialer.dial))

// add testingDialOpts after our dialer because one of our tests
// uses a custom dialer (this disables the only-one-connection
// behavior and redialChan will never be closed).
dialOpts = append(dialOpts, ctx.testingDialOpts...)

if log.V(1) {
log.Infof(ctx.masterCtx, "dialing %s", target)
}
return grpc.DialContext(ctx.masterCtx, target, dialOpts...)
conn, err := grpc.DialContext(ctx.masterCtx, target, dialOpts...)
return conn, dialer.redialChan, err
}

// GRPCDial calls grpc.Dial with options appropriate for the context.
Expand All @@ -469,7 +498,8 @@ func (ctx *Context) GRPCDial(target string) *Connection {

conn := value.(*Connection)
conn.initOnce.Do(func() {
conn.grpcConn, conn.dialErr = ctx.GRPCDialRaw(target)
var redialChan <-chan struct{}
conn.grpcConn, redialChan, conn.dialErr = ctx.GRPCDialRaw(target)
if ctx.GetLocalInternalServerForAddr(target) != nil {
conn.heartbeatResult.Store(heartbeatResult{err: nil, everSucceeded: true})
conn.setInitialHeartbeatDone()
Expand All @@ -479,7 +509,7 @@ func (ctx *Context) GRPCDial(target string) *Connection {
if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(conn, target)
err := ctx.runHeartbeat(conn, target, redialChan)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
Expand All @@ -505,7 +535,8 @@ func (ctx *Context) NewBreaker() *circuit.Breaker {
}

// ErrNotConnected is returned by ConnHealth when there is no connection to the
// host (e.g. GRPCDial was never called for that address).
// host (e.g. GRPCDial was never called for that address, or a connection has
// been closed and not reconnected).
var ErrNotConnected = errors.New("not connected")

// ErrNotHeartbeated is returned by ConnHealth when we have not yet performed
Expand All @@ -526,7 +557,9 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
func (ctx *Context) runHeartbeat(
conn *Connection, target string, redialChan <-chan struct{},
) error {
maxOffset := ctx.LocalClock.MaxOffset()
clusterID := ctx.ClusterID.Get()

Expand All @@ -546,6 +579,8 @@ func (ctx *Context) runHeartbeat(conn *Connection, target string) error {
everSucceeded := false
for {
select {
case <-redialChan:
return grpcutil.ErrCannotReuseClientConn
case <-ctx.Stopper.ShouldStop():
return nil
case <-heartbeatTimer.C:
Expand Down
14 changes: 13 additions & 1 deletion pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,14 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}

isUnhealthy := func(err error) bool {
// Most of the time, an unhealthy connection will get
// ErrNotConnected, but there are brief periods during which we
// could get ErrNotHeartbeated (while we're trying to start a new
// connection) or one of the grpc errors below (while the old
// connection is in the middle of closing).
if err == ErrNotConnected || err == ErrNotHeartbeated {
return true
}
// The expected code here is Unavailable, but at least on OSX you can also get
//
// rpc error: code = Internal desc = connection error: desc = "transport: authentication
Expand Down Expand Up @@ -417,12 +425,16 @@ func TestHeartbeatHealthTransport(t *testing.T) {
if timeutil.Since(then) > 45*time.Second {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond)
}

close(done)

// Should become healthy again after GRPC reconnects.
// We can reconnect and the connection becomes healthy again.
testutils.SucceedsSoon(t, func() error {
if _, err := clientCtx.GRPCDial(remoteAddr).Connect(context.Background()); err != nil {
return err
}
return clientCtx.ConnHealth(remoteAddr)
})

Expand Down
8 changes: 8 additions & 0 deletions pkg/util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import (
"google.golang.org/grpc/transport"
)

// ErrCannotReuseClientConn is returned when a failed connection is
// being reused: a client connection is only allowed a single dial
// attempt, and any further attempt on it is rejected with this error.
// We require that new connections be created with pkg/rpc.GRPCDial
// instead. IsClosedConnection reports true for this error.
var ErrCannotReuseClientConn = errors.New("cannot reuse client connection")

type localRequestKey struct{}

// NewLocalRequestContext returns a Context that can be used for local (in-process) requests.
Expand All @@ -46,6 +51,9 @@ func IsLocalRequestContext(ctx context.Context) bool {
// on closed connections.
func IsClosedConnection(err error) bool {
err = errors.Cause(err)
if err == ErrCannotReuseClientConn {
return true
}
if s, ok := status.FromError(err); ok {
if s.Code() == codes.Canceled ||
s.Code() == codes.Unavailable ||
Expand Down