server: introduce join rpc for node id allocation
This mostly follows the ideas in #32574, and serves as a crucial
building block for #48843. Specifically, this PR introduces a new Join
RPC that new nodes can use, addressing already-initialized nodes, to
learn the cluster ID and their allocated node ID. Previously, joining
nodes were responsible for allocating their own IDs and discovered the
cluster ID through gossip.
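
As a rough sketch of the shape of this exchange (hypothetical Go types
and handler; the actual serverpb message definitions differ), an
already-initialized node that receives a Join request hands back the
cluster ID and a freshly allocated node ID, instead of the joining node
working these out on its own:

package join

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// Hypothetical stand-ins for the real proto messages; field names are
// illustrative only.
type JoinRequest struct{}

type JoinResponse struct {
	ClusterID string
	NodeID    int32
}

// joinServer is a hypothetical view of the parts of the server the
// handler needs.
type joinServer struct {
	initialized    func() bool
	clusterID      func() string
	allocateNodeID func(context.Context) (int32, error)
}

// Join runs on an already-initialized node: it refuses to serve until
// the node itself knows the cluster ID, then allocates a node ID on the
// caller's behalf.
func (s *joinServer) Join(ctx context.Context, req *JoinRequest) (*JoinResponse, error) {
	if !s.initialized() {
		return nil, status.Error(codes.Unavailable, "node not yet initialized")
	}
	nodeID, err := s.allocateNodeID(ctx)
	if err != nil {
		return nil, err
	}
	return &JoinResponse{ClusterID: s.clusterID(), NodeID: nodeID}, nil
}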

By moving towards a more understandable flow of how nodes join the
cluster, we can build a few useful primitives on top of it:
 - we can prevent mismatched version nodes from joining the cluster
 - we can prevent decommissioned nodes from joining the cluster
 - we can add the liveness record for a given node as soon as it joins,
   which would simplify our liveness record handling code that is
   perennially concerned with missing liveness records

The tiny bit of complexity in this PR comes from how we migrate into
this behavior from the old one. To that end we retain the earlier
behavior, where a node discovers the cluster ID through gossip and
allocates its own node ID. Nodes with this patch will attempt to use
the Join RPC if it is implemented on the addressed node, and fall back
to the previous behavior if not. It wasn't possible to use cluster
versions for this migration because it happens very early in the node
startup process, and the version gating this change would not be active
until much later in the crdb process lifetime.
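
A minimal sketch of the fallback just described, reusing the
hypothetical types from the sketch above (joinCluster and
gossipFallback are made-up names, not CockroachDB's actual code): a
server that doesn't implement the RPC fails the call with
codes.Unimplemented, which is one way the joining node can detect an
older addressed node and revert to the previous behavior.

type initClient interface {
	// Join is the new RPC; older servers reject it as unimplemented.
	Join(ctx context.Context, req *JoinRequest) (*JoinResponse, error)
}

type nodeState struct {
	clusterID string
	nodeID    int32
}

// joinCluster tries the Join RPC against an already-initialized node
// and falls back to the legacy path (gossip-based cluster ID discovery
// plus a self-allocated node ID) when the addressed node predates the
// RPC.
func joinCluster(
	ctx context.Context,
	c initClient,
	gossipFallback func(context.Context) (nodeState, error),
) (nodeState, error) {
	resp, err := c.Join(ctx, &JoinRequest{})
	if err == nil {
		return nodeState{clusterID: resp.ClusterID, nodeID: resp.NodeID}, nil
	}
	if status.Code(err) == codes.Unimplemented {
		// Addressed node runs an older binary; use the old behavior.
		return gossipFallback(ctx)
	}
	return nodeState{}, err
}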

---

There are some leftover TODOs that I'm looking to address in this PR.
They should be tiny and easy to retrofit into what we have so far.
Specifically, I'm going to plumb the client address into the RPC so the
server is able to generate backlinks (and solve the bidirectionality
problem). I'm also going to try to add the liveness record for a
joining node as part of the Join RPC. Right now the tests verifying
connectivity/bootstrap/join flags pass out of the box, but I'm going to
try adding more randomized testing to exercise full connectivity once I
address these TODOs.

Release note: None
irfansharif committed Aug 11, 2020
1 parent 7b7a631 commit 7410cb1
Showing 14 changed files with 953 additions and 174 deletions.
1 change: 0 additions & 1 deletion pkg/cli/init.go
@@ -51,7 +51,6 @@ func runInit(cmd *cobra.Command, args []string) error {

// Actually perform cluster initialization.
c := serverpb.NewInitClient(conn)

if _, err = c.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil {
return err
}
2 changes: 1 addition & 1 deletion pkg/cli/start.go
@@ -670,7 +670,7 @@ func runStart(cmd *cobra.Command, args []string, disableReplication bool) error

if waitForInit {
log.Shout(ctx, log.Severity_INFO,
"initial startup completed.\n"+
"initial startup completed\n"+
"Node will now attempt to join a running cluster, or wait for `cockroach init`.\n"+
"Client connections will be accepted after this completes successfully.\n"+
"Check the log file(s) for progress. ")
1 change: 0 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
@@ -559,7 +559,6 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Version: roachpb.Version{Major: 20, Minor: 1, Unstable: 14},
},
// Add new versions here (step two of two).

})

// TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version
4 changes: 4 additions & 0 deletions pkg/cmd/roachtest/decommission.go
@@ -319,6 +319,10 @@ func runDecommissionRecommission(ctx context.Context, t *test, c *cluster) {
Multiplier: 2,
}

+ // XXX: Hack, to see node come into liveness table. We should do this as
+ // part of the connect rpc.
+ time.Sleep(10 * time.Second)

// Partially decommission then recommission a random node, from another
// random node. Run a couple of status checks while doing so.
{
2 changes: 1 addition & 1 deletion pkg/gossip/client.go
@@ -275,7 +275,7 @@ func (c *client) handleResponse(ctx context.Context, g *Gossip, reply *Response)
// Check whether this outgoing client is duplicating work already
// being done by an incoming client, either because an outgoing
// matches an incoming or the client is connecting to itself.
- if nodeID := g.NodeID.Get(); nodeID == c.peerID {
+ if nodeID := g.NodeID.Get(); nodeID != 0 && nodeID == c.peerID {
return errors.Errorf("stopping outgoing client to n%d (%s); loopback connection", c.peerID, c.addr)
} else if g.hasIncomingLocked(c.peerID) && nodeID > c.peerID {
// To avoid mutual shutdown, we only shutdown our client if our
13 changes: 8 additions & 5 deletions pkg/server/config.go
@@ -172,8 +172,9 @@ type KVConfig struct {
// in zone configs.
Attrs string

- // JoinList is a list of node addresses that act as bootstrap hosts for
- // connecting to the gossip network.
+ // JoinList is a list of node addresses that is used to form a network of KV
+ // servers. Assuming a connected graph, it suffices to initialize any server
+ // in the network.
JoinList base.JoinListType

// JoinPreferSRVRecords, if set, causes the lookup logic for the
@@ -646,11 +647,13 @@ func (cfg *Config) InitNode(ctx context.Context) error {

// FilterGossipBootstrapResolvers removes any gossip bootstrap resolvers which
// match either this node's listen address or its advertised host address.
- func (cfg *Config) FilterGossipBootstrapResolvers(
- 	ctx context.Context, listen, advert net.Addr,
- ) []resolver.Resolver {
+ func (cfg *Config) FilterGossipBootstrapResolvers(ctx context.Context) []resolver.Resolver {
+ 	var listen, advert net.Addr
+ 	listen = util.NewUnresolvedAddr("tcp", cfg.Addr)
+ 	advert = util.NewUnresolvedAddr("tcp", cfg.AdvertiseAddr)
filtered := make([]resolver.Resolver, 0, len(cfg.GossipBootstrapResolvers))
addrs := make([]string, 0, len(cfg.GossipBootstrapResolvers))

for _, r := range cfg.GossipBootstrapResolvers {
if r.Addr() == advert.String() || r.Addr() == listen.String() {
if log.V(1) {
7 changes: 3 additions & 4 deletions pkg/server/config_test.go
@@ -21,7 +21,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/gossip/resolver"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -191,10 +190,10 @@ func TestFilterGossipBootstrapResolvers(t *testing.T) {
}
cfg := MakeConfig(context.Background(), cluster.MakeTestingClusterSettings())
cfg.GossipBootstrapResolvers = resolvers
+ cfg.Addr = resolverSpecs[0]
+ cfg.AdvertiseAddr = resolverSpecs[2]

- listenAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[0])
- advertAddr := util.MakeUnresolvedAddr("tcp", resolverSpecs[2])
- filtered := cfg.FilterGossipBootstrapResolvers(context.Background(), &listenAddr, &advertAddr)
+ filtered := cfg.FilterGossipBootstrapResolvers(context.Background())
if len(filtered) != 1 {
t.Fatalf("expected one resolver; got %+v", filtered)
} else if filtered[0].Addr() != resolverSpecs[1] {
1 change: 1 addition & 0 deletions pkg/server/grpc_server.go
@@ -68,6 +68,7 @@ var rpcsAllowedWhileBootstrapping = map[string]struct{}{
"/cockroach.rpc.Heartbeat/Ping": {},
"/cockroach.gossip.Gossip/Gossip": {},
"/cockroach.server.serverpb.Init/Bootstrap": {},
"/cockroach.server.serverpb.Init/Join": {},
"/cockroach.server.serverpb.Admin/Health": {},
}
