From dab8955ecbaf368581e6e51ffe8972e80a3d6232 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 3 Nov 2020 14:56:02 -0500 Subject: [PATCH] server: rm gossip usage from node/store/cluster ID allocation In 20.2, with #52526, we introduced the join RPC for new nodes in the system to be handed a pre-allocated node ID, a pre-allocated store ID, and be informed by the existing cluster what the cluster ID was. In 20.1 and earlier this functionality was provided by our use of gossip, and by constructing hollowed out servers (without node/store IDs) and allocating an ID at the joining node. This required an awkward dance around needing KV to be up in order to allocate the right IDs, but not having an ID for the server process itself. We retained the deprecated gossip codepaths in 20.2 in order to maintain compatibility with 20.1. Now that 20.2 is cut however, in 21.1 code we can always assume that we're talking to nodes that are at least 20.2, and therefore nodes that are able to use the join RPC correctly. This lets us strip out our usage of gossip for ID allocation, which in turn paves the way for a few other simplifications. We also delete the mixed-version/join-init roachtest, as it was only ever relevant for the 20.1/20.2 cycle. The current structure around joining nodes being allocated/informed of the right IDs has adequate coverage with TestClusterConnectivity. Release note: None --- pkg/cmd/roachtest/BUILD.bazel | 1 - pkg/cmd/roachtest/mixed_version_join_init.go | 227 ------------------- pkg/cmd/roachtest/registry.go | 1 - pkg/server/init.go | 172 +++++--------- pkg/server/node.go | 40 +--- pkg/server/server.go | 71 +----- 6 files changed, 63 insertions(+), 449 deletions(-) delete mode 100644 pkg/cmd/roachtest/mixed_version_join_init.go diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 5833f1f2fabd..84bf2f0be76f 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -61,7 +61,6 @@ go_library( "many_splits.go", "mixed_version_decommission.go", "mixed_version_jobs.go", - "mixed_version_join_init.go", "mixed_version_schemachange.go", "multitenant.go", "namespace_upgrade.go", diff --git a/pkg/cmd/roachtest/mixed_version_join_init.go b/pkg/cmd/roachtest/mixed_version_join_init.go deleted file mode 100644 index 722d10f827c4..000000000000 --- a/pkg/cmd/roachtest/mixed_version_join_init.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package main - -import ( - "context" - "fmt" - "math/rand" - "time" - - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/cockroach/pkg/util/version" - "github.com/cockroachdb/errors" -) - -func registerJoinInitMixed(r *testRegistry) { - numNodes := 4 - r.Add(testSpec{ - Name: "join-init/mixed", - Owner: OwnerKV, - MinVersion: "v20.2.0", - Cluster: makeClusterSpec(numNodes), - Run: func(ctx context.Context, t *test, c *cluster) { - runJoinInitMixed(ctx, t, c, r.buildVersion) - }, - }) -} - -// runJoinInitMixed tests the mechanism used to allocate node IDs and -// disseminate cluster IDs in mixed version clusters. 
-// -// TODO(irfansharif): This test is only really useful for the 20.1/20.2 -// timeframe where we introduced the Join RPC; we should remove this test at a -// future point. -func runJoinInitMixed(ctx context.Context, t *test, c *cluster, buildVersion version.Version) { - predecessorVersion, err := PredecessorVersion(buildVersion) - if err != nil { - t.Fatal(err) - } - - // An empty string means that the cockroach binary specified by flag - // `cockroach` will be used. - const mainVersion = "" - - // This test starts off with a two node cluster (node{1,2}) running at - // predecessor version. It then rolls over node2 to the current version. It - // then adds node3 to the cluster, which is randomized to be either the - // current version or the predecessor. It is also randomly configured to - // point to one of node1 or node2 (they're running different binary - // versions) in its join flags. - node1, node2, node3, node4 := 1, 2, 3, 4 - - nodeX := 1 + rand.Intn(2) // Either node1 or node2. - versionX := func() string { // Either current or predecessor version. - if rand.Intn(2) == 0 { - return mainVersion - } - return predecessorVersion - }() - t.l.Printf("nodeX = %d; versionX = \"%s\"", nodeX, versionX) - - allNodes := c.All() - u := newVersionUpgradeTest(c, - // We upload both binaries to each node, to be able to vary the binary - // used when issuing `cockroach node` subcommands. - uploadVersion(allNodes, predecessorVersion), - uploadVersion(allNodes, mainVersion), - - // Start everything at predecessor version. - startVersion(c.Range(1, 2), predecessorVersion), - waitForUpgradeStep(c.Range(1, 2)), - preventAutoUpgradeStep(node1), - - checkNodeAndStoreIDs(node1, 1), - checkNodeAndStoreIDs(node2, 2), - - // If we upgrade too soon, we some times run into "last used with - // cockroach version vX-1, is too old for running version vX+1" errors. - // Give it a generous window to persist the right version marker on - // disk. - // - // TODO(irfansharif): Figure out a better way to address this. This is - // applicable to a lot of tests. I'd naively expect `waitForUpgrade` to - // also wait for the on-disk version marker to get bumped. We might need - // to change crdb code to make that happen. - sleepStep(time.Minute), - - // Roll node2 into the new version and check to see that it retains its - // node/cluster ID. - binaryUpgradeStep(c.Node(node2), mainVersion), - checkClusterIDsMatch(node1, node2), - checkNodeAndStoreIDs(node2, 2), - - // Add node3 (running either predecessor version binary or current) to - // the cluster, pointing at nodeX (running either predecessor version - // binary or current). - addNodeStep(c.Node(node3), nodeX, versionX), - checkClusterIDsMatch(nodeX, node3), - checkNodeAndStoreIDs(node3, 3), - - // Roll all nodes forward, and finalize upgrade. - binaryUpgradeStep(c.Range(1, 3), mainVersion), - allowAutoUpgradeStep(node1), - waitForUpgradeStep(c.Range(1, 3)), - - checkNodeAndStoreIDs(node1, 1), - checkNodeAndStoreIDs(node2, 2), - checkNodeAndStoreIDs(node3, 3), - - // TODO(irfansharif): We'd like to add a step like the one below, and - // will only be able to do so once 20.2 is cut. 20.1 code does not make - // use of the Join RPC to join the cluster, so this "gating mechanism" - // does not apply. - // - // Add node4 (running at predecessor version) to the cluster, pointing - // at nodeX (running new version, now with new cluster version active). - // We expect this to fail. 
- // - // unsuccessfullyAddNodeStep(c.Node(node4), nodeX, predecessorVersion), - - // Add node4 (running at new version) to the cluster, pointing at nodeX. - // (running new version, now with new cluster version active). - addNodeStep(c.Node(node4), nodeX, mainVersion), - checkClusterIDsMatch(node1, node4), - checkNodeAndStoreIDs(node4, 4), - ) - - u.run(ctx, t) -} - -func addNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - c := u.c - args := u.uploadVersion(ctx, t, nodes, newVersion) - - for _, node := range nodes { - t.l.Printf("adding node %d to the cluster\n", node) - joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0] - c.Start(ctx, t, c.Node(node), args, - startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)), - ) - } - } -} - -func unsuccessfullyAddNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - c := u.c - args := u.uploadVersion(ctx, t, nodes, newVersion) - - for _, node := range nodes { - t.l.Printf("adding node %d to the cluster\n", node) - joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0] - err := c.StartE(ctx, c.Node(node), args, - // TODO(irfansharif): `roachprod` should be taught to skip - // adding default flags if manually specified via --args/-a. - // Today it includes both versions, which seems silly. - startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)), - ) - if !errors.Is(err, server.ErrIncompatibleBinaryVersion) { - t.Fatalf("expected err: %s, got %v", server.ErrIncompatibleBinaryVersion, err) - } - } - } -} - -var _ = unsuccessfullyAddNodeStep - -func checkClusterIDsMatch(nodeA, nodeB int) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - var clusterIDA, clusterIDB uuid.UUID - { - db := u.conn(ctx, t, nodeA) - if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDA); err != nil { - t.Fatal(err) - } - } - { - db := u.conn(ctx, t, nodeB) - if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDB); err != nil { - t.Fatal(err) - } - } - - if clusterIDA != clusterIDB { - t.Fatalf("expected to cluster ids %s and %s to match", clusterIDA.String(), clusterIDB.String()) - } - } -} - -func checkNodeAndStoreIDs(from int, exp int) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - db := u.conn(ctx, t, from) - var nodeID, storeID int - if err := db.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1;`).Scan(&nodeID); err != nil { - t.Fatal(err) - } - - if exp != nodeID { - t.Fatalf("expected to find node id %d, found %d", exp, nodeID) - } - - if err := db.QueryRow(`SELECT store_id FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1;`, nodeID).Scan(&storeID); err != nil { - t.Fatal(err) - } - - if exp != storeID { - t.Fatalf("expected to find store id %d, found %d", exp, storeID) - } - } -} - -func sleepStep(duration time.Duration) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - t.l.Printf("sleeping for %s...", duration.String()) - time.Sleep(duration) - } -} diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go index 543b65b9a6d1..f563af17f001 100644 --- a/pkg/cmd/roachtest/registry.go +++ b/pkg/cmd/roachtest/registry.go @@ -49,7 +49,6 @@ func registerTests(r *testRegistry) { registerInterleaved(r) registerJepsen(r) registerJobsMixedVersions(r) - registerJoinInitMixed(r) registerKV(r) 
registerKVContention(r) registerKVQuiescenceDead(r) diff --git a/pkg/server/init.go b/pkg/server/init.go index 4a569b8315e1..38e6dd8f20e4 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -39,13 +38,6 @@ import ( // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") -// errJoinRPCUnsupported is reported when the Join RPC is run against -// a node that does not know about the join RPC (i.e. it is running 20.1 or -// below). -// -// TODO(irfansharif): Remove this in 21.1. -var errJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") - // ErrIncompatibleBinaryVersion is returned when a CRDB node with a binary version X // attempts to join a cluster with an active version that's higher. This is not // allowed. @@ -148,6 +140,18 @@ func (i *initState) bootstrapped() bool { return len(i.initializedEngines) > 0 } +// validate asserts that the init state is a fully fleshed out one (i.e. with a +// non-empty cluster ID and node ID). +func (i *initState) validate() error { + if (i.clusterID == uuid.UUID{}) { + return errors.New("missing cluster ID") + } + if i.nodeID == 0 { + return errors.New("missing node ID") + } + return nil +} + // joinResult is used to represent the result of a node attempting to join // an already bootstrapped cluster. type joinResult struct { @@ -160,10 +164,9 @@ type joinResult struct { // restarting an existing node, this immediately returns. When starting with a // blank slate (i.e. only empty engines), it waits for incoming Bootstrap // request or for a successful outgoing Join RPC, whichever happens earlier. -// See [1]. // // The returned initState reflects a bootstrapped cluster (i.e. it has a cluster -// ID and a node ID for this server). See [2]. +// ID and a node ID for this server). // // This method must be called only once. // @@ -180,18 +183,8 @@ type joinResult struct { // for logging and reporting. A newly bootstrapped single-node cluster is // functionally equivalent to one that restarted; any decisions should be made // on persisted data instead of this flag. -// -// [1]: In mixed version clusters it waits until Gossip connects (but this is -// slated to be removed in 21.1). -// -// [2]: This is not technically true for mixed version clusters where we leave -// the node ID unassigned until later, but this too is part of the deprecated -// init server behavior that is slated for removal in 21.1. func (s *initServer) ServeAndWait( - ctx context.Context, - stopper *stop.Stopper, - sv *settings.Values, - startGossipFn func() *gossip.Gossip, + ctx context.Context, stopper *stop.Stopper, sv *settings.Values, ) (state *initState, initialBoot bool, err error) { // If we're restarting an already bootstrapped node, return early. if s.inspectedDiskState.bootstrapped() { @@ -201,42 +194,46 @@ func (s *initServer) ServeAndWait( log.Info(ctx, "no stores initialized") log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node") - joinCtx, cancelJoin := context.WithCancel(ctx) - defer cancelJoin() - + // If we end up joining a bootstrapped cluster, the resulting init state + // will be passed through this channel. 
+ var joinCh chan joinResult + var cancelJoin = func() {} var wg sync.WaitGroup - wg.Add(1) - // If this CRDB node was able to join a bootstrapped cluster, the resulting - // init state will be passed through to this channel. - joinCh := make(chan joinResult, 1) - if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { - stopper.RunWorker(joinCtx, func(joinCtx context.Context) { - defer wg.Done() - - state, err := s.startJoinLoop(joinCtx, stopper) - joinCh <- joinResult{ - state: state, - err: err, - } - }) - }); err != nil { - return nil, false, err - } - // gossipConnectedCh is used as a place holder for gossip.Connected. We - // don't trigger on gossip connectivity unless we have to, favoring instead - // the join RPC to discover the cluster ID (and node ID). If we're in a - // mixed-version cluster however (with 20.1 nodes), we'll fall back to using - // the legacy gossip connectivity mechanism to discover the cluster ID. - var gossipConnectedCh chan struct{} - var g *gossip.Gossip + if len(s.config.resolvers) == 0 { + // We're pointing to only ourselves or nothing at all, which (likely) + // suggests that we're going to be bootstrapped by the operator. Since + // we're not going to be sending out join RPCs, we don't bother spinning + // up the join loop. + } else { + joinCh = make(chan joinResult, 1) + wg.Add(1) + + var joinCtx context.Context + joinCtx, cancelJoin = context.WithCancel(ctx) + defer cancelJoin() + + err := stopper.RunTask(joinCtx, "init server: join loop", + func(joinCtx context.Context) { + stopper.RunWorker(joinCtx, func(joinCtx context.Context) { + defer wg.Done() + + state, err := s.startJoinLoop(joinCtx, stopper) + joinCh <- joinResult{state: state, err: err} + }) + }) + if err != nil { + return nil, false, err + } + } for { select { case state := <-s.bootstrapReqCh: - // Ensure we're draining out the join attempt. We're not going to - // need it anymore and it had no chance of joining anywhere (since - // we are starting the new cluster and are not serving Join yet). + // Ensure we're draining out the join attempt, if any. We're not + // going to need it anymore and it had no chance of joining + // elsewhere (since we are the ones bootstrapping the new cluster + // and have not started serving Join yet). cancelJoin() wg.Wait() @@ -265,26 +262,13 @@ func (s *initServer) ServeAndWait( wg.Wait() if err := result.err; err != nil { - if errors.Is(err, errJoinRPCUnsupported) { - // We're in a mixed-version cluster, we start gossip and wire up - // the gossip connectivity mechanism to discover the cluster ID. - g = startGossipFn() - gossipConnectedCh = g.Connected - - // Let's nil out joinCh to prevent accidental re-use. - close(joinCh) - joinCh = nil - - continue - } - if errors.Is(err, ErrIncompatibleBinaryVersion) { return nil, false, err } if err != nil { // We expect the join RPC to blindly retry on all - // "connection" errors save for the two above. If we're + // "connection" errors save for one above. If we're // here, we failed to initialize our first store after a // successful join attempt. return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err) @@ -301,43 +285,6 @@ func (s *initServer) ServeAndWait( log.Infof(ctx, "joined cluster %s through join rpc", state.clusterID) log.Infof(ctx, "received node ID %d", state.nodeID) - return state, true, nil - case <-gossipConnectedCh: - // Ensure we're draining out the join attempt. 
- wg.Wait() - - // We're in a mixed-version cluster, so we retain the legacy - // behavior of retrieving the cluster ID and deferring node ID - // allocation (happens in (*Node).start). - // - // TODO(irfansharif): Remove this in 21.1. - - // Gossip connected, that is, we know a ClusterID. Due to the early - // return above, we know that all of our engines are empty, i.e. we - // don't have a NodeID yet (and the cluster version is the minimum we - // support). Commence startup; the Node will realize it's short a - // NodeID and will request one. - clusterID, err := g.GetClusterID() - if err != nil { - return nil, false, err - } - - state := &initState{ - // NB: We elide the node ID, we don't have one available yet. - clusterID: clusterID, - clusterVersion: s.inspectedDiskState.clusterVersion, - initializedEngines: s.inspectedDiskState.initializedEngines, - uninitializedEngines: s.inspectedDiskState.uninitializedEngines, - } - - // We mark ourselves as bootstrapped to prevent future bootstrap - // attempts. - s.mu.Lock() - s.mu.bootstrapped = true - s.mu.Unlock() - - log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID) - return state, true, nil case <-stopper.ShouldQuiesce(): return nil, false, stop.ErrUnavailable @@ -396,11 +343,7 @@ func (s *initServer) Bootstrap( // running a binary that's too old to join the rest of the cluster. func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (*initState, error) { if len(s.config.resolvers) == 0 { - // We're pointing to only ourselves, which is probably indicative of a - // node that's going to be bootstrapped by the operator. We could opt to - // not fall back to the gossip based connectivity mechanism, but we do - // it anyway. - return nil, errJoinRPCUnsupported + return nil, errors.AssertionFailedf("expected to find at least one resolver, found none") } // Iterate through all the resolvers at least once to reduce time taken to @@ -417,9 +360,9 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( addr := res.Addr() resp, err := s.attemptJoinTo(ctx, res.Addr()) - if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { - // Propagate upwards; these are error conditions the caller knows to - // expect. + if errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; this is an error condition the caller knows + // to expect. return nil, err } if err != nil { @@ -459,8 +402,8 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( select { case <-tickChan: resp, err := s.attemptJoinTo(ctx, addr) - if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { - // Propagate upwards; these are error conditions the caller + if errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; this is an error condition the caller // knows to expect. return nil, err } @@ -537,11 +480,6 @@ func (s *initServer) attemptJoinTo( // The caller code, as written, switches on the error type; that'll need // to be changed as well. 
- if status.Code() == codes.Unimplemented { - log.Infof(ctx, "%s running an older version; falling back to gossip-based cluster join", addr) - return nil, errJoinRPCUnsupported - } - if status.Code() == codes.PermissionDenied { log.Infof(ctx, "%s is running a version higher than our binary version %s", addr, req.BinaryVersion.String()) return nil, ErrIncompatibleBinaryVersion diff --git a/pkg/server/node.go b/pkg/server/node.go index 1767644d5ce3..62b0bd8ee361 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -338,48 +338,10 @@ func (n *Node) start( localityAddress []roachpb.LocalityAddress, nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor), ) error { - // Obtaining the NodeID requires a dance of sorts. If the node has initialized - // stores, the NodeID is persisted in each of them. If not, then we'll need to - // use the KV store to get a NodeID assigned. n.initialStart = initialStart - nodeID := state.nodeID - if nodeID == 0 { - // TODO(irfansharif): This codepath exists to maintain the legacy - // behavior of node ID allocation that was triggered on gossip - // connectivity. This was replaced by the Join RPC in 20.2, and can be - // removed in 21.1. - if !initialStart { - log.Fatalf(ctx, "node has no NodeID, but claims to not be joining cluster") - } - // Allocate NodeID. Note that Gossip is already connected because if there's - // no NodeID yet, this means that we had to connect Gossip to learn the ClusterID. - select { - case <-n.storeCfg.Gossip.Connected: - default: - log.Fatalf(ctx, "gossip is not connected yet") - } - ctxWithSpan, span := n.AnnotateCtxWithSpan(ctx, "alloc-node-id") - newID, err := allocateNodeID(ctxWithSpan, n.storeCfg.DB) - if err != nil { - return err - } - log.Infof(ctxWithSpan, "new node allocated ID %d", newID) - span.Finish() - nodeID = newID - - // We're joining via gossip, so we don't have a liveness record for - // ourselves yet. Let's create one while here. - if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil { - return err - } - } - - // Inform the RPC context of the node ID. - n.storeCfg.RPCContext.NodeID.Set(ctx, nodeID) - n.startedAt = n.storeCfg.Clock.Now().WallTime n.Descriptor = roachpb.NodeDescriptor{ - NodeID: nodeID, + NodeID: state.nodeID, Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()), SQLAddress: util.MakeUnresolvedAddr(sqlAddr.Network(), sqlAddr.String()), Attrs: attrs, diff --git a/pkg/server/server.go b/pkg/server/server.go index 701e0e4b3504..953dbb00e3a2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1350,30 +1350,6 @@ func (s *Server) PreStart(ctx context.Context) error { // provided. advAddrU := util.NewUnresolvedAddr("tcp", s.cfg.AdvertiseAddr) - // As of 21.1, we will no longer need gossip to start before the init - // server. We need it in 20.2 for backwards compatibility with 20.1 servers - // that use gossip connectivity to distribute the cluster ID. In 20.2 we - // introduced a dedicated Join RPC to do exactly this, and so we can defer - // gossip start to after bootstrap/initialization. - // - // In order to defer starting gossip until absolutely needed, we wrap up - // gossip start in an idempotent function that's provided to the init - // server. It'll get invoked if we detect we're in a mixed-version cluster. - // If we're starting off at 20.2, we'll start gossip later. - // - // TODO(irfansharif): Remove this callback in 21.1. 
- var startGossipFn func() *gossip.Gossip - { - var once sync.Once - startGossipFn = func() *gossip.Gossip { - once.Do(func() { - s.gossip.Start(advAddrU, filtered) - log.Event(ctx, "started gossip") - }) - return s.gossip - } - } - if s.cfg.DelayedBootstrapFn != nil { defer time.AfterFunc(30*time.Second, s.cfg.DelayedBootstrapFn).Stop() } @@ -1413,20 +1389,16 @@ func (s *Server) PreStart(ctx context.Context) error { // incoming connections. startRPCServer(workersCtx) onInitServerReady() - state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, startGossipFn) + state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV) if err != nil { return errors.Wrap(err, "during init") } + if err := state.validate(); err != nil { + return errors.Wrap(err, "invalid init state") + } s.rpcContext.ClusterID.Set(ctx, state.clusterID) - // If there's no NodeID here, then we didn't just bootstrap. The Node will - // read its ID from the stores or request a new one via KV. - // - // TODO(irfansharif): Make this unconditional once 20.2 is cut. This only - // exists to be compatible with 20.1 clusters. - if state.nodeID != 0 { - s.rpcContext.NodeID.Set(ctx, state.nodeID) - } + s.rpcContext.NodeID.Set(ctx, state.nodeID) // TODO(irfansharif): Now that we have our node ID, we should run another // check here to make sure we've not been decommissioned away (if we're here @@ -1462,36 +1434,6 @@ func (s *Server) PreStart(ctx context.Context) error { // initState -- and everything after it is actually starting the server, // using the listeners and init state. - // Defense in depth: set up an eager sanity check that we're not - // accidentally being pointed at a different cluster. We have checks for - // this in the RPC layer, but since the RPC layer gets set up before the - // clusterID is known, early connections won't validate the clusterID (at - // least not until the next Ping). - // - // The check is simple: listen for clusterID changes from Gossip. If we see - // one, make sure it's the clusterID we already know (and are guaranteed to - // know) at this point. If it's not the same, explode. - // - // TODO(irfansharif): The above is no longer applicable; in 21.1 we can - // always assume that the RPC layer will always get set up after having - // found out what the cluster ID is. The checks below can be removed then. - { - // We populated this above, so it should still be set. This is just to - // demonstrate that we're not doing anything functional here (and to - // prevent bugs during further refactors). - if s.rpcContext.ClusterID.Get() == uuid.Nil { - return errors.AssertionFailedf("expected cluster ID to be populated in rpc context") - } - unregister := s.gossip.RegisterCallback(gossip.KeyClusterID, func(string, roachpb.Value) { - clusterID, err := s.gossip.GetClusterID() - if err != nil { - log.Fatalf(ctx, "unable to read ClusterID: %v", err) - } - s.rpcContext.ClusterID.Set(ctx, clusterID) // fatals on mismatch - }) - defer unregister() - } - // Spawn a goroutine that will print a nice message when Gossip connects. // Note that we already know the clusterID, but we don't know that Gossip // has connected. The pertinent case is that of restarting an entire @@ -1535,7 +1477,8 @@ func (s *Server) PreStart(ctx context.Context) error { onSuccessfulReturnFn() // We're going to need to start gossip before we spin up Node below. 
- startGossipFn() + s.gossip.Start(advAddrU, filtered) + log.Event(ctx, "started gossip") // Now that we have a monotonic HLC wrt previous incarnations of the process, // init all the replicas. At this point *some* store has been initialized or