diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index f65103126ccd..7adfe4e1b532 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -18,6 +18,7 @@ import (
 	"github.com/aws/aws-sdk-go/aws/credentials"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 )
 
@@ -1466,3 +1467,21 @@ func (rirr *ResolveIntentRangeRequest) AsLockUpdate() LockUpdate {
 		IgnoredSeqNums: rirr.IgnoredSeqNums,
 	}
 }
+
+// CreateStoreIdent creates a store identifier out of the details captured
+// within the join node response (the join node RPC is used to allocate a store
+// ID for the client's first store).
+func (r *JoinNodeResponse) CreateStoreIdent() (StoreIdent, error) {
+	nodeID, storeID := NodeID(r.NodeID), StoreID(r.StoreID)
+	clusterID, err := uuid.FromBytes(r.ClusterID)
+	if err != nil {
+		return StoreIdent{}, err
+	}
+
+	sIdent := StoreIdent{
+		ClusterID: clusterID,
+		NodeID:    nodeID,
+		StoreID:   storeID,
+	}
+	return sIdent, nil
+}
diff --git a/pkg/server/init.go b/pkg/server/init.go
index 98d4e8fb06d3..4a569b8315e1 100644
--- a/pkg/server/init.go
+++ b/pkg/server/init.go
@@ -66,86 +66,86 @@ type initServer struct {
 	config initServerCfg
 
 	mu struct {
-		// This mutex is grabbed during bootstrap and is used to serialized
-		// bootstrap attempts (and attempts to read whether or not his node has
-		// been bootstrapped). It's also used to guard inspectState below.
+		// This mutex is used to serialize bootstrap attempts.
 		syncutil.Mutex
 
-		// The state of the engines. This tells us whether the node is already
-		// bootstrapped. The goal of the initServer is to stub this out by the time
-		// ServeAndWait returns.
-		inspectState *initDiskState
+		// We use this field to guard against doubly bootstrapping clusters.
+		bootstrapped bool
 
 		// If we encounter an unrecognized error during bootstrap, we use this
 		// field to block out future bootstrap attempts.
 		rejectErr error
 	}
 
+	// inspectedDiskState captures the relevant bits of the on-disk state needed
+	// by the init server. It's through this that the init server knows whether
+	// or not this node needs to be bootstrapped. It does so by checking to see
+	// if any engines were already initialized. If so, there's nothing left for
+	// the init server to do; it simply returns the inspected disk state in
+	// ServeAndWait.
+	//
+	// The inspected disk state also carries the synthesized cluster version
+	// (this binary's minimum supported version if there are no initialized
+	// engines). This is used as the cluster version if we end up connecting to
+	// an existing cluster via gossip.
+	//
+	// TODO(irfansharif): This last bit goes away once we remove the use of
+	// gossip to join running clusters in 21.1.
+	inspectedDiskState *initState
+
 	// If this CRDB node was `cockroach init`-ialized, the resulting init state
 	// will be passed through to this channel.
 	bootstrapReqCh chan *initState
 }
 
+// NeedsBootstrap returns true if we haven't already been bootstrapped or
+// haven't yet been able to join a running cluster.
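+//
+// A rough sketch of the intended caller-side usage (the names here come from
+// the pkg/server changes elsewhere in this diff):
+//
+//   selfBootstrap := cfg.AutoInitializeCluster && initServer.NeedsBootstrap()
+//   if selfBootstrap {
+//       _, err := initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{})
+//       ...
+//   }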
+func (s *initServer) NeedsBootstrap() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return !s.mu.bootstrapped
+}
+
 func newInitServer(
-	actx log.AmbientContext, inspectState *initDiskState, config initServerCfg,
-) (*initServer, error) {
-	s := &initServer{
-		AmbientContext: actx,
-		bootstrapReqCh: make(chan *initState, 1),
-		config:         config,
+	actx log.AmbientContext, inspectedDiskState *initState, config initServerCfg,
+) *initServer {
+	initServer := &initServer{
+		AmbientContext:     actx,
+		bootstrapReqCh:     make(chan *initState, 1),
+		config:             config,
+		inspectedDiskState: inspectedDiskState,
+	}
+	// If we were already bootstrapped, we mark ourselves as such to prevent
+	// future bootstrap attempts.
+	if inspectedDiskState.bootstrapped() {
+		initServer.mu.bootstrapped = true
 	}
-	s.mu.inspectState = inspectState
-	return s, nil
+	return initServer
 }
 
-// initDiskState contains the part of initState that is read from stable
-// storage.
+// initState is the entirety of what the init server is tasked with
+// constructing. It's a view of our on-disk state, instantiated through
+// inspectEngines (and inspectEngines alone).
 //
-// NB: The above is a lie in the case in which we join an existing mixed-version
-// cluster. In that case, the state returned from ServeAndWait will have the
-// clusterID set from Gossip (and there will be no NodeID). This is holdover
-// behavior that can be removed in 21.1, at which point the lie disappears.
-type initDiskState struct {
-	// nodeID is zero if joining an existing cluster.
-	//
-	// TODO(tbg): see TODO above.
-	nodeID roachpb.NodeID
-	// All fields below are always set.
+// The init server is tasked with durably persisting state on-disk when this
+// node is bootstrapped, or is able to join an already bootstrapped cluster.
+// By state here we mean the cluster ID, node ID, at least one initialized
+// engine, etc. After having persisted the relevant state, the init server
+// constructs an initState with the details needed to fully start up a CRDB
+// server.
+type initState struct {
+	nodeID               roachpb.NodeID
 	clusterID            uuid.UUID
 	clusterVersion       clusterversion.ClusterVersion
 	initializedEngines   []storage.Engine
 	uninitializedEngines []storage.Engine
 }
 
-// initState contains the cluster and node IDs as well as the stores, from which
-// a CockroachDB server can be started up after ServeAndWait returns.
-//
-// TODO(irfansharif): The usage of initState and then initDiskState is a bit
-// confusing. It isn't the case today that the version held in memory for a
-// running server will be wholly reconstructed if reloading from disk. It
-// could if we always persisted any changes made to it back to disk. Right now
-// when initializing after a successful join attempt, we don't persist back the
-// disk state back to disk (we'd need to initialize the first store here, in the
-// same we do when `cockroach init`-ialized).
-type initState struct {
-	initDiskState
-}
-
-// NeedsInit is like needsInitLocked, except it acquires the necessary locks.
-func (s *initServer) NeedsInit() bool {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.needsInitLocked()
-}
-
-// needsInitLocked returns true if (and only if) none if the engines are
-// initialized. In this case, ServeAndWait is blocked until either an
-// initialized node is reached via the Join RPC (or Gossip if operating in mixed
-// version clusters running v20.1, see ErrJoinUnimplemented), or this node
-// itself is bootstrapped.
-func (s *initServer) needsInitLocked() bool {
-	return len(s.mu.inspectState.initializedEngines) == 0
+// bootstrapped is a shorthand to check if there exists at least one initialized
+// engine.
+func (i *initState) bootstrapped() bool {
+	return len(i.initializedEngines) > 0
 }
 
 // joinResult is used to represent the result of a node attempting to join
@@ -171,10 +171,10 @@ type joinResult struct {
 // have all stores initialized by the time ServeAndWait returns. This is because
 // if this server is already bootstrapped, it might hold a replica of the range
 // backing the StoreID allocating counter, and letting this server start may be
-// necessary to restore quorum to that range. So in general, after this TODO, we
-// will always leave this method with at least one store initialized, but not
+// necessary to restore quorum to that range. So in general, by the time this
+// method returns, we will have at least one store initialized, but not
 // necessarily all. This is fine, since initializing additional stores later is
-// easy.
+// easy (see `initializeAdditionalStores`).
 //
 // `initialBoot` is true if this is a new node. This flag should only be used
 // for logging and reporting. A newly bootstrapped single-node cluster is
@@ -193,15 +193,10 @@ func (s *initServer) ServeAndWait(
 	sv *settings.Values,
 	startGossipFn func() *gossip.Gossip,
 ) (state *initState, initialBoot bool, err error) {
-	// If already bootstrapped, return early.
-	s.mu.Lock()
-	if !s.needsInitLocked() {
-		diskState := *s.mu.inspectState
-		s.mu.Unlock()
-
-		return &initState{initDiskState: diskState}, false, nil
+	// If we're restarting an already bootstrapped node, return early.
+	if s.inspectedDiskState.bootstrapped() {
+		return s.inspectedDiskState, false, nil
 	}
-	s.mu.Unlock()
 
 	log.Info(ctx, "no stores initialized")
 	log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node")
 
@@ -264,11 +259,6 @@ func (s *initServer) ServeAndWait(
 		log.Infof(ctx, "cluster %s has been created", state.clusterID)
 		log.Infof(ctx, "allocated node ID: n%d (for self)", state.nodeID)
 
-		s.mu.Lock()
-		s.mu.inspectState.clusterID = state.clusterID
-		s.mu.inspectState.initializedEngines = state.initializedEngines
-		s.mu.Unlock()
-
 		return state, true, nil
 
 	case result := <-joinCh:
 		// Ensure we're draining out the join attempt.
@@ -293,8 +283,10 @@ func (s *initServer) ServeAndWait(
 		}
 
 		if err != nil {
-			// We expect the join RPC to blindly retry on all errors
-			// save for the two above. This should be unreachable code.
+			// We expect the join RPC to blindly retry on all
+			// "connection" errors save for the two above. If we're
+			// here, we failed to initialize our first store after a
+			// successful join attempt.
 			return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err)
 		}
 	}
@@ -330,15 +322,22 @@ func (s *initServer) ServeAndWait(
 			return nil, false, err
 		}
 
+		state := &initState{
+			// NB: We elide the node ID; we don't have one available yet.
+			clusterID:            clusterID,
+			clusterVersion:       s.inspectedDiskState.clusterVersion,
+			initializedEngines:   s.inspectedDiskState.initializedEngines,
+			uninitializedEngines: s.inspectedDiskState.uninitializedEngines,
+		}
+
+		// We mark ourselves as bootstrapped to prevent future bootstrap
+		// attempts.
 		s.mu.Lock()
-		s.mu.inspectState.clusterID = clusterID
-		diskState := *s.mu.inspectState
+		s.mu.bootstrapped = true
 		s.mu.Unlock()
 
-		state := &initState{
-			initDiskState: diskState,
-		}
 		log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID)
+
 		return state, true, nil
 
 	case <-stopper.ShouldQuiesce():
 		return nil, false, stop.ErrUnavailable
@@ -366,7 +365,7 @@ func (s *initServer) Bootstrap(
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	if !s.needsInitLocked() {
+	if s.mu.bootstrapped {
 		return nil, ErrClusterInitialized
 	}
 
@@ -374,14 +373,18 @@ func (s *initServer) Bootstrap(
 		return nil, s.mu.rejectErr
 	}
 
-	state, err := s.tryBootstrapLocked(ctx)
+	state, err := s.tryBootstrap(ctx)
 	if err != nil {
 		log.Errorf(ctx, "bootstrap: %v", err)
 		s.mu.rejectErr = errInternalBootstrapError
 		return nil, s.mu.rejectErr
 	}
 
+	// We've successfully bootstrapped (we've initialized at least one engine).
+	// We mark ourselves as bootstrapped to prevent future bootstrap attempts.
+	s.mu.bootstrapped = true
 	s.bootstrapReqCh <- state
+
 	return &serverpb.BootstrapResponse{}, nil
 }
 
@@ -413,24 +416,34 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
 		}
 
 		addr := res.Addr()
-		state, err := s.attemptJoinTo(ctx, res.Addr())
-		if err == nil {
-			return state, nil
-		}
-
+		resp, err := s.attemptJoinTo(ctx, res.Addr())
 		if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) {
 			// Propagate upwards; these are error conditions the caller knows to
 			// expect.
 			return nil, err
 		}
+		if err != nil {
+			// Try the next node if unsuccessful.
-		if IsWaitingForInit(err) {
-			log.Warningf(ctx, "%s is itself waiting for init, will retry", addr)
-		} else {
-			log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
+			if IsWaitingForInit(err) {
+				log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
+			} else {
+				log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
+			}
+			continue
 		}
-		// Try the next node if unsuccessful.
+
+		state, err := s.initializeFirstStoreAfterJoin(ctx, resp)
+		if err != nil {
+			return nil, err
+		}
+
+		// We mark ourselves as bootstrapped to prevent future bootstrap attempts.
+		s.mu.Lock()
+		s.mu.bootstrapped = true
+		s.mu.Unlock()
+
+		return state, nil
 	}
 
 	const joinRPCBackoff = time.Second
@@ -445,28 +458,40 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
 		addr := s.config.resolvers[idx].Addr()
 		select {
 		case <-tickChan:
-			state, err := s.attemptJoinTo(ctx, addr)
-			if err == nil {
-				return state, nil
-			}
-
+			resp, err := s.attemptJoinTo(ctx, addr)
 			if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) {
 				// Propagate upwards; these are error conditions the caller
 				// knows to expect.
 				return nil, err
 			}
+			if err != nil {
+				// Blindly retry for all other errors, logging them for visibility.
-			// Blindly retry for all other errors, logging them for visibility.
+				// TODO(irfansharif): If startup logging gets too spammy, we
+				// could match against connection errors to generate nicer
+				// logging. See grpcutil.connectionRefusedRe.
-			// TODO(irfansharif): If startup logging gets too spammy, we
-			// could match against connection errors to generate nicer
-			// logging. See grpcutil.connectionRefusedRe.
+				if IsWaitingForInit(err) {
+					log.Infof(ctx, "%s is itself waiting for init, will retry", addr)
+				} else {
+					log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
+				}
+				continue
+			}
-			if IsWaitingForInit(err) {
-				log.Warningf(ctx, "%s is itself waiting for init, will retry", addr)
-			} else {
-				log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error())
+			// We were able to successfully join an existing cluster. We'll now
+			// initialize our first store, using the store ID handed to us.
+			state, err := s.initializeFirstStoreAfterJoin(ctx, resp)
+			if err != nil {
+				return nil, err
 			}
+
+			// We mark ourselves as bootstrapped to prevent future bootstrap attempts.
+			s.mu.Lock()
+			s.mu.bootstrapped = true
+			s.mu.Unlock()
+
+			return state, nil
 		case <-ctx.Done():
 			return nil, context.Canceled
 		case <-stopper.ShouldQuiesce():
@@ -475,9 +500,10 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) (
 	}
 }
 
-// attemptJoinTo attempts to join to the node running at the given address. If
-// successful, an initState is returned.
-func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) {
+// attemptJoinTo attempts to join to the node running at the given address.
+func (s *initServer) attemptJoinTo(
+	ctx context.Context, addr string,
+) (*roachpb.JoinNodeResponse, error) {
 	conn, err := grpc.DialContext(ctx, addr, s.config.dialOpts...)
 	if err != nil {
 		return nil, err
@@ -524,75 +550,52 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState
 		return nil, err
 	}
 
-	clusterID, err := uuid.FromBytes(resp.ClusterID)
-	if err != nil {
+	return resp, nil
+}
+
+func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) {
+	// We use our binary version to bootstrap the cluster.
+	cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion}
+	if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectedDiskState.uninitializedEngines, cv); err != nil {
 		return nil, err
 	}
-	nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID)
-	clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
-
-	s.mu.Lock()
-	defer s.mu.Unlock()
+	return bootstrapCluster(ctx, s.inspectedDiskState.uninitializedEngines, s.config)
+}
 
-	s.mu.inspectState.clusterID = clusterID
-	s.mu.inspectState.nodeID = nodeID
-	s.mu.inspectState.clusterVersion = clusterVersion
+// DiskClusterVersion returns the cluster version synthesized from disk. This
+// is always non-zero since it falls back to the BinaryMinSupportedVersion.
+func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion {
+	return s.inspectedDiskState.clusterVersion
+}
 
-	if len(s.mu.inspectState.uninitializedEngines) < 1 {
-		log.Fatal(ctx, "expected to find at least one uninitialized engine")
+// initializeFirstStoreAfterJoin initializes the first store after a successful
+// join attempt. It re-constructs the store identifier from the join response
+// and persists the appropriate cluster version to disk. After having done so,
+// it returns an initState that captures the newly initialized store.
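+//
+// Note that only the first uninitialized engine is initialized here; any
+// remaining uninitialized engines are left as-is and are picked up later via
+// `initializeAdditionalStores` (see the commentary on ServeAndWait above).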
+func (s *initServer) initializeFirstStoreAfterJoin(
+	ctx context.Context, resp *roachpb.JoinNodeResponse,
+) (*initState, error) {
+	firstEngine := s.inspectedDiskState.uninitializedEngines[0]
+	clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
+	if err := kvserver.WriteClusterVersion(ctx, firstEngine, clusterVersion); err != nil {
+		return nil, err
 	}
 
-	// We initialize the very first store here, using the store ID handed to us.
-	sIdent := roachpb.StoreIdent{
-		ClusterID: clusterID,
-		NodeID:    nodeID,
-		StoreID:   storeID,
+	sIdent, err := resp.CreateStoreIdent()
+	if err != nil {
+		return nil, err
 	}
-
-	firstEngine := s.mu.inspectState.uninitializedEngines[0]
 	if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil {
 		return nil, err
 	}
 
-	// We construct the appropriate initState to indicate that we've initialized
-	// the first engine. We similarly trim it off the uninitializedEngines list
-	// so that when initializing auxiliary stores, if any, we know to avoid
-	// re-initializing the first store.
-	initializedEngines := []storage.Engine{firstEngine}
-	uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:]
-	state := &initState{
-		initDiskState: initDiskState{
-			nodeID:               nodeID,
-			clusterID:            clusterID,
-			clusterVersion:       clusterVersion,
-			initializedEngines:   initializedEngines,
-			uninitializedEngines: uninitializedEngines,
-		},
-	}
-	return state, nil
-}
-
-func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {
-	// We use our binary version to bootstrap the cluster.
-	cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion}
-	if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.uninitializedEngines, cv); err != nil {
-		return nil, err
-	}
-	return bootstrapCluster(
-		ctx, s.mu.inspectState.uninitializedEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
+	return inspectEngines(
+		ctx, s.inspectedDiskState.uninitializedEngines,
+		s.config.binaryVersion, s.config.binaryMinSupportedVersion,
 	)
 }
 
-// DiskClusterVersion returns the cluster version synthesized from disk. This
-// is always non-zero since it falls back to the BinaryMinSupportedVersion.
-func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion {
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	return s.mu.inspectState.clusterVersion
-}
-
 // initServerCfg is a thin wrapper around the server Config object, exposing
 // only the fields needed by the init server.
 type initServerCfg struct {
diff --git a/pkg/server/node.go b/pkg/server/node.go
index fc9a7ead4cf8..1767644d5ce3 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -210,10 +210,7 @@ func GetBootstrapSchema(
 // written, since epoch-based leases cannot be granted until then. All other
 // engines are initialized with their StoreIdent.
 func bootstrapCluster(
-	ctx context.Context,
-	engines []storage.Engine,
-	defaultZoneConfig *zonepb.ZoneConfig,
-	defaultSystemZoneConfig *zonepb.ZoneConfig,
+	ctx context.Context, engines []storage.Engine, initCfg initServerCfg,
 ) (*initState, error) {
 	clusterID := uuid.MakeV4()
 	// TODO(andrei): It'd be cool if this method wouldn't do anything to engines
@@ -251,7 +248,7 @@ func bootstrapCluster(
 		// not create the range, just its data. Only do this if this is the
 		// first store.
 		if i == 0 {
-			schema := GetBootstrapSchema(defaultZoneConfig, defaultSystemZoneConfig)
+			schema := GetBootstrapSchema(&initCfg.defaultZoneConfig, &initCfg.defaultSystemZoneConfig)
 			initialValues, tableSplits := schema.GetInitialValues()
 			splits := append(config.StaticSplits(), tableSplits...)
 			sort.Slice(splits, func(i, j int) bool {
@@ -268,16 +265,7 @@ func bootstrapCluster(
 		}
 	}
 
-	state := &initState{
-		initDiskState: initDiskState{
-			nodeID:               FirstNodeID,
-			clusterID:            clusterID,
-			clusterVersion:       bootstrapVersion,
-			initializedEngines:   engines,
-			uninitializedEngines: nil,
-		},
-	}
-	return state, nil
+	return inspectEngines(ctx, engines, initCfg.binaryVersion, initCfg.binaryMinSupportedVersion)
 }
 
 // NewNode returns a new instance of Node.
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 5452a2e59447..efbfd806510e 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -62,9 +62,14 @@ func TestBootstrapCluster(t *testing.T) {
 	e := storage.NewDefaultInMem()
 	defer e.Close()
 	require.NoError(t, kvserver.WriteClusterVersion(ctx, e, clusterversion.TestingClusterVersion))
-	if _, err := bootstrapCluster(
-		ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
-	); err != nil {
+
+	initCfg := initServerCfg{
+		binaryMinSupportedVersion: clusterversion.TestingBinaryMinSupportedVersion,
+		binaryVersion:             clusterversion.TestingBinaryVersion,
+		defaultSystemZoneConfig:   *zonepb.DefaultSystemZoneConfigRef(),
+		defaultZoneConfig:         *zonepb.DefaultZoneConfigRef(),
+	}
+	if _, err := bootstrapCluster(ctx, []storage.Engine{e}, initCfg); err != nil {
 		t.Fatal(err)
 	}
 
@@ -241,11 +246,15 @@ func TestCorruptedClusterID(t *testing.T) {
 	defer e.Close()
 
 	cv := clusterversion.TestingClusterVersion
-
 	require.NoError(t, kvserver.WriteClusterVersion(ctx, e, cv))
-	if _, err := bootstrapCluster(
-		ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(),
-	); err != nil {
+
+	initCfg := initServerCfg{
+		binaryMinSupportedVersion: clusterversion.TestingBinaryMinSupportedVersion,
+		binaryVersion:             clusterversion.TestingBinaryVersion,
+		defaultSystemZoneConfig:   *zonepb.DefaultSystemZoneConfigRef(),
+		defaultZoneConfig:         *zonepb.DefaultZoneConfigRef(),
+	}
+	if _, err := bootstrapCluster(ctx, []storage.Engine{e}, initCfg); err != nil {
 		t.Fatal(err)
 	}
 
diff --git a/pkg/server/server.go b/pkg/server/server.go
index ef352811737c..701e0e4b3504 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -739,50 +739,58 @@ func (l *ListenError) Error() string { return l.cause.Error() }
 // Unwrap is because ListenError is a wrapper.
 func (l *ListenError) Unwrap() error { return l.cause }
 
-// inspectEngines goes through engines and populates in initDiskState. It also
-// calls SynthesizeClusterVersionFromEngines, which selects and backfills the
-// cluster version to all initialized engines.
-//
-// The initDiskState returned by this method will reflect a zero NodeID if none
-// has been assigned yet (i.e. if none of the engines is initialized).
+// inspectEngines goes through engines and constructs an initState. The
+// initState returned by this method will reflect a zero NodeID if none has
+// been assigned yet (i.e. if none of the engines is initialized). See
+// commentary on initState for the intended usage of inspectEngines.
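+//
+// Engines that are missing a store ident are reported back as uninitialized.
+// Conflicting cluster IDs or node IDs across initialized engines, as well as
+// partially written store idents, are surfaced as errors.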
 func inspectEngines(
 	ctx context.Context,
 	engines []storage.Engine,
 	binaryVersion, binaryMinSupportedVersion roachpb.Version,
-) (*initDiskState, error) {
-	state := &initDiskState{}
+) (*initState, error) {
+	var clusterID uuid.UUID
+	var nodeID roachpb.NodeID
+	var initializedEngines, uninitializedEngines []storage.Engine
 
 	for _, eng := range engines {
 		storeIdent, err := kvserver.ReadStoreIdent(ctx, eng)
 		if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) {
-			state.uninitializedEngines = append(state.uninitializedEngines, eng)
+			uninitializedEngines = append(uninitializedEngines, eng)
 			continue
 		} else if err != nil {
 			return nil, err
 		}
 
-		if state.clusterID != uuid.Nil && state.clusterID != storeIdent.ClusterID {
-			return nil, errors.Errorf("conflicting store ClusterIDs: %s, %s", storeIdent.ClusterID, state.clusterID)
+		if clusterID != uuid.Nil && clusterID != storeIdent.ClusterID {
+			return nil, errors.Errorf("conflicting store ClusterIDs: %s, %s", storeIdent.ClusterID, clusterID)
 		}
-		state.clusterID = storeIdent.ClusterID
+		clusterID = storeIdent.ClusterID
 
 		if storeIdent.StoreID == 0 || storeIdent.NodeID == 0 || storeIdent.ClusterID == uuid.Nil {
 			return nil, errors.Errorf("partially initialized store: %+v", storeIdent)
 		}
 
-		if state.nodeID != 0 && state.nodeID != storeIdent.NodeID {
-			return nil, errors.Errorf("conflicting store NodeIDs: %s, %s", storeIdent.NodeID, state.nodeID)
+		if nodeID != 0 && nodeID != storeIdent.NodeID {
+			return nil, errors.Errorf("conflicting store NodeIDs: %s, %s", storeIdent.NodeID, nodeID)
 		}
-		state.nodeID = storeIdent.NodeID
+		nodeID = storeIdent.NodeID
 
-		state.initializedEngines = append(state.initializedEngines, eng)
+		initializedEngines = append(initializedEngines, eng)
 	}
-
-	cv, err := kvserver.SynthesizeClusterVersionFromEngines(ctx, state.initializedEngines, binaryVersion, binaryMinSupportedVersion)
+	clusterVersion, err := kvserver.SynthesizeClusterVersionFromEngines(
+		ctx, initializedEngines, binaryVersion, binaryMinSupportedVersion,
+	)
 	if err != nil {
 		return nil, err
 	}
-	state.clusterVersion = cv
+
+	state := &initState{
+		clusterID:            clusterID,
+		nodeID:               nodeID,
+		initializedEngines:   initializedEngines,
+		uninitializedEngines: uninitializedEngines,
+		clusterVersion:       clusterVersion,
+	}
 	return state, nil
 }
 
@@ -1118,7 +1126,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 		}
 		initConfig := newInitServerConfig(s.cfg, dialOpts)
 
-		inspectState, err := inspectEngines(
+		inspectedDiskState, err := inspectEngines(
 			ctx,
 			s.engines,
 			s.cfg.Settings.Version.BinaryVersion(),
@@ -1128,10 +1136,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 			return err
 		}
 
-		initServer, err = newInitServer(s.cfg.AmbientCtx, inspectState, initConfig)
-		if err != nil {
-			return err
-		}
+		initServer = newInitServer(s.cfg.AmbientCtx, inspectedDiskState, initConfig)
 	}
 
 	{
@@ -1375,7 +1380,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 
 	// We self bootstrap for when we're configured to do so, which should only
 	// happen during tests and for `cockroach start-single-node`.
-	selfBootstrap := s.cfg.AutoInitializeCluster && initServer.NeedsInit()
+	selfBootstrap := s.cfg.AutoInitializeCluster && initServer.NeedsBootstrap()
 	if selfBootstrap {
 		if _, err := initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil {
 			return err
@@ -1393,7 +1398,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 	if s.cfg.ReadyFn != nil {
 		readyFn = s.cfg.ReadyFn
 	}
-	if !initServer.NeedsInit() || selfBootstrap {
+	if !initServer.NeedsBootstrap() || selfBootstrap {
 		onSuccessfulReturnFn = func() { readyFn(false /* waitForInit */) }
 		onInitServerReady = func() {}
 	} else {