core: Check for cluster ID conflicts on handshake
In the heartbeat, nodes now share their cluster IDs and check that they
match. We allow for missing cluster IDs, since new nodes do not have a
cluster ID until they obtain one via gossip, but conflicting IDs will
result in a heartbeat error.
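
A minimal sketch of that check, for illustration only: the helper name checkClusterID and its placement are assumptions (the real handler in pkg/rpc/heartbeat.go is not part of the hunks shown below), but the tolerance for missing IDs and the error text mirror this commit and its new test.

// Sketch only: how the heartbeat handshake can validate cluster IDs.
// checkClusterID is an illustrative helper, not the actual CockroachDB API.
package rpc

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// checkClusterID tolerates a missing ID on either side, since a new node
// has no cluster ID until it obtains one via gossip, but rejects a conflict.
func checkClusterID(serverID, clientID uuid.UUID) error {
	if serverID == (uuid.UUID{}) || clientID == (uuid.UUID{}) {
		return nil
	}
	if clientID != serverID {
		return fmt.Errorf(
			"client cluster ID %q doesn't match server cluster ID %q",
			clientID, serverID)
	}
	return nil
}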

In addition, a connection is now added to the connection pool only once its
initial heartbeat succeeds. This lets us fail fast when a node attempts to
join the wrong cluster.
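
The dial-side change appears in the pkg/rpc/context.go hunks below; here is a simplified, hedged sketch of the same pattern (dial, run one synchronous heartbeat, and only then cache the connection), where dialVerified, validateConn, and the sync.Map pool are stand-ins rather than the real rpc.Context internals.

// Sketch only: verify a connection before callers or the pool can see it.
// dialVerified and validateConn are illustrative stand-ins; the actual
// logic lives in (*Context).GRPCDial in the diff below.
package rpc

import (
	"context"
	"sync"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

func dialVerified(
	ctx context.Context,
	pool *sync.Map, // connection cache keyed by target address
	target string,
	validateConn func(context.Context, *grpc.ClientConn) error, // e.g. one heartbeat RPC
) (*grpc.ClientConn, error) {
	conn, err := grpc.DialContext(ctx, target, grpc.WithInsecure())
	if err != nil {
		return nil, err
	}
	// Fail fast: a cluster ID mismatch (or any other handshake failure)
	// surfaces here, before the connection is handed out or cached.
	if err := validateConn(ctx, conn); err != nil {
		_ = conn.Close()
		return nil, errors.Wrap(err, "initial connection heartbeat failed")
	}
	pool.Store(target, conn) // only verified connections reach the pool
	return conn, nil
}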

Fixes cockroachdb#15801.
Fixes cockroachdb#15898.
Refers to cockroachdb#18058.
solongordon committed Nov 22, 2017
1 parent c81cc12 commit f9c09c2
Showing 13 changed files with 349 additions and 142 deletions.
3 changes: 2 additions & 1 deletion pkg/cli/cli_test.go
@@ -340,7 +340,8 @@ communicate with a secure cluster\).
// up the stack as a roachpb.NewError(roachpb.NewSendError(.)).
// Error returned directly from GRPC.
{`quit`, styled(
`Failed to connect to the node: error sending drain request: rpc error: code = Unavailable desc = grpc: the connection is unavailable`),
`Failed to connect to the node: initial connection heartbeat failed: rpc error: ` +
`code = Unavailable desc = grpc: the connection is unavailable`),
},
// Going through the SQL client libraries gives a *net.OpError which
// we also handle.
4 changes: 3 additions & 1 deletion pkg/cli/start.go
@@ -931,10 +931,12 @@ func getClientGRPCConn() (*grpc.ClientConn, *hlc.Clock, *stop.Stopper, error) {
)
addr, err := addrWithDefaultHost(serverCfg.AdvertiseAddr)
if err != nil {
stopper.Stop(stopperContext(stopper))
return nil, nil, nil, err
}
conn, err := rpcContext.GRPCDial(addr)
if err != nil {
stopper.Stop(stopperContext(stopper))
return nil, nil, nil, err
}
return conn, clock, stopper, nil
@@ -943,7 +945,7 @@ func getClientGRPCConn() (*grpc.ClientConn, *hlc.Clock, *stop.Stopper, error) {
func getAdminClient() (serverpb.AdminClient, *stop.Stopper, error) {
conn, _, stopper, err := getClientGRPCConn()
if err != nil {
return nil, nil, err
return nil, nil, errors.Wrap(err, "Failed to connect to the node")
}
return serverpb.NewAdminClient(conn), stopper, nil
}
112 changes: 63 additions & 49 deletions pkg/rpc/context.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func init() {
@@ -201,10 +202,7 @@ func NewServerWithInterceptor(
}

s := grpc.NewServer(opts...)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: ctx.LocalClock,
remoteClockMonitor: ctx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{ctx})
return s
}

@@ -244,6 +242,8 @@ type Context struct {

stats StatsHandler

ClusterID uuid.UUID

// For unittesting.
BreakerFactory func() *circuit.Breaker
}
@@ -433,11 +433,21 @@ func (ctx *Context) GRPCDial(target string, opts ...grpc.DialOption) (*grpc.Clie
log.Infof(ctx.masterCtx, "dialing %s", target)
}
meta.conn, meta.dialErr = grpc.DialContext(ctx.masterCtx, target, dialOpts...)

if ctx.GetLocalInternalServerForAddr(target) == nil && meta.dialErr == nil {
// Run an initial heartbeat to ensure the connection is healthy.
ctx.runHeartbeat(meta, target)
err = meta.heartbeatErr.Load().(errValue).error
if err != nil {
meta.dialErr = errors.Wrap(err, "initial connection heartbeat failed")
ctx.removeConn(target, meta)
return
}

if err := ctx.Stopper.RunTask(
ctx.masterCtx, "rpc.Context: grpc heartbeat", func(masterCtx context.Context) {
ctx.Stopper.RunWorker(masterCtx, func(masterCtx context.Context) {
err := ctx.runHeartbeat(meta, target)
err := ctx.runHeartbeatTimer(meta, target)
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Errorf(masterCtx, "removing connection to %s due to error: %s", target, err)
}
@@ -488,15 +498,61 @@ func (ctx *Context) ConnHealth(target string) error {
return ErrNotConnected
}

func (ctx *Context) runHeartbeat(meta *connMeta, target string) error {
func (ctx *Context) runHeartbeat(meta *connMeta, target string) {
maxOffset := ctx.LocalClock.MaxOffset()

request := PingRequest{
Addr: ctx.Addr,
MaxOffsetNanos: maxOffset.Nanoseconds(),
ClusterID: ctx.ClusterID,
}
heartbeatClient := NewHeartbeatClient(meta.conn)

goCtx := ctx.masterCtx
var cancel context.CancelFunc
if hbTimeout := ctx.heartbeatTimeout; hbTimeout > 0 {
goCtx, cancel = context.WithTimeout(goCtx, hbTimeout)
}
sendTime := ctx.LocalClock.PhysicalTime()
// NB: We want the request to fail-fast (the default), otherwise we won't
// be notified of transport failures.
response, err := heartbeatClient.Ping(goCtx, &request)
if cancel != nil {
cancel()
}
meta.heartbeatErr.Store(errValue{err})

if err == nil {
receiveTime := ctx.LocalClock.PhysicalTime()

// Only update the clock offset measurement if we actually got a
// successful response from the server.
pingDuration := receiveTime.Sub(sendTime)
maxOffset := ctx.LocalClock.MaxOffset()
if maxOffset != timeutil.ClocklessMaxOffset &&
pingDuration > maximumPingDurationMult*maxOffset {

request.Offset.Reset()
} else {
// Offset and error are measured using the remote clock reading
// technique described in
// http://se.inf.tu-dresden.de/pubs/papers/SRDS1994.pdf, page 6.
// However, we assume that drift and min message delay are 0, for
// now.
request.Offset.MeasuredAt = receiveTime.UnixNano()
request.Offset.Uncertainty = (pingDuration / 2).Nanoseconds()
remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2)
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
}
ctx.RemoteClocks.UpdateOffset(ctx.masterCtx, target, request.Offset, pingDuration)

if cb := ctx.HeartbeatCB; cb != nil {
cb()
}
}
}

func (ctx *Context) runHeartbeatTimer(meta *connMeta, target string) error {
var heartbeatTimer timeutil.Timer
defer heartbeatTimer.Stop()

@@ -509,49 +565,7 @@ func (ctx *Context) runHeartbeat(meta *connMeta, target string) error {
case <-heartbeatTimer.C:
heartbeatTimer.Read = true
}

goCtx := ctx.masterCtx
var cancel context.CancelFunc
if hbTimeout := ctx.heartbeatTimeout; hbTimeout > 0 {
goCtx, cancel = context.WithTimeout(goCtx, hbTimeout)
}
sendTime := ctx.LocalClock.PhysicalTime()
// NB: We want the request to fail-fast (the default), otherwise we won't
// be notified of transport failures.
response, err := heartbeatClient.Ping(goCtx, &request)
if cancel != nil {
cancel()
}
meta.heartbeatErr.Store(errValue{err})

if err == nil {
receiveTime := ctx.LocalClock.PhysicalTime()

// Only update the clock offset measurement if we actually got a
// successful response from the server.
pingDuration := receiveTime.Sub(sendTime)
maxOffset := ctx.LocalClock.MaxOffset()
if maxOffset != timeutil.ClocklessMaxOffset &&
pingDuration > maximumPingDurationMult*maxOffset {

request.Offset.Reset()
} else {
// Offset and error are measured using the remote clock reading
// technique described in
// http://se.inf.tu-dresden.de/pubs/papers/SRDS1994.pdf, page 6.
// However, we assume that drift and min message delay are 0, for
// now.
request.Offset.MeasuredAt = receiveTime.UnixNano()
request.Offset.Uncertainty = (pingDuration / 2).Nanoseconds()
remoteTimeNow := timeutil.Unix(0, response.ServerTime).Add(pingDuration / 2)
request.Offset.Offset = remoteTimeNow.Sub(receiveTime).Nanoseconds()
}
ctx.RemoteClocks.UpdateOffset(ctx.masterCtx, target, request.Offset, pingDuration)

if cb := ctx.HeartbeatCB; cb != nil {
cb()
}
}
ctx.runHeartbeat(meta, target)

heartbeatTimer.Reset(ctx.heartbeatInterval)
}
88 changes: 58 additions & 30 deletions pkg/rpc/context_test.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

func newTestServer(t testing.TB, ctx *Context, compression bool) *grpc.Server {
@@ -71,10 +72,7 @@ func TestHeartbeatCB(t *testing.T) {
serverCtx := NewContext(log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), clock, stopper)
serverCtx.rpcCompression = compression
s := newTestServer(t, serverCtx, true)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{serverCtx})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
@@ -167,15 +165,6 @@ func TestHeartbeatHealth(t *testing.T) {
}
remoteAddr := ln.Addr().String()

clientCtx := NewContext(log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), clock, stopper)
clientCtx.Addr = "notlocalserver"
clientCtx.AdvertiseAddr = "localserver"
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
if _, err := clientCtx.GRPCDial(remoteAddr); err != nil {
t.Fatal(err)
}

errFailedHeartbeat := errors.New("failed heartbeat")

var hbSuccess atomic.Value
Expand All @@ -196,6 +185,15 @@ func TestHeartbeatHealth(t *testing.T) {
}
}()

clientCtx := NewContext(log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), clock, stopper)
clientCtx.Addr = "notlocalserver"
clientCtx.AdvertiseAddr = "localserver"
// Make the interval shorter to speed up the test.
clientCtx.heartbeatInterval = 1 * time.Millisecond
if _, err := clientCtx.GRPCDial(remoteAddr); err != nil {
t.Fatal(err)
}

// Wait for the connection.
testutils.SucceedsSoon(t, func() error {
err := clientCtx.ConnHealth(remoteAddr)
@@ -291,10 +289,7 @@ func TestHeartbeatHealthTransport(t *testing.T) {
}
s := grpc.NewServer(
grpc.RPCDecompressor(snappyDecompressor{}), grpc.Creds(credentials.NewTLS(tlsConfig)))
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{serverCtx})

mu := struct {
syncutil.Mutex
@@ -452,10 +447,7 @@ func TestOffsetMeasurement(t *testing.T) {
serverClock := hlc.NewClock(serverTime.UnixNano, time.Nanosecond)
serverCtx := NewContext(log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), serverClock, stopper)
s := newTestServer(t, serverCtx, true)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: serverClock,
remoteClockMonitor: serverCtx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{serverCtx})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
@@ -533,10 +525,10 @@ func TestFailedOffsetMeasurement(t *testing.T) {
// Remove the timeout so that failure arises from exceeding the maximum
// clock reading delay, not the timeout.
clientCtx.heartbeatTimeout = 0
go func() { heartbeat.ready <- nil }() // Allow one heartbeat for initialization.
if _, err := clientCtx.GRPCDial(remoteAddr); err != nil {
t.Fatal(err)
}
heartbeat.ready <- nil // Allow one heartbeat for initialization.

testutils.SucceedsSoon(t, func() error {
clientCtx.RemoteClocks.mu.Lock()
@@ -616,10 +608,7 @@ func TestRemoteOffsetUnhealthy(t *testing.T) {
nodeCtxs[i].ctx.heartbeatInterval = maxOffset

s := newTestServer(t, nodeCtxs[i].ctx, true)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: nodeCtxs[i].ctx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{nodeCtxs[i].ctx})
ln, err := netutil.ListenAndServeGRPC(nodeCtxs[i].ctx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
@@ -687,10 +676,7 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
stopper,
)
s := newTestServer(t, serverCtx, true)
RegisterHeartbeatServer(s, &HeartbeatService{
clock: clock,
remoteClockMonitor: serverCtx.RemoteClocks,
})
RegisterHeartbeatServer(s, &HeartbeatService{serverCtx})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
@@ -788,6 +774,48 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) {
// sufficiently tested in TestHeartbeatHealthTransport.
}

func TestClusterIDMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond)
serverCtx := NewContext(
log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), clock, stopper)
serverCtx.ClusterID = uuid.MakeV4()
s := newTestServer(t, serverCtx, true)
RegisterHeartbeatServer(s, &HeartbeatService{serverCtx})

ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr)
if err != nil {
t.Fatal(err)
}
remoteAddr := ln.Addr().String()

clientCtx := NewContext(
log.AmbientContext{Tracer: tracing.NewTracer()}, testutils.NewNodeTestBaseContext(), clock, stopper)
// Set client ClusterID differently from server.
clientCtx.ClusterID = uuid.MakeV4()

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
_, err = clientCtx.GRPCDial(remoteAddr)
expected := "initial connection heartbeat failed.*doesn't match server cluster ID"
if !testutils.IsError(err, expected) {
t.Errorf("expected %s error, got %v", expected, err)
}
wg.Done()
}()
}
wg.Wait()
if _, ok := clientCtx.conns.Load(remoteAddr); ok {
t.Fatal("connection should not have been added to client connection pool")
}
}

func BenchmarkGRPCDial(b *testing.B) {
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())