diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 5833f1f2fabd..84bf2f0be76f 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -61,7 +61,6 @@ go_library( "many_splits.go", "mixed_version_decommission.go", "mixed_version_jobs.go", - "mixed_version_join_init.go", "mixed_version_schemachange.go", "multitenant.go", "namespace_upgrade.go", diff --git a/pkg/cmd/roachtest/mixed_version_join_init.go b/pkg/cmd/roachtest/mixed_version_join_init.go deleted file mode 100644 index 722d10f827c4..000000000000 --- a/pkg/cmd/roachtest/mixed_version_join_init.go +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package main - -import ( - "context" - "fmt" - "math/rand" - "time" - - "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/cockroach/pkg/util/version" - "github.com/cockroachdb/errors" -) - -func registerJoinInitMixed(r *testRegistry) { - numNodes := 4 - r.Add(testSpec{ - Name: "join-init/mixed", - Owner: OwnerKV, - MinVersion: "v20.2.0", - Cluster: makeClusterSpec(numNodes), - Run: func(ctx context.Context, t *test, c *cluster) { - runJoinInitMixed(ctx, t, c, r.buildVersion) - }, - }) -} - -// runJoinInitMixed tests the mechanism used to allocate node IDs and -// disseminate cluster IDs in mixed version clusters. -// -// TODO(irfansharif): This test is only really useful for the 20.1/20.2 -// timeframe where we introduced the Join RPC; we should remove this test at a -// future point. -func runJoinInitMixed(ctx context.Context, t *test, c *cluster, buildVersion version.Version) { - predecessorVersion, err := PredecessorVersion(buildVersion) - if err != nil { - t.Fatal(err) - } - - // An empty string means that the cockroach binary specified by flag - // `cockroach` will be used. - const mainVersion = "" - - // This test starts off with a two node cluster (node{1,2}) running at - // predecessor version. It then rolls over node2 to the current version. It - // then adds node3 to the cluster, which is randomized to be either the - // current version or the predecessor. It is also randomly configured to - // point to one of node1 or node2 (they're running different binary - // versions) in its join flags. - node1, node2, node3, node4 := 1, 2, 3, 4 - - nodeX := 1 + rand.Intn(2) // Either node1 or node2. - versionX := func() string { // Either current or predecessor version. - if rand.Intn(2) == 0 { - return mainVersion - } - return predecessorVersion - }() - t.l.Printf("nodeX = %d; versionX = \"%s\"", nodeX, versionX) - - allNodes := c.All() - u := newVersionUpgradeTest(c, - // We upload both binaries to each node, to be able to vary the binary - // used when issuing `cockroach node` subcommands. - uploadVersion(allNodes, predecessorVersion), - uploadVersion(allNodes, mainVersion), - - // Start everything at predecessor version. - startVersion(c.Range(1, 2), predecessorVersion), - waitForUpgradeStep(c.Range(1, 2)), - preventAutoUpgradeStep(node1), - - checkNodeAndStoreIDs(node1, 1), - checkNodeAndStoreIDs(node2, 2), - - // If we upgrade too soon, we some times run into "last used with - // cockroach version vX-1, is too old for running version vX+1" errors. - // Give it a generous window to persist the right version marker on - // disk. - // - // TODO(irfansharif): Figure out a better way to address this. This is - // applicable to a lot of tests. I'd naively expect `waitForUpgrade` to - // also wait for the on-disk version marker to get bumped. We might need - // to change crdb code to make that happen. - sleepStep(time.Minute), - - // Roll node2 into the new version and check to see that it retains its - // node/cluster ID. - binaryUpgradeStep(c.Node(node2), mainVersion), - checkClusterIDsMatch(node1, node2), - checkNodeAndStoreIDs(node2, 2), - - // Add node3 (running either predecessor version binary or current) to - // the cluster, pointing at nodeX (running either predecessor version - // binary or current). - addNodeStep(c.Node(node3), nodeX, versionX), - checkClusterIDsMatch(nodeX, node3), - checkNodeAndStoreIDs(node3, 3), - - // Roll all nodes forward, and finalize upgrade. - binaryUpgradeStep(c.Range(1, 3), mainVersion), - allowAutoUpgradeStep(node1), - waitForUpgradeStep(c.Range(1, 3)), - - checkNodeAndStoreIDs(node1, 1), - checkNodeAndStoreIDs(node2, 2), - checkNodeAndStoreIDs(node3, 3), - - // TODO(irfansharif): We'd like to add a step like the one below, and - // will only be able to do so once 20.2 is cut. 20.1 code does not make - // use of the Join RPC to join the cluster, so this "gating mechanism" - // does not apply. - // - // Add node4 (running at predecessor version) to the cluster, pointing - // at nodeX (running new version, now with new cluster version active). - // We expect this to fail. - // - // unsuccessfullyAddNodeStep(c.Node(node4), nodeX, predecessorVersion), - - // Add node4 (running at new version) to the cluster, pointing at nodeX. - // (running new version, now with new cluster version active). - addNodeStep(c.Node(node4), nodeX, mainVersion), - checkClusterIDsMatch(node1, node4), - checkNodeAndStoreIDs(node4, 4), - ) - - u.run(ctx, t) -} - -func addNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - c := u.c - args := u.uploadVersion(ctx, t, nodes, newVersion) - - for _, node := range nodes { - t.l.Printf("adding node %d to the cluster\n", node) - joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0] - c.Start(ctx, t, c.Node(node), args, - startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)), - ) - } - } -} - -func unsuccessfullyAddNodeStep(nodes nodeListOption, joinNode int, newVersion string) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - c := u.c - args := u.uploadVersion(ctx, t, nodes, newVersion) - - for _, node := range nodes { - t.l.Printf("adding node %d to the cluster\n", node) - joinAddr := c.InternalAddr(ctx, c.Node(joinNode))[0] - err := c.StartE(ctx, c.Node(node), args, - // TODO(irfansharif): `roachprod` should be taught to skip - // adding default flags if manually specified via --args/-a. - // Today it includes both versions, which seems silly. - startArgs(fmt.Sprintf("-a=--join=%s", joinAddr)), - ) - if !errors.Is(err, server.ErrIncompatibleBinaryVersion) { - t.Fatalf("expected err: %s, got %v", server.ErrIncompatibleBinaryVersion, err) - } - } - } -} - -var _ = unsuccessfullyAddNodeStep - -func checkClusterIDsMatch(nodeA, nodeB int) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - var clusterIDA, clusterIDB uuid.UUID - { - db := u.conn(ctx, t, nodeA) - if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDA); err != nil { - t.Fatal(err) - } - } - { - db := u.conn(ctx, t, nodeB) - if err := db.QueryRow(`select crdb_internal.cluster_id();`).Scan(&clusterIDB); err != nil { - t.Fatal(err) - } - } - - if clusterIDA != clusterIDB { - t.Fatalf("expected to cluster ids %s and %s to match", clusterIDA.String(), clusterIDB.String()) - } - } -} - -func checkNodeAndStoreIDs(from int, exp int) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - db := u.conn(ctx, t, from) - var nodeID, storeID int - if err := db.QueryRow(`SELECT node_id FROM crdb_internal.node_runtime_info LIMIT 1;`).Scan(&nodeID); err != nil { - t.Fatal(err) - } - - if exp != nodeID { - t.Fatalf("expected to find node id %d, found %d", exp, nodeID) - } - - if err := db.QueryRow(`SELECT store_id FROM crdb_internal.kv_store_status WHERE node_id = $1 LIMIT 1;`, nodeID).Scan(&storeID); err != nil { - t.Fatal(err) - } - - if exp != storeID { - t.Fatalf("expected to find store id %d, found %d", exp, storeID) - } - } -} - -func sleepStep(duration time.Duration) versionStep { - return func(ctx context.Context, t *test, u *versionUpgradeTest) { - t.l.Printf("sleeping for %s...", duration.String()) - time.Sleep(duration) - } -} diff --git a/pkg/cmd/roachtest/registry.go b/pkg/cmd/roachtest/registry.go index 543b65b9a6d1..f563af17f001 100644 --- a/pkg/cmd/roachtest/registry.go +++ b/pkg/cmd/roachtest/registry.go @@ -49,7 +49,6 @@ func registerTests(r *testRegistry) { registerInterleaved(r) registerJepsen(r) registerJobsMixedVersions(r) - registerJoinInitMixed(r) registerKV(r) registerKVContention(r) registerKVQuiescenceDead(r) diff --git a/pkg/server/init.go b/pkg/server/init.go index 4a569b8315e1..38e6dd8f20e4 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/gossip/resolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -39,13 +38,6 @@ import ( // a node that is already part of an initialized cluster. var ErrClusterInitialized = fmt.Errorf("cluster has already been initialized") -// errJoinRPCUnsupported is reported when the Join RPC is run against -// a node that does not know about the join RPC (i.e. it is running 20.1 or -// below). -// -// TODO(irfansharif): Remove this in 21.1. -var errJoinRPCUnsupported = fmt.Errorf("node does not support the Join RPC") - // ErrIncompatibleBinaryVersion is returned when a CRDB node with a binary version X // attempts to join a cluster with an active version that's higher. This is not // allowed. @@ -148,6 +140,18 @@ func (i *initState) bootstrapped() bool { return len(i.initializedEngines) > 0 } +// validate asserts that the init state is a fully fleshed out one (i.e. with a +// non-empty cluster ID and node ID). +func (i *initState) validate() error { + if (i.clusterID == uuid.UUID{}) { + return errors.New("missing cluster ID") + } + if i.nodeID == 0 { + return errors.New("missing node ID") + } + return nil +} + // joinResult is used to represent the result of a node attempting to join // an already bootstrapped cluster. type joinResult struct { @@ -160,10 +164,9 @@ type joinResult struct { // restarting an existing node, this immediately returns. When starting with a // blank slate (i.e. only empty engines), it waits for incoming Bootstrap // request or for a successful outgoing Join RPC, whichever happens earlier. -// See [1]. // // The returned initState reflects a bootstrapped cluster (i.e. it has a cluster -// ID and a node ID for this server). See [2]. +// ID and a node ID for this server). // // This method must be called only once. // @@ -180,18 +183,8 @@ type joinResult struct { // for logging and reporting. A newly bootstrapped single-node cluster is // functionally equivalent to one that restarted; any decisions should be made // on persisted data instead of this flag. -// -// [1]: In mixed version clusters it waits until Gossip connects (but this is -// slated to be removed in 21.1). -// -// [2]: This is not technically true for mixed version clusters where we leave -// the node ID unassigned until later, but this too is part of the deprecated -// init server behavior that is slated for removal in 21.1. func (s *initServer) ServeAndWait( - ctx context.Context, - stopper *stop.Stopper, - sv *settings.Values, - startGossipFn func() *gossip.Gossip, + ctx context.Context, stopper *stop.Stopper, sv *settings.Values, ) (state *initState, initialBoot bool, err error) { // If we're restarting an already bootstrapped node, return early. if s.inspectedDiskState.bootstrapped() { @@ -201,42 +194,46 @@ func (s *initServer) ServeAndWait( log.Info(ctx, "no stores initialized") log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node") - joinCtx, cancelJoin := context.WithCancel(ctx) - defer cancelJoin() - + // If we end up joining a bootstrapped cluster, the resulting init state + // will be passed through this channel. + var joinCh chan joinResult + var cancelJoin = func() {} var wg sync.WaitGroup - wg.Add(1) - // If this CRDB node was able to join a bootstrapped cluster, the resulting - // init state will be passed through to this channel. - joinCh := make(chan joinResult, 1) - if err := stopper.RunTask(joinCtx, "init server: join loop", func(joinCtx context.Context) { - stopper.RunWorker(joinCtx, func(joinCtx context.Context) { - defer wg.Done() - - state, err := s.startJoinLoop(joinCtx, stopper) - joinCh <- joinResult{ - state: state, - err: err, - } - }) - }); err != nil { - return nil, false, err - } - // gossipConnectedCh is used as a place holder for gossip.Connected. We - // don't trigger on gossip connectivity unless we have to, favoring instead - // the join RPC to discover the cluster ID (and node ID). If we're in a - // mixed-version cluster however (with 20.1 nodes), we'll fall back to using - // the legacy gossip connectivity mechanism to discover the cluster ID. - var gossipConnectedCh chan struct{} - var g *gossip.Gossip + if len(s.config.resolvers) == 0 { + // We're pointing to only ourselves or nothing at all, which (likely) + // suggests that we're going to be bootstrapped by the operator. Since + // we're not going to be sending out join RPCs, we don't bother spinning + // up the join loop. + } else { + joinCh = make(chan joinResult, 1) + wg.Add(1) + + var joinCtx context.Context + joinCtx, cancelJoin = context.WithCancel(ctx) + defer cancelJoin() + + err := stopper.RunTask(joinCtx, "init server: join loop", + func(joinCtx context.Context) { + stopper.RunWorker(joinCtx, func(joinCtx context.Context) { + defer wg.Done() + + state, err := s.startJoinLoop(joinCtx, stopper) + joinCh <- joinResult{state: state, err: err} + }) + }) + if err != nil { + return nil, false, err + } + } for { select { case state := <-s.bootstrapReqCh: - // Ensure we're draining out the join attempt. We're not going to - // need it anymore and it had no chance of joining anywhere (since - // we are starting the new cluster and are not serving Join yet). + // Ensure we're draining out the join attempt, if any. We're not + // going to need it anymore and it had no chance of joining + // elsewhere (since we are the ones bootstrapping the new cluster + // and have not started serving Join yet). cancelJoin() wg.Wait() @@ -265,26 +262,13 @@ func (s *initServer) ServeAndWait( wg.Wait() if err := result.err; err != nil { - if errors.Is(err, errJoinRPCUnsupported) { - // We're in a mixed-version cluster, we start gossip and wire up - // the gossip connectivity mechanism to discover the cluster ID. - g = startGossipFn() - gossipConnectedCh = g.Connected - - // Let's nil out joinCh to prevent accidental re-use. - close(joinCh) - joinCh = nil - - continue - } - if errors.Is(err, ErrIncompatibleBinaryVersion) { return nil, false, err } if err != nil { // We expect the join RPC to blindly retry on all - // "connection" errors save for the two above. If we're + // "connection" errors save for one above. If we're // here, we failed to initialize our first store after a // successful join attempt. return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err) @@ -301,43 +285,6 @@ func (s *initServer) ServeAndWait( log.Infof(ctx, "joined cluster %s through join rpc", state.clusterID) log.Infof(ctx, "received node ID %d", state.nodeID) - return state, true, nil - case <-gossipConnectedCh: - // Ensure we're draining out the join attempt. - wg.Wait() - - // We're in a mixed-version cluster, so we retain the legacy - // behavior of retrieving the cluster ID and deferring node ID - // allocation (happens in (*Node).start). - // - // TODO(irfansharif): Remove this in 21.1. - - // Gossip connected, that is, we know a ClusterID. Due to the early - // return above, we know that all of our engines are empty, i.e. we - // don't have a NodeID yet (and the cluster version is the minimum we - // support). Commence startup; the Node will realize it's short a - // NodeID and will request one. - clusterID, err := g.GetClusterID() - if err != nil { - return nil, false, err - } - - state := &initState{ - // NB: We elide the node ID, we don't have one available yet. - clusterID: clusterID, - clusterVersion: s.inspectedDiskState.clusterVersion, - initializedEngines: s.inspectedDiskState.initializedEngines, - uninitializedEngines: s.inspectedDiskState.uninitializedEngines, - } - - // We mark ourselves as bootstrapped to prevent future bootstrap - // attempts. - s.mu.Lock() - s.mu.bootstrapped = true - s.mu.Unlock() - - log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID) - return state, true, nil case <-stopper.ShouldQuiesce(): return nil, false, stop.ErrUnavailable @@ -396,11 +343,7 @@ func (s *initServer) Bootstrap( // running a binary that's too old to join the rest of the cluster. func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (*initState, error) { if len(s.config.resolvers) == 0 { - // We're pointing to only ourselves, which is probably indicative of a - // node that's going to be bootstrapped by the operator. We could opt to - // not fall back to the gossip based connectivity mechanism, but we do - // it anyway. - return nil, errJoinRPCUnsupported + return nil, errors.AssertionFailedf("expected to find at least one resolver, found none") } // Iterate through all the resolvers at least once to reduce time taken to @@ -417,9 +360,9 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( addr := res.Addr() resp, err := s.attemptJoinTo(ctx, res.Addr()) - if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { - // Propagate upwards; these are error conditions the caller knows to - // expect. + if errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; this is an error condition the caller knows + // to expect. return nil, err } if err != nil { @@ -459,8 +402,8 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( select { case <-tickChan: resp, err := s.attemptJoinTo(ctx, addr) - if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { - // Propagate upwards; these are error conditions the caller + if errors.Is(err, ErrIncompatibleBinaryVersion) { + // Propagate upwards; this is an error condition the caller // knows to expect. return nil, err } @@ -537,11 +480,6 @@ func (s *initServer) attemptJoinTo( // The caller code, as written, switches on the error type; that'll need // to be changed as well. - if status.Code() == codes.Unimplemented { - log.Infof(ctx, "%s running an older version; falling back to gossip-based cluster join", addr) - return nil, errJoinRPCUnsupported - } - if status.Code() == codes.PermissionDenied { log.Infof(ctx, "%s is running a version higher than our binary version %s", addr, req.BinaryVersion.String()) return nil, ErrIncompatibleBinaryVersion diff --git a/pkg/server/node.go b/pkg/server/node.go index 1767644d5ce3..62b0bd8ee361 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -338,48 +338,10 @@ func (n *Node) start( localityAddress []roachpb.LocalityAddress, nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor), ) error { - // Obtaining the NodeID requires a dance of sorts. If the node has initialized - // stores, the NodeID is persisted in each of them. If not, then we'll need to - // use the KV store to get a NodeID assigned. n.initialStart = initialStart - nodeID := state.nodeID - if nodeID == 0 { - // TODO(irfansharif): This codepath exists to maintain the legacy - // behavior of node ID allocation that was triggered on gossip - // connectivity. This was replaced by the Join RPC in 20.2, and can be - // removed in 21.1. - if !initialStart { - log.Fatalf(ctx, "node has no NodeID, but claims to not be joining cluster") - } - // Allocate NodeID. Note that Gossip is already connected because if there's - // no NodeID yet, this means that we had to connect Gossip to learn the ClusterID. - select { - case <-n.storeCfg.Gossip.Connected: - default: - log.Fatalf(ctx, "gossip is not connected yet") - } - ctxWithSpan, span := n.AnnotateCtxWithSpan(ctx, "alloc-node-id") - newID, err := allocateNodeID(ctxWithSpan, n.storeCfg.DB) - if err != nil { - return err - } - log.Infof(ctxWithSpan, "new node allocated ID %d", newID) - span.Finish() - nodeID = newID - - // We're joining via gossip, so we don't have a liveness record for - // ourselves yet. Let's create one while here. - if err := n.storeCfg.NodeLiveness.CreateLivenessRecord(ctx, nodeID); err != nil { - return err - } - } - - // Inform the RPC context of the node ID. - n.storeCfg.RPCContext.NodeID.Set(ctx, nodeID) - n.startedAt = n.storeCfg.Clock.Now().WallTime n.Descriptor = roachpb.NodeDescriptor{ - NodeID: nodeID, + NodeID: state.nodeID, Address: util.MakeUnresolvedAddr(addr.Network(), addr.String()), SQLAddress: util.MakeUnresolvedAddr(sqlAddr.Network(), sqlAddr.String()), Attrs: attrs, diff --git a/pkg/server/server.go b/pkg/server/server.go index 701e0e4b3504..953dbb00e3a2 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1350,30 +1350,6 @@ func (s *Server) PreStart(ctx context.Context) error { // provided. advAddrU := util.NewUnresolvedAddr("tcp", s.cfg.AdvertiseAddr) - // As of 21.1, we will no longer need gossip to start before the init - // server. We need it in 20.2 for backwards compatibility with 20.1 servers - // that use gossip connectivity to distribute the cluster ID. In 20.2 we - // introduced a dedicated Join RPC to do exactly this, and so we can defer - // gossip start to after bootstrap/initialization. - // - // In order to defer starting gossip until absolutely needed, we wrap up - // gossip start in an idempotent function that's provided to the init - // server. It'll get invoked if we detect we're in a mixed-version cluster. - // If we're starting off at 20.2, we'll start gossip later. - // - // TODO(irfansharif): Remove this callback in 21.1. - var startGossipFn func() *gossip.Gossip - { - var once sync.Once - startGossipFn = func() *gossip.Gossip { - once.Do(func() { - s.gossip.Start(advAddrU, filtered) - log.Event(ctx, "started gossip") - }) - return s.gossip - } - } - if s.cfg.DelayedBootstrapFn != nil { defer time.AfterFunc(30*time.Second, s.cfg.DelayedBootstrapFn).Stop() } @@ -1413,20 +1389,16 @@ func (s *Server) PreStart(ctx context.Context) error { // incoming connections. startRPCServer(workersCtx) onInitServerReady() - state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV, startGossipFn) + state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV) if err != nil { return errors.Wrap(err, "during init") } + if err := state.validate(); err != nil { + return errors.Wrap(err, "invalid init state") + } s.rpcContext.ClusterID.Set(ctx, state.clusterID) - // If there's no NodeID here, then we didn't just bootstrap. The Node will - // read its ID from the stores or request a new one via KV. - // - // TODO(irfansharif): Make this unconditional once 20.2 is cut. This only - // exists to be compatible with 20.1 clusters. - if state.nodeID != 0 { - s.rpcContext.NodeID.Set(ctx, state.nodeID) - } + s.rpcContext.NodeID.Set(ctx, state.nodeID) // TODO(irfansharif): Now that we have our node ID, we should run another // check here to make sure we've not been decommissioned away (if we're here @@ -1462,36 +1434,6 @@ func (s *Server) PreStart(ctx context.Context) error { // initState -- and everything after it is actually starting the server, // using the listeners and init state. - // Defense in depth: set up an eager sanity check that we're not - // accidentally being pointed at a different cluster. We have checks for - // this in the RPC layer, but since the RPC layer gets set up before the - // clusterID is known, early connections won't validate the clusterID (at - // least not until the next Ping). - // - // The check is simple: listen for clusterID changes from Gossip. If we see - // one, make sure it's the clusterID we already know (and are guaranteed to - // know) at this point. If it's not the same, explode. - // - // TODO(irfansharif): The above is no longer applicable; in 21.1 we can - // always assume that the RPC layer will always get set up after having - // found out what the cluster ID is. The checks below can be removed then. - { - // We populated this above, so it should still be set. This is just to - // demonstrate that we're not doing anything functional here (and to - // prevent bugs during further refactors). - if s.rpcContext.ClusterID.Get() == uuid.Nil { - return errors.AssertionFailedf("expected cluster ID to be populated in rpc context") - } - unregister := s.gossip.RegisterCallback(gossip.KeyClusterID, func(string, roachpb.Value) { - clusterID, err := s.gossip.GetClusterID() - if err != nil { - log.Fatalf(ctx, "unable to read ClusterID: %v", err) - } - s.rpcContext.ClusterID.Set(ctx, clusterID) // fatals on mismatch - }) - defer unregister() - } - // Spawn a goroutine that will print a nice message when Gossip connects. // Note that we already know the clusterID, but we don't know that Gossip // has connected. The pertinent case is that of restarting an entire @@ -1535,7 +1477,8 @@ func (s *Server) PreStart(ctx context.Context) error { onSuccessfulReturnFn() // We're going to need to start gossip before we spin up Node below. - startGossipFn() + s.gossip.Start(advAddrU, filtered) + log.Event(ctx, "started gossip") // Now that we have a monotonic HLC wrt previous incarnations of the process, // init all the replicas. At this point *some* store has been initialized or