Skip to content

Commit

Permalink
core: Check for cluster ID conflicts on handshake
Browse files Browse the repository at this point in the history
In the heartbeat, nodes now share their cluster IDs and check that they
match. We allow for missing cluster IDs, since new nodes do not have a
cluster ID until they obtain one via gossip, but conflicting IDs will
result in a heartbeat error.

In addition, connections are now added to the connection pool only after
the heartbeat succeeds. This allows us to fail fast when a node attempts
to join the wrong cluster.

Refers #18058.
  • Loading branch information
solongordon committed Dec 5, 2017
1 parent 6695a57 commit a826bfb
Show file tree
Hide file tree
Showing 47 changed files with 634 additions and 210 deletions.
2 changes: 1 addition & 1 deletion pkg/acceptance/cluster/dockercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (l *DockerCluster) NewClient(ctx context.Context, i int) (*roachClient.DB,
User: security.NodeUser,
SSLCertsDir: l.CertsDir,
}, clock, l.stopper)
conn, err := rpcContext.GRPCDial(l.Nodes[i].Addr(ctx, DefaultTCP).String())
conn, err := rpcContext.GRPCDial(l.Nodes[i].Addr(ctx, DefaultTCP).String()).Connect(ctx)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/acceptance/localcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (n *Node) Client() *client.DB {
return existingClient
}

conn, err := n.rpcCtx.GRPCDial(n.RPCAddr())
conn, err := n.rpcCtx.GRPCDial(n.RPCAddr()).Connect(context.Background())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize KV client: %s", err)
}
Expand All @@ -531,7 +531,7 @@ func (n *Node) StatusClient() serverpb.StatusClient {
return existingClient
}

conn, err := n.rpcCtx.GRPCDial(n.RPCAddr())
conn, err := n.rpcCtx.GRPCDial(n.RPCAddr()).Connect(context.Background())
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/acceptance/terrafarm/farmer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (f *Farmer) Exec(i int, cmd string) error {

// NewClient implements the Cluster interface.
func (f *Farmer) NewClient(ctx context.Context, i int) (*client.DB, error) {
conn, err := f.RPCContext.GRPCDial(f.Addr(ctx, i, base.DefaultPort))
conn, err := f.RPCContext.GRPCDial(f.Addr(ctx, i, base.DefaultPort)).Connect(ctx)
if err != nil {
return nil, err
}
Expand Down
71 changes: 71 additions & 0 deletions pkg/base/cluster_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package base

import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// ClusterIDContainer is used to share a single Cluster ID instance between
// multiple layers. It allows setting and getting the value. Once a value is
// set through Set, a later Set with a different value is treated as fatal;
// only Reset (intended for tests) overwrites an existing value.
type ClusterIDContainer struct {
// Mutex is embedded; the accessor methods lock it around every read and
// write of clusterID.
syncutil.Mutex

// clusterID holds the current cluster ID; uuid.Nil means "not yet set".
clusterID uuid.UUID
}

// String renders the cluster ID as text. While the ID is still unset
// (uuid.Nil) the placeholder "?" is returned instead.
func (c *ClusterIDContainer) String() string {
	if id := c.Get(); id != uuid.Nil {
		return id.String()
	}
	return "?"
}

// Get reads the cluster ID under the container's lock. The result is uuid.Nil
// when no ID has been set.
func (c *ClusterIDContainer) Get() uuid.UUID {
	c.Lock()
	id := c.clusterID
	c.Unlock()
	return id
}

// Set stores val as the cluster ID if none is stored yet. When an ID is
// already present, val must be equal to it: an identical value is a no-op,
// while a different one is reported through log.Fatalf.
func (c *ClusterIDContainer) Set(ctx context.Context, val uuid.UUID) {
	c.Lock()
	defer c.Unlock()

	// An ID is already stored: only verify that the caller agrees with it.
	if current := c.clusterID; current != uuid.Nil {
		if current != val {
			log.Fatalf(ctx, "different ClusterIDs set: %s, then %s", current, val)
		}
		return
	}

	// First assignment.
	c.clusterID = val
	if log.V(2) {
		log.Infof(ctx, "ClusterID set to %s", val)
	}
}

// Reset unconditionally overwrites the stored cluster ID with val, without
// comparing it against the previous value.
//
// Should only be used in testing code.
func (c *ClusterIDContainer) Reset(val uuid.UUID) {
	c.Lock()
	c.clusterID = val
	c.Unlock()
}
69 changes: 69 additions & 0 deletions pkg/base/cluster_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package base_test

import (
"testing"

"github.com/cockroachdb/cockroach/pkg/util/uuid"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"golang.org/x/net/context"
)

// TestClusterIDContainerEmpty checks that a zero-value container reports
// uuid.Nil from Get and "?" from String.
func TestClusterIDContainerEmpty(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var container base.ClusterIDContainer

	if got := container.Get(); got != uuid.Nil {
		t.Errorf("initial value should be uuid.Nil, not %s", got)
	}
	if got := container.String(); got != "?" {
		t.Errorf("initial string should be ?, not %s", got)
	}
}

// TestClusterIDContainerSet checks that Set stores the given ID and that
// setting the same ID a second time leaves Get and String unchanged.
func TestClusterIDContainerSet(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var container base.ClusterIDContainer
	want := uuid.MakeV4()
	ctx := context.Background()

	// Two rounds: the first assigns the ID, the second repeats the same value.
	for attempt := 1; attempt <= 2; attempt++ {
		container.Set(ctx, want)
		if got := container.Get(); got != want {
			t.Errorf("value should be %s, not %s", want, got)
		}
		if got := container.String(); got != want.String() {
			t.Errorf("string should be %s, not %s", want.String(), got)
		}
	}
}

// TestClusterIDContainerReset checks that Reset replaces an ID previously
// stored with Set, as observed through both Get and String.
func TestClusterIDContainerReset(t *testing.T) {
	defer leaktest.AfterTest(t)()
	var container base.ClusterIDContainer
	initial, replacement := uuid.MakeV4(), uuid.MakeV4()

	container.Set(context.Background(), initial)
	container.Reset(replacement)

	if got := container.Get(); got != replacement {
		t.Errorf("value should be %s, not %s", replacement, got)
	}
	if got := container.String(); got != replacement.String() {
		t.Errorf("string should be %s, not %s", replacement.String(), got)
	}
}
2 changes: 1 addition & 1 deletion pkg/ccl/sqlccl/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newKVWriteBatch(b *testing.B) kvInterface {
// sent over the network.
rpcContext := s.RPCContext()

conn, err := rpcContext.GRPCDial(s.ServingAddr())
conn, err := rpcContext.GRPCDial(s.ServingAddr()).Connect(context.Background())
if err != nil {
b.Fatal(err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ communicate with a secure cluster\).
// up the stack as a roachpb.NewError(roachpb.NewSendError(.)).
// Error returned directly from GRPC.
{`quit`, styled(
`Failed to connect to the node: error sending drain request: rpc error: code = Unavailable desc = grpc: the connection is unavailable`),
`Failed to connect to the node: initial connection heartbeat failed: rpc error: ` +
`code = Unavailable desc = grpc: the connection is unavailable`),
},
// Going through the SQL client libraries gives a *net.OpError which
// we also handle.
Expand Down
4 changes: 2 additions & 2 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(
if err != nil {
return nil, nil, nil, err
}
conn, err := rpcContext.GRPCDial(addr)
conn, err := rpcContext.GRPCDial(addr).Connect(ctx)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -963,7 +963,7 @@ func getClientGRPCConn(ctx context.Context) (*grpc.ClientConn, *hlc.Clock, func(
func getAdminClient(ctx context.Context) (serverpb.AdminClient, func(), error) {
conn, _, finish, err := getClientGRPCConn(ctx)
if err != nil {
return nil, nil, err
return nil, nil, errors.Wrap(err, "Failed to connect to the node")
}
return serverpb.NewAdminClient(conn), finish, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/gossip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (c *client) startLocked(
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends up making `kv` tests take twice as long.
conn, err := rpcCtx.GRPCDial(c.addr.String())
conn, err := rpcCtx.GRPCDial(c.addr.String()).Connect(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -159,7 +159,7 @@ func (c *client) requestGossip(g *Gossip, stream Gossip_GossipClient) error {
NodeID: g.NodeID.Get(),
Addr: g.mu.is.NodeAddr,
HighWaterStamps: g.mu.is.getHighWaterStamps(),
ClusterID: g.mu.clusterID,
ClusterID: g.clusterID.Get(),
}
g.mu.Unlock()

Expand All @@ -180,7 +180,7 @@ func (c *client) sendGossip(g *Gossip, stream Gossip_GossipClient) error {
Addr: g.mu.is.NodeAddr,
Delta: delta,
HighWaterStamps: g.mu.is.getHighWaterStamps(),
ClusterID: g.mu.clusterID,
ClusterID: g.clusterID.Get(),
}

bytesSent := int64(args.Size())
Expand Down
16 changes: 9 additions & 7 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,9 @@ type Gossip struct {
}

// New creates an instance of a gossip node.
// The higher level manages the NodeIDContainer instance (which can be shared by
// various server components). The ambient context is expected to already
// contain the node ID.
// The higher level manages the ClusterIDContainer and NodeIDContainer instances
// (which can be shared by various server components). The ambient context is
// expected to already contain the node ID.
//
// grpcServer: The server on which the new Gossip instance will register its RPC
// service. Can be nil, in which case the Gossip will not register the
Expand All @@ -251,6 +251,7 @@ type Gossip struct {
// restricted way by populating it with data manually.
func New(
ambient log.AmbientContext,
clusterID *base.ClusterIDContainer,
nodeID *base.NodeIDContainer,
rpcContext *rpc.Context,
grpcServer *grpc.Server,
Expand All @@ -259,7 +260,7 @@ func New(
) *Gossip {
ambient.SetEventLog("gossip", "gossip")
g := &Gossip{
server: newServer(ambient, nodeID, stopper, registry),
server: newServer(ambient, clusterID, nodeID, stopper, registry),
Connected: make(chan struct{}),
rpcContext: rpcContext,
outgoing: makeNodeSet(minPeers, metric.NewGauge(MetaConnectionsOutgoingGauge)),
Expand Down Expand Up @@ -294,8 +295,8 @@ func New(
return g
}

// NewTest is a simplified wrapper around New that creates the NodeIDContainer
// internally. Used for testing.
// NewTest is a simplified wrapper around New that creates the
// ClusterIDContainer and NodeIDContainer internally. Used for testing.
//
// grpcServer: The server on which the new Gossip instance will register its RPC
// service. Can be nil, in which case the Gossip will not register the
Expand All @@ -310,10 +311,11 @@ func NewTest(
stopper *stop.Stopper,
registry *metric.Registry,
) *Gossip {
c := &base.ClusterIDContainer{}
n := &base.NodeIDContainer{}
var ac log.AmbientContext
ac.AddLogTag("n", n)
gossip := New(ac, n, rpcContext, grpcServer, stopper, registry)
gossip := New(ac, c, n, rpcContext, grpcServer, stopper, registry)
if nodeID != 0 {
n.Set(context.TODO(), nodeID)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func TestGossipNoForwardSelf(t *testing.T) {
c := newClient(log.AmbientContext{Tracer: tracing.NewTracer()}, local.GetNodeAddr(), makeMetrics())

testutils.SucceedsSoon(t, func() error {
conn, err := peer.rpcContext.GRPCDial(c.addr.String(), grpc.WithBlock())
conn, err := peer.rpcContext.GRPCDial(c.addr.String(), grpc.WithBlock()).Connect(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -563,7 +563,7 @@ func TestGossipJoinTwoClusters(t *testing.T) {
g = append(g, gnode)
gnode.SetStallInterval(interval)
gnode.SetBootstrapInterval(interval)
gnode.SetClusterID(clusterIDs[i])
gnode.clusterID.Set(context.TODO(), clusterIDs[i])

ln, err := netutil.ListenAndServeGRPC(stopper, server, util.IsolatedTestAddr)
if err != nil {
Expand Down
29 changes: 8 additions & 21 deletions pkg/gossip/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ type serverInfo struct {
type server struct {
log.AmbientContext

NodeID *base.NodeIDContainer
clusterID *base.ClusterIDContainer
NodeID *base.NodeIDContainer

stopper *stop.Stopper

mu struct {
syncutil.Mutex
is *infoStore // The backing infostore
incoming nodeSet // Incoming client node IDs
nodeMap map[util.UnresolvedAddr]serverInfo // Incoming client's local address -> serverInfo
clusterID uuid.UUID
is *infoStore // The backing infostore
incoming nodeSet // Incoming client node IDs
nodeMap map[util.UnresolvedAddr]serverInfo // Incoming client's local address -> serverInfo
// ready broadcasts a wakeup to waiting gossip requests. This is done
// via closing the current ready channel and opening a new one. This
// is required due to the fact that condition variables are not
Expand All @@ -74,12 +74,14 @@ type server struct {
// newServer creates and returns a server struct.
func newServer(
ambient log.AmbientContext,
clusterID *base.ClusterIDContainer,
nodeID *base.NodeIDContainer,
stopper *stop.Stopper,
registry *metric.Registry,
) *server {
s := &server{
AmbientContext: ambient,
clusterID: clusterID,
NodeID: nodeID,
stopper: stopper,
tighten: make(chan struct{}, 1),
Expand All @@ -98,21 +100,6 @@ func newServer(
return s
}

// SetClusterID sets the cluster ID to prevent nodes from illegally
// connecting to incorrect clusters, or from allowing nodes from
// other clusters to incorrectly connect to this one.
func (s *server) SetClusterID(clusterID uuid.UUID) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.clusterID = clusterID
}

func (s *server) GetClusterID() uuid.UUID {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.clusterID
}

// GetNodeMetrics returns this server's node metrics struct.
func (s *server) GetNodeMetrics() *Metrics {
return &s.nodeMetrics
Expand All @@ -126,7 +113,7 @@ func (s *server) Gossip(stream Gossip_GossipServer) error {
if err != nil {
return err
}
if (args.ClusterID != uuid.UUID{}) && args.ClusterID != s.GetClusterID() {
if (args.ClusterID != uuid.UUID{}) && args.ClusterID != s.clusterID.Get() {
return errors.Errorf("gossip connection refused from different cluster %s", args.ClusterID)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func createTestClientForUser(
testutils.FillCerts(cfg)

rpcContext := rpc.NewContext(log.AmbientContext{Tracer: s.ClusterSettings().Tracer}, cfg, s.Clock(), s.Stopper())
conn, err := rpcContext.GRPCDial(s.ServingAddr())
conn, err := rpcContext.GRPCDial(s.ServingAddr()).Connect(context.Background())
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit a826bfb

Please sign in to comment.