From fbd447dc18ffe7d6db411de5788fcc7ce01e97da Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Wed, 4 Nov 2020 18:41:27 -0500
Subject: [PATCH] server: simplify init server state handling

Previously we had two different "init server state" types floating around:

- We had `initState`, which contained all the required "output" state the init server was tasked with constructing. It captured the node ID, cluster ID, cluster version, and the sets of initialized and uninitialized engines.
- We had an `initDiskState` type, which was being used for two orthogonal reasons:
  - It was used to capture the on-disk state of a node during server restarts. It happened to capture the same details as the init server's "output" state, and let the init server simply return early in the common case where we were restarting a previously bootstrapped node.
  - It was used to safeguard against double bootstrapping a cluster: on bootstrap/join/gossip connectivity, the init server would mutate the in-memory representation of the "disk state" to indicate that we now had initialized engines, and should thus prevent future bootstrap attempts.

Our usage of `initDiskState` was somewhat confusing as a result. It wasn't the case that the in-memory representation of the disk state would be wholly re-constructed on restart. And given that the two state types shared all the same fields, one simply embedded the other, further muddying the distinction between the two.

This PR separates out the three usages above into (hopefully) clearer forms. We fold both `initDiskState` and `initState` into a single type (the latter), and introduce an `inspectedDiskState` field on the init server to represent the state synthesized from first inspecting the engines. As the name suggests, the field is read-only. The safety mechanism against double bootstrap, previously provided by the in-memory "disk state", is now a mutex-protected boolean instead.

We now only ever construct `initState` instances after having inspected the disk state. Given that the init server is responsible for generating a fleshed-out `initState`, it does so by persisting the relevant bits to disk first, and then inspecting the disk to retrieve a representative `initState`. There's no longer a divergence between what's present on disk and what the latest `initState` indicates.

We also clarify how the early return in ServeAndWait functions. It was intended to return early in the common case where we were restarting an already bootstrapped node, which suggests we should only be looking at the inspected disk state. Previously, because we consulted the one `initDiskState` field (which was also mutated during bootstrap), it was possible to race with an in-flight bootstrap attempt on the same node and read the `initState` produced by that bootstrap attempt, rather than the pre-node-start disk state. This worked "fine" before because by the time we got to ServeAndWait, we were already bootstrapped, and thus had a fleshed-out `initState` to return. Still, it was somewhat odd to do it this way given we have `bootstrapCh` below to listen in on bootstrap events. It's now a bit clearer what's happening: the early return only looks at the inspected disk state to determine whether this is a previously bootstrapped node restarting, and if we're racing with an in-flight bootstrap attempt, we find out about it by reading off of `bootstrapCh`.
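For reference, an abridged sketch of the resulting types (full definitions are in pkg/server/init.go in the diff below; non-essential fields such as the embedded ambient context and config are elided):

    // initState is the init server's sole "output" type. It is only ever
    // constructed by inspecting the engines (via inspectEngines).
    type initState struct {
        nodeID               roachpb.NodeID
        clusterID            uuid.UUID
        clusterVersion       clusterversion.ClusterVersion
        initializedEngines   []storage.Engine
        uninitializedEngines []storage.Engine
    }

    type initServer struct {
        mu struct {
            syncutil.Mutex
            // bootstrapped guards against doubly bootstrapping clusters.
            bootstrapped bool
            rejectErr    error
        }
        // inspectedDiskState is the read-only view of the on-disk state,
        // synthesized once at startup.
        inspectedDiskState *initState
        // bootstrapReqCh carries the initState produced when this node is
        // `cockroach init`-ialized.
        bootstrapReqCh chan *initState
    }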
Release note: None --- pkg/roachpb/api.go | 19 +++ pkg/server/init.go | 325 ++++++++++++++++++++-------------------- pkg/server/node.go | 18 +-- pkg/server/node_test.go | 23 ++- pkg/server/server.go | 57 +++---- 5 files changed, 233 insertions(+), 209 deletions(-) diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index f65103126ccd..7adfe4e1b532 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -18,6 +18,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" ) @@ -1466,3 +1467,21 @@ func (rirr *ResolveIntentRangeRequest) AsLockUpdate() LockUpdate { IgnoredSeqNums: rirr.IgnoredSeqNums, } } + +// CreateStoreIdent creates a store identifier out of the details captured +// within the join node response (the join node RPC is used to allocate a store +// ID for the client's first store). +func (r *JoinNodeResponse) CreateStoreIdent() (StoreIdent, error) { + nodeID, storeID := NodeID(r.NodeID), StoreID(r.StoreID) + clusterID, err := uuid.FromBytes(r.ClusterID) + if err != nil { + return StoreIdent{}, err + } + + sIdent := StoreIdent{ + ClusterID: clusterID, + NodeID: nodeID, + StoreID: storeID, + } + return sIdent, nil +} diff --git a/pkg/server/init.go b/pkg/server/init.go index 98d4e8fb06d3..4a569b8315e1 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -66,86 +66,86 @@ type initServer struct { config initServerCfg mu struct { - // This mutex is grabbed during bootstrap and is used to serialized - // bootstrap attempts (and attempts to read whether or not his node has - // been bootstrapped). It's also used to guard inspectState below. + // This mutex is used to serialize bootstrap attempts. syncutil.Mutex - // The state of the engines. This tells us whether the node is already - // bootstrapped. The goal of the initServer is to stub this out by the time - // ServeAndWait returns. - inspectState *initDiskState + // We use this field to guard against doubly bootstrapping clusters. + bootstrapped bool // If we encounter an unrecognized error during bootstrap, we use this // field to block out future bootstrap attempts. rejectErr error } + // inspectedDiskState captures the relevant bits of the on-disk state needed + // by the init server. It's through this that the init server knows whether + // or not this node needs to be bootstrapped. It does so by checking to see + // if any engines were already initialized. If so, there's nothing left for + // the init server to, it simply returns the inspected disk state in + // ServeAndWait. + // + // Another function the inspected disk state provides is that it relays the + // synthesized cluster version (this binary's minimum supported version if + // there are no initialized engines). This is used as the cluster version if + // we end up connecting to an existing cluster via gossip. + // + // TODO(irfansharif): The above function goes away once we remove the use of + // gossip to join running clusters in 21.1. + inspectedDiskState *initState + // If this CRDB node was `cockroach init`-ialized, the resulting init state // will be passed through to this channel. bootstrapReqCh chan *initState } +// NeedsBootstrap returns true if we haven't already been bootstrapped or +// haven't yet been able to join a running cluster. 
+func (s *initServer) NeedsBootstrap() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return !s.mu.bootstrapped +} + func newInitServer( - actx log.AmbientContext, inspectState *initDiskState, config initServerCfg, -) (*initServer, error) { - s := &initServer{ - AmbientContext: actx, - bootstrapReqCh: make(chan *initState, 1), - config: config, + actx log.AmbientContext, inspectedDiskState *initState, config initServerCfg, +) *initServer { + initServer := &initServer{ + AmbientContext: actx, + bootstrapReqCh: make(chan *initState, 1), + config: config, + inspectedDiskState: inspectedDiskState, + } + // If we were already bootstrapped, we mark ourselves as such to prevent + // future bootstrap attempts. + if inspectedDiskState.bootstrapped() { + initServer.mu.bootstrapped = true } - s.mu.inspectState = inspectState - return s, nil + return initServer } -// initDiskState contains the part of initState that is read from stable -// storage. +// initState is the entirety of what the init server is tasked with +// constructing. It's a view of our on-disk state, instantiated through +// inspectEngines (and inspectEngines alone). // -// NB: The above is a lie in the case in which we join an existing mixed-version -// cluster. In that case, the state returned from ServeAndWait will have the -// clusterID set from Gossip (and there will be no NodeID). This is holdover -// behavior that can be removed in 21.1, at which point the lie disappears. -type initDiskState struct { - // nodeID is zero if joining an existing cluster. - // - // TODO(tbg): see TODO above. - nodeID roachpb.NodeID - // All fields below are always set. +// The init server is tasked with durably persisting state on-disk when this +// node is bootstrapped, or is able to join an already bootstrapped cluster. +// By state here we mean the cluster ID, node ID, at least one initialized +// engine, etc. After having persisted the relevant state, the init server +// constructs an initState with the details needed to fully start up a CRDB +// server. +type initState struct { + nodeID roachpb.NodeID clusterID uuid.UUID clusterVersion clusterversion.ClusterVersion initializedEngines []storage.Engine uninitializedEngines []storage.Engine } -// initState contains the cluster and node IDs as well as the stores, from which -// a CockroachDB server can be started up after ServeAndWait returns. -// -// TODO(irfansharif): The usage of initState and then initDiskState is a bit -// confusing. It isn't the case today that the version held in memory for a -// running server will be wholly reconstructed if reloading from disk. It -// could if we always persisted any changes made to it back to disk. Right now -// when initializing after a successful join attempt, we don't persist back the -// disk state back to disk (we'd need to initialize the first store here, in the -// same we do when `cockroach init`-ialized). -type initState struct { - initDiskState -} - -// NeedsInit is like needsInitLocked, except it acquires the necessary locks. -func (s *initServer) NeedsInit() bool { - s.mu.Lock() - defer s.mu.Unlock() - - return s.needsInitLocked() -} - -// needsInitLocked returns true if (and only if) none if the engines are -// initialized. In this case, ServeAndWait is blocked until either an -// initialized node is reached via the Join RPC (or Gossip if operating in mixed -// version clusters running v20.1, see ErrJoinUnimplemented), or this node -// itself is bootstrapped. 
-func (s *initServer) needsInitLocked() bool { - return len(s.mu.inspectState.initializedEngines) == 0 +// bootstrapped is a shorthand to check if there exists at least one initialized +// engine. +func (i *initState) bootstrapped() bool { + return len(i.initializedEngines) > 0 } // joinResult is used to represent the result of a node attempting to join @@ -171,10 +171,10 @@ type joinResult struct { // have all stores initialized by the time ServeAndWait returns. This is because // if this server is already bootstrapped, it might hold a replica of the range // backing the StoreID allocating counter, and letting this server start may be -// necessary to restore quorum to that range. So in general, after this TODO, we -// will always leave this method with at least one store initialized, but not +// necessary to restore quorum to that range. So in general, after this method, +// we will always leave this method with at least one store initialized, but not // necessarily all. This is fine, since initializing additional stores later is -// easy. +// easy (see `initializeAdditionalStores`). // // `initialBoot` is true if this is a new node. This flag should only be used // for logging and reporting. A newly bootstrapped single-node cluster is @@ -193,15 +193,10 @@ func (s *initServer) ServeAndWait( sv *settings.Values, startGossipFn func() *gossip.Gossip, ) (state *initState, initialBoot bool, err error) { - // If already bootstrapped, return early. - s.mu.Lock() - if !s.needsInitLocked() { - diskState := *s.mu.inspectState - s.mu.Unlock() - - return &initState{initDiskState: diskState}, false, nil + // If we're restarting an already bootstrapped node, return early. + if s.inspectedDiskState.bootstrapped() { + return s.inspectedDiskState, false, nil } - s.mu.Unlock() log.Info(ctx, "no stores initialized") log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node") @@ -264,11 +259,6 @@ func (s *initServer) ServeAndWait( log.Infof(ctx, "cluster %s has been created", state.clusterID) log.Infof(ctx, "allocated node ID: n%d (for self)", state.nodeID) - s.mu.Lock() - s.mu.inspectState.clusterID = state.clusterID - s.mu.inspectState.initializedEngines = state.initializedEngines - s.mu.Unlock() - return state, true, nil case result := <-joinCh: // Ensure we're draining out the join attempt. @@ -293,8 +283,10 @@ func (s *initServer) ServeAndWait( } if err != nil { - // We expect the join RPC to blindly retry on all errors - // save for the two above. This should be unreachable code. + // We expect the join RPC to blindly retry on all + // "connection" errors save for the two above. If we're + // here, we failed to initialize our first store after a + // successful join attempt. return nil, false, errors.NewAssertionErrorWithWrappedErrf(err, "unexpected error: %v", err) } } @@ -330,15 +322,22 @@ func (s *initServer) ServeAndWait( return nil, false, err } + state := &initState{ + // NB: We elide the node ID, we don't have one available yet. + clusterID: clusterID, + clusterVersion: s.inspectedDiskState.clusterVersion, + initializedEngines: s.inspectedDiskState.initializedEngines, + uninitializedEngines: s.inspectedDiskState.uninitializedEngines, + } + + // We mark ourselves as bootstrapped to prevent future bootstrap + // attempts. 
s.mu.Lock() - s.mu.inspectState.clusterID = clusterID - diskState := *s.mu.inspectState + s.mu.bootstrapped = true s.mu.Unlock() - state := &initState{ - initDiskState: diskState, - } log.Infof(ctx, "joined cluster %s through gossip (legacy behavior)", state.clusterID) + return state, true, nil case <-stopper.ShouldQuiesce(): return nil, false, stop.ErrUnavailable @@ -366,7 +365,7 @@ func (s *initServer) Bootstrap( s.mu.Lock() defer s.mu.Unlock() - if !s.needsInitLocked() { + if s.mu.bootstrapped { return nil, ErrClusterInitialized } @@ -374,14 +373,18 @@ func (s *initServer) Bootstrap( return nil, s.mu.rejectErr } - state, err := s.tryBootstrapLocked(ctx) + state, err := s.tryBootstrap(ctx) if err != nil { log.Errorf(ctx, "bootstrap: %v", err) s.mu.rejectErr = errInternalBootstrapError return nil, s.mu.rejectErr } + // We've successfully bootstrapped (we've initialized at least one engine). + // We mark ourselves as bootstrapped to prevent future bootstrap attempts. + s.mu.bootstrapped = true s.bootstrapReqCh <- state + return &serverpb.BootstrapResponse{}, nil } @@ -413,24 +416,34 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( } addr := res.Addr() - state, err := s.attemptJoinTo(ctx, res.Addr()) - if err == nil { - return state, nil - } - + resp, err := s.attemptJoinTo(ctx, res.Addr()) if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { // Propagate upwards; these are error conditions the caller knows to // expect. return nil, err } + if err != nil { + // Try the next node if unsuccessful. - if IsWaitingForInit(err) { - log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) - } else { - log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + if IsWaitingForInit(err) { + log.Infof(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + } + continue } - // Try the next node if unsuccessful. + state, err := s.initializeFirstStoreAfterJoin(ctx, resp) + if err != nil { + return nil, err + } + + // We mark ourselves as bootstrapped to prevent future bootstrap attempts. + s.mu.Lock() + s.mu.bootstrapped = true + s.mu.Unlock() + + return state, nil } const joinRPCBackoff = time.Second @@ -445,28 +458,40 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( addr := s.config.resolvers[idx].Addr() select { case <-tickChan: - state, err := s.attemptJoinTo(ctx, addr) - if err == nil { - return state, nil - } - + resp, err := s.attemptJoinTo(ctx, addr) if errors.Is(err, errJoinRPCUnsupported) || errors.Is(err, ErrIncompatibleBinaryVersion) { // Propagate upwards; these are error conditions the caller // knows to expect. return nil, err } + if err != nil { + // Blindly retry for all other errors, logging them for visibility. - // Blindly retry for all other errors, logging them for visibility. + // TODO(irfansharif): If startup logging gets too spammy, we + // could match against connection errors to generate nicer + // logging. See grpcutil.connectionRefusedRe. - // TODO(irfansharif): If startup logging gets too spammy, we - // could match against connection errors to generate nicer - // logging. See grpcutil.connectionRefusedRe. 
+ if IsWaitingForInit(err) { + log.Infof(ctx, "%s is itself waiting for init, will retry", addr) + } else { + log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + } + continue + } - if IsWaitingForInit(err) { - log.Warningf(ctx, "%s is itself waiting for init, will retry", addr) - } else { - log.Warningf(ctx, "outgoing join rpc to %s unsuccessful: %v", addr, err.Error()) + // We were able to successfully join an existing cluster. We'll now + // initialize our first store, using the store ID handed to us. + state, err := s.initializeFirstStoreAfterJoin(ctx, resp) + if err != nil { + return nil, err } + + // We mark ourselves as bootstrapped to prevent future bootstrap attempts. + s.mu.Lock() + s.mu.bootstrapped = true + s.mu.Unlock() + + return state, nil case <-ctx.Done(): return nil, context.Canceled case <-stopper.ShouldQuiesce(): @@ -475,9 +500,10 @@ func (s *initServer) startJoinLoop(ctx context.Context, stopper *stop.Stopper) ( } } -// attemptJoinTo attempts to join to the node running at the given address. If -// successful, an initState is returned. -func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState, error) { +// attemptJoinTo attempts to join to the node running at the given address. +func (s *initServer) attemptJoinTo( + ctx context.Context, addr string, +) (*roachpb.JoinNodeResponse, error) { conn, err := grpc.DialContext(ctx, addr, s.config.dialOpts...) if err != nil { return nil, err @@ -524,75 +550,52 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState return nil, err } - clusterID, err := uuid.FromBytes(resp.ClusterID) - if err != nil { + return resp, nil +} + +func (s *initServer) tryBootstrap(ctx context.Context) (*initState, error) { + // We use our binary version to bootstrap the cluster. + cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion} + if err := kvserver.WriteClusterVersionToEngines(ctx, s.inspectedDiskState.uninitializedEngines, cv); err != nil { return nil, err } - nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID) - clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion} - - s.mu.Lock() - defer s.mu.Unlock() + return bootstrapCluster(ctx, s.inspectedDiskState.uninitializedEngines, s.config) +} - s.mu.inspectState.clusterID = clusterID - s.mu.inspectState.nodeID = nodeID - s.mu.inspectState.clusterVersion = clusterVersion +// DiskClusterVersion returns the cluster version synthesized from disk. This +// is always non-zero since it falls back to the BinaryMinSupportedVersion. +func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion { + return s.inspectedDiskState.clusterVersion +} - if len(s.mu.inspectState.uninitializedEngines) < 1 { - log.Fatal(ctx, "expected to find at least one uninitialized engine") +// initializeFirstStoreAfterJoin initializes the first store after a successful +// join attempt. It re-constructs the store identifier from the join response +// and persists the appropriate cluster version to disk. After having done so, +// it returns an initState that captures the newly initialized store. 
+func (s *initServer) initializeFirstStoreAfterJoin( + ctx context.Context, resp *roachpb.JoinNodeResponse, +) (*initState, error) { + firstEngine := s.inspectedDiskState.uninitializedEngines[0] + clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion} + if err := kvserver.WriteClusterVersion(ctx, firstEngine, clusterVersion); err != nil { + return nil, err } - // We initialize the very first store here, using the store ID handed to us. - sIdent := roachpb.StoreIdent{ - ClusterID: clusterID, - NodeID: nodeID, - StoreID: storeID, + sIdent, err := resp.CreateStoreIdent() + if err != nil { + return nil, err } - - firstEngine := s.mu.inspectState.uninitializedEngines[0] if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil { return nil, err } - // We construct the appropriate initState to indicate that we've initialized - // the first engine. We similarly trim it off the uninitializedEngines list - // so that when initializing auxiliary stores, if any, we know to avoid - // re-initializing the first store. - initializedEngines := []storage.Engine{firstEngine} - uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:] - state := &initState{ - initDiskState: initDiskState{ - nodeID: nodeID, - clusterID: clusterID, - clusterVersion: clusterVersion, - initializedEngines: initializedEngines, - uninitializedEngines: uninitializedEngines, - }, - } - return state, nil -} - -func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) { - // We use our binary version to bootstrap the cluster. - cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion} - if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.uninitializedEngines, cv); err != nil { - return nil, err - } - return bootstrapCluster( - ctx, s.mu.inspectState.uninitializedEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig, + return inspectEngines( + ctx, s.inspectedDiskState.uninitializedEngines, + s.config.binaryVersion, s.config.binaryMinSupportedVersion, ) } -// DiskClusterVersion returns the cluster version synthesized from disk. This -// is always non-zero since it falls back to the BinaryMinSupportedVersion. -func (s *initServer) DiskClusterVersion() clusterversion.ClusterVersion { - s.mu.Lock() - defer s.mu.Unlock() - - return s.mu.inspectState.clusterVersion -} - // initServerCfg is a thin wrapper around the server Config object, exposing // only the fields needed by the init server. type initServerCfg struct { diff --git a/pkg/server/node.go b/pkg/server/node.go index fc9a7ead4cf8..1767644d5ce3 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -210,10 +210,7 @@ func GetBootstrapSchema( // written, since epoch-based leases cannot be granted until then. All other // engines are initialized with their StoreIdent. func bootstrapCluster( - ctx context.Context, - engines []storage.Engine, - defaultZoneConfig *zonepb.ZoneConfig, - defaultSystemZoneConfig *zonepb.ZoneConfig, + ctx context.Context, engines []storage.Engine, initCfg initServerCfg, ) (*initState, error) { clusterID := uuid.MakeV4() // TODO(andrei): It'd be cool if this method wouldn't do anything to engines @@ -251,7 +248,7 @@ func bootstrapCluster( // not create the range, just its data. Only do this if this is the // first store. 
if i == 0 { - schema := GetBootstrapSchema(defaultZoneConfig, defaultSystemZoneConfig) + schema := GetBootstrapSchema(&initCfg.defaultZoneConfig, &initCfg.defaultSystemZoneConfig) initialValues, tableSplits := schema.GetInitialValues() splits := append(config.StaticSplits(), tableSplits...) sort.Slice(splits, func(i, j int) bool { @@ -268,16 +265,7 @@ func bootstrapCluster( } } - state := &initState{ - initDiskState: initDiskState{ - nodeID: FirstNodeID, - clusterID: clusterID, - clusterVersion: bootstrapVersion, - initializedEngines: engines, - uninitializedEngines: nil, - }, - } - return state, nil + return inspectEngines(ctx, engines, initCfg.binaryVersion, initCfg.binaryMinSupportedVersion) } // NewNode returns a new instance of Node. diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 5452a2e59447..efbfd806510e 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -62,9 +62,14 @@ func TestBootstrapCluster(t *testing.T) { e := storage.NewDefaultInMem() defer e.Close() require.NoError(t, kvserver.WriteClusterVersion(ctx, e, clusterversion.TestingClusterVersion)) - if _, err := bootstrapCluster( - ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), - ); err != nil { + + initCfg := initServerCfg{ + binaryMinSupportedVersion: clusterversion.TestingBinaryMinSupportedVersion, + binaryVersion: clusterversion.TestingBinaryVersion, + defaultSystemZoneConfig: *zonepb.DefaultZoneConfigRef(), + defaultZoneConfig: *zonepb.DefaultSystemZoneConfigRef(), + } + if _, err := bootstrapCluster(ctx, []storage.Engine{e}, initCfg); err != nil { t.Fatal(err) } @@ -241,11 +246,15 @@ func TestCorruptedClusterID(t *testing.T) { defer e.Close() cv := clusterversion.TestingClusterVersion - require.NoError(t, kvserver.WriteClusterVersion(ctx, e, cv)) - if _, err := bootstrapCluster( - ctx, []storage.Engine{e}, zonepb.DefaultZoneConfigRef(), zonepb.DefaultSystemZoneConfigRef(), - ); err != nil { + + initCfg := initServerCfg{ + binaryMinSupportedVersion: clusterversion.TestingBinaryMinSupportedVersion, + binaryVersion: clusterversion.TestingBinaryVersion, + defaultSystemZoneConfig: *zonepb.DefaultZoneConfigRef(), + defaultZoneConfig: *zonepb.DefaultSystemZoneConfigRef(), + } + if _, err := bootstrapCluster(ctx, []storage.Engine{e}, initCfg); err != nil { t.Fatal(err) } diff --git a/pkg/server/server.go b/pkg/server/server.go index ef352811737c..701e0e4b3504 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -739,50 +739,58 @@ func (l *ListenError) Error() string { return l.cause.Error() } // Unwrap is because ListenError is a wrapper. func (l *ListenError) Unwrap() error { return l.cause } -// inspectEngines goes through engines and populates in initDiskState. It also -// calls SynthesizeClusterVersionFromEngines, which selects and backfills the -// cluster version to all initialized engines. -// -// The initDiskState returned by this method will reflect a zero NodeID if none -// has been assigned yet (i.e. if none of the engines is initialized). +// inspectEngines goes through engines and constructs an initState. The +// initState returned by this method will reflect a zero NodeID if none has +// been assigned yet (i.e. if none of the engines is initialized). See +// commentary on initState for the intended usage of inspectEngines. 
func inspectEngines( ctx context.Context, engines []storage.Engine, binaryVersion, binaryMinSupportedVersion roachpb.Version, -) (*initDiskState, error) { - state := &initDiskState{} +) (*initState, error) { + var clusterID uuid.UUID + var nodeID roachpb.NodeID + var initializedEngines, uninitializedEngines []storage.Engine for _, eng := range engines { storeIdent, err := kvserver.ReadStoreIdent(ctx, eng) if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) { - state.uninitializedEngines = append(state.uninitializedEngines, eng) + uninitializedEngines = append(uninitializedEngines, eng) continue } else if err != nil { return nil, err } - if state.clusterID != uuid.Nil && state.clusterID != storeIdent.ClusterID { - return nil, errors.Errorf("conflicting store ClusterIDs: %s, %s", storeIdent.ClusterID, state.clusterID) + if clusterID != uuid.Nil && clusterID != storeIdent.ClusterID { + return nil, errors.Errorf("conflicting store ClusterIDs: %s, %s", storeIdent.ClusterID, clusterID) } - state.clusterID = storeIdent.ClusterID + clusterID = storeIdent.ClusterID if storeIdent.StoreID == 0 || storeIdent.NodeID == 0 || storeIdent.ClusterID == uuid.Nil { return nil, errors.Errorf("partially initialized store: %+v", storeIdent) } - if state.nodeID != 0 && state.nodeID != storeIdent.NodeID { - return nil, errors.Errorf("conflicting store NodeIDs: %s, %s", storeIdent.NodeID, state.nodeID) + if nodeID != 0 && nodeID != storeIdent.NodeID { + return nil, errors.Errorf("conflicting store NodeIDs: %s, %s", storeIdent.NodeID, nodeID) } - state.nodeID = storeIdent.NodeID + nodeID = storeIdent.NodeID - state.initializedEngines = append(state.initializedEngines, eng) + initializedEngines = append(initializedEngines, eng) } - - cv, err := kvserver.SynthesizeClusterVersionFromEngines(ctx, state.initializedEngines, binaryVersion, binaryMinSupportedVersion) + clusterVersion, err := kvserver.SynthesizeClusterVersionFromEngines( + ctx, initializedEngines, binaryVersion, binaryMinSupportedVersion, + ) if err != nil { return nil, err } - state.clusterVersion = cv + + state := &initState{ + clusterID: clusterID, + nodeID: nodeID, + initializedEngines: initializedEngines, + uninitializedEngines: uninitializedEngines, + clusterVersion: clusterVersion, + } return state, nil } @@ -1118,7 +1126,7 @@ func (s *Server) PreStart(ctx context.Context) error { } initConfig := newInitServerConfig(s.cfg, dialOpts) - inspectState, err := inspectEngines( + inspectedDiskState, err := inspectEngines( ctx, s.engines, s.cfg.Settings.Version.BinaryVersion(), @@ -1128,10 +1136,7 @@ func (s *Server) PreStart(ctx context.Context) error { return err } - initServer, err = newInitServer(s.cfg.AmbientCtx, inspectState, initConfig) - if err != nil { - return err - } + initServer = newInitServer(s.cfg.AmbientCtx, inspectedDiskState, initConfig) } { @@ -1375,7 +1380,7 @@ func (s *Server) PreStart(ctx context.Context) error { // We self bootstrap for when we're configured to do so, which should only // happen during tests and for `cockroach start-single-node`. 
- selfBootstrap := s.cfg.AutoInitializeCluster && initServer.NeedsInit() + selfBootstrap := s.cfg.AutoInitializeCluster && initServer.NeedsBootstrap() if selfBootstrap { if _, err := initServer.Bootstrap(ctx, &serverpb.BootstrapRequest{}); err != nil { return err @@ -1393,7 +1398,7 @@ func (s *Server) PreStart(ctx context.Context) error { if s.cfg.ReadyFn != nil { readyFn = s.cfg.ReadyFn } - if !initServer.NeedsInit() || selfBootstrap { + if !initServer.NeedsBootstrap() || selfBootstrap { onSuccessfulReturnFn = func() { readyFn(false /* waitForInit */) } onInitServerReady = func() {} } else {