diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index f937fd783eb2..e3c16f76712a 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -76,8 +76,8 @@ go_library(
         "split_trigger_helper.go",
         "storage_services.pb.go",
         "store.go",
-        "store_bootstrap.go",
         "store_create_replica.go",
+        "store_init.go",
         "store_merge.go",
         "store_pool.go",
         "store_raft.go",
diff --git a/pkg/kv/kvserver/store_bootstrap.go b/pkg/kv/kvserver/store_init.go
similarity index 96%
rename from pkg/kv/kvserver/store_bootstrap.go
rename to pkg/kv/kvserver/store_init.go
index 0b82a35a777d..57dbe5feb3ea 100644
--- a/pkg/kv/kvserver/store_bootstrap.go
+++ b/pkg/kv/kvserver/store_init.go
@@ -34,14 +34,14 @@ import (
 func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
     exIdent, err := ReadStoreIdent(ctx, eng)
     if err == nil {
-        return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String())
+        return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
     }
     if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
         return err
     }

     if err := checkCanInitializeEngine(ctx, eng); err != nil {
-        return errors.Wrap(err, "while trying to initialize store")
+        return errors.Wrap(err, "while trying to initialize engine")
     }

     batch := eng.NewBatch()
@@ -58,13 +58,13 @@ func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIden
         return err
     }
     if err := batch.Commit(true /* sync */); err != nil {
-        return errors.Wrap(err, "persisting bootstrap data")
+        return errors.Wrap(err, "persisting engine initialization data")
     }

     return nil
 }

-// WriteInitialClusterData writes bootstrapping data to an engine. It creates
+// WriteInitialClusterData writes initialization data to an engine. It creates
 // system ranges (filling in meta1 and meta2) and the default zone config.
 //
 // Args:
diff --git a/pkg/server/init.go b/pkg/server/init.go
index 11a182785b64..7200c50590a3 100644
--- a/pkg/server/init.go
+++ b/pkg/server/init.go
@@ -111,10 +111,10 @@ type initDiskState struct {
     // TODO(tbg): see TODO above.
     nodeID roachpb.NodeID
     // All fields below are always set.
-    clusterID          uuid.UUID
-    clusterVersion     clusterversion.ClusterVersion
-    initializedEngines []storage.Engine
-    newEngines         []storage.Engine
+    clusterID            uuid.UUID
+    clusterVersion       clusterversion.ClusterVersion
+    initializedEngines   []storage.Engine
+    uninitializedEngines []storage.Engine
 }

 // initState contains the cluster and node IDs as well as the stores, from which
@@ -125,7 +125,7 @@ type initDiskState struct {
 // running server will be wholly reconstructed if reloading from disk. It
 // could if we always persisted any changes made to it back to disk. Right now
 // when initializing after a successful join attempt, we don't persist back the
-// disk state back to disk (we'd need to bootstrap the first store here, in the
+// disk state back to disk (we'd need to initialize the first store here, in the
 // same we do when `cockroach init`-ialized).
 type initState struct {
     initDiskState
@@ -205,7 +205,7 @@ func (s *initServer) ServeAndWait(
     }
     s.mu.Unlock()

-    log.Info(ctx, "no stores bootstrapped")
+    log.Info(ctx, "no stores initialized")
     log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node")

     joinCtx, cancelJoin := context.WithCancel(ctx)
@@ -549,11 +549,11 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState
 func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {
     // We use our binary version to bootstrap the cluster.
     cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion}
-    if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.newEngines, cv); err != nil {
+    if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.uninitializedEngines, cv); err != nil {
         return nil, err
     }

     return bootstrapCluster(
-        ctx, s.mu.inspectState.newEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
+        ctx, s.mu.inspectState.uninitializedEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
     )
 }
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 16c59b7c2cf3..395a09451a23 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -146,22 +146,24 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *roachpb.Error) {
 // stores, which in turn direct the commands to specific ranges. Each
 // node has access to the global, monolithic Key-Value abstraction via
 // its client.DB reference. Nodes use this to allocate node and store
-// IDs for bootstrapping the node itself or new stores as they're added
-// on subsequent instantiations.
+// IDs for bootstrapping the node itself or initializing new stores as
+// they're added on subsequent instantiations.
 type Node struct {
-    stopper              *stop.Stopper
-    clusterID            *base.ClusterIDContainer // UUID for Cockroach cluster
-    Descriptor           roachpb.NodeDescriptor   // Node ID, network/physical topology
-    storeCfg             kvserver.StoreConfig     // Config to use and pass to stores
-    eventLogger          sql.EventLogger
-    stores               *kvserver.Stores // Access to node-local stores
-    metrics              nodeMetrics
-    recorder             *status.MetricsRecorder
-    startedAt            int64
-    lastUp               int64
-    initialStart         bool // True if this is the first time this node has started.
-    txnMetrics           kvcoord.TxnMetrics
-    bootstrapNewStoresCh chan struct{}
+    stopper      *stop.Stopper
+    clusterID    *base.ClusterIDContainer // UUID for Cockroach cluster
+    Descriptor   roachpb.NodeDescriptor   // Node ID, network/physical topology
+    storeCfg     kvserver.StoreConfig     // Config to use and pass to stores
+    eventLogger  sql.EventLogger
+    stores       *kvserver.Stores // Access to node-local stores
+    metrics      nodeMetrics
+    recorder     *status.MetricsRecorder
+    startedAt    int64
+    lastUp       int64
+    initialStart bool // True if this is the first time this node has started.
+    txnMetrics   kvcoord.TxnMetrics
+
+    // Used to signal when additional stores, if any, have been initialized.
+    additionalStoreInitCh chan struct{}

     perReplicaServer kvserver.Server
 }
@@ -268,11 +270,11 @@ func bootstrapCluster(

     state := &initState{
         initDiskState: initDiskState{
-            nodeID:             FirstNodeID,
-            clusterID:          clusterID,
-            clusterVersion:     bootstrapVersion,
-            initializedEngines: engines,
-            newEngines:         nil,
+            nodeID:               FirstNodeID,
+            clusterID:            clusterID,
+            clusterVersion:       bootstrapVersion,
+            initializedEngines:   engines,
+            uninitializedEngines: nil,
         },
         firstStoreID: firstStoreID,
     }
@@ -417,19 +419,15 @@ func (n *Node) start(
     // Start the closed timestamp subsystem.
     n.storeCfg.ClosedTimestamp.Start(n.Descriptor.NodeID)

-    // Create stores from the engines that were already bootstrapped.
+    // Create stores from the engines that were already initialized.
     for _, e := range state.initializedEngines {
         s := kvserver.NewStore(ctx, n.storeCfg, e, &n.Descriptor)
         if err := s.Start(ctx, n.stopper); err != nil {
             return errors.Errorf("failed to start store: %s", err)
         }

-        capacity, err := s.Capacity(ctx, false /* useCached */)
-        if err != nil {
-            return errors.Errorf("could not query store capacity: %s", err)
-        }
-        log.Infof(ctx, "initialized store %s: %+v", s, capacity)
         n.addStore(ctx, s)
+        log.Infof(ctx, "initialized store s%s", s.StoreID())
     }

     // Verify all initialized stores agree on cluster and node IDs.
@@ -462,27 +460,32 @@ func (n *Node) start(
         return fmt.Errorf("failed to initialize the gossip interface: %s", err)
     }

-    // Bootstrap uninitialized stores, if any.
-    if len(state.newEngines) > 0 {
-        // We need to bootstrap additional stores asynchronously. Consider the range that
-        // houses the store ID allocator. When restarting the set of nodes that holds a
-        // quorum of these replicas, when restarting them with additional stores, those
-        // additional stores will require store IDs to get fully bootstrapped. But if we're
-        // gating node start (specifically opening up the RPC floodgates) on having all
-        // stores fully bootstrapped, we'll simply hang when trying to allocate store IDs.
-        // See TestAddNewStoresToExistingNodes and #39415 for more details.
+    // Initialize remaining stores/engines, if any.
+    if len(state.uninitializedEngines) > 0 {
+        // We need to initialize any remaining stores asynchronously.
+        // Consider the range that houses the store ID allocator. When we
+        // restart the set of nodes that holds a quorum of these replicas,
+        // specifically when we restart them with auxiliary stores, these stores
+        // will require store IDs during initialization[1]. But if we're gating
+        // node start up (specifically the opening up of RPC floodgates) on
+        // having all stores in the node fully initialized, we'll simply hang
+        // when trying to allocate store IDs. See
+        // TestAddNewStoresToExistingNodes and #39415 for more details.
+        //
+        // So instead we opt to initialize additional stores asynchronously, and
+        // rely on the blocking function n.waitForAdditionalStoreInit() to
+        // signal to the caller that all stores have been fully initialized.
         //
-        // Instead we opt to bootstrap additional stores asynchronously, and rely on the
-        // blocking function n.waitForBootstrapNewStores() to signal to the caller that
-        // all stores have been fully bootstrapped.
-        n.bootstrapNewStoresCh = make(chan struct{})
-        if err := n.stopper.RunAsyncTask(ctx, "bootstrap-stores", func(ctx context.Context) {
-            if err := n.bootstrapStores(ctx, state.firstStoreID, state.newEngines, n.stopper); err != nil {
-                log.Fatalf(ctx, "while bootstrapping additional stores: %v", err)
+        // [1]: It's important to note that store IDs are allocated via a
+        // sequence ID generator stored in a system key.
+        n.additionalStoreInitCh = make(chan struct{})
+        if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
+            if err := n.initializeAdditionalStores(ctx, state.firstStoreID, state.uninitializedEngines, n.stopper); err != nil {
+                log.Fatalf(ctx, "while initializing additional stores: %v", err)
             }
-            close(n.bootstrapNewStoresCh)
+            close(n.additionalStoreInitCh)
         }); err != nil {
-            close(n.bootstrapNewStoresCh)
+            close(n.additionalStoreInitCh)
             return err
         }
     }
@@ -498,7 +501,7 @@ func (n *Node) start(
     n.startGossiping(ctx, n.stopper)

     allEngines := append([]storage.Engine(nil), state.initializedEngines...)
-    allEngines = append(allEngines, state.newEngines...)
+    allEngines = append(allEngines, state.uninitializedEngines...)
     for _, e := range allEngines {
         t := e.Type()
         log.Infof(ctx, "started with engine type %v", t)
@@ -507,11 +510,11 @@ func (n *Node) start(
     return nil
 }

-// waitForBootstrapNewStores blocks until all additional empty stores,
-// if any, have been bootstrapped.
-func (n *Node) waitForBootstrapNewStores() {
-    if n.bootstrapNewStoresCh != nil {
-        <-n.bootstrapNewStoresCh
+// waitForAdditionalStoreInit blocks until all additional empty stores,
+// if any, have been initialized.
+func (n *Node) waitForAdditionalStoreInit() {
+    if n.additionalStoreInitCh != nil {
+        <-n.additionalStoreInitCh
     }
 }

@@ -555,7 +558,7 @@ func (n *Node) addStore(ctx context.Context, store *kvserver.Store) {
     }
     if cv == (clusterversion.ClusterVersion{}) {
         // The store should have had a version written to it during the store
-        // bootstrap process.
+        // initialization process.
         log.Fatal(ctx, "attempting to add a store without a version")
     }
     n.stores.AddStore(store)
@@ -576,26 +579,27 @@ func (n *Node) validateStores(ctx context.Context) error {
     })
 }

-// bootstrapStores bootstraps uninitialized stores once the cluster
-// and node IDs have been established for this node. Store IDs are
-// allocated via a sequence id generator stored at a system key per
-// node. The new stores are added to n.stores.
-func (n *Node) bootstrapStores(
+// initializeAdditionalStores initializes the given set of engines once the
+// cluster and node ID have been established for this node. Store IDs are
+// allocated via a sequence id generator stored at a system key per node. The
+// new stores are added to n.stores.
+func (n *Node) initializeAdditionalStores(
     ctx context.Context,
     firstStoreID roachpb.StoreID,
-    emptyEngines []storage.Engine,
+    engines []storage.Engine,
     stopper *stop.Stopper,
 ) error {
     if n.clusterID.Get() == uuid.Nil {
-        return errors.New("ClusterID missing during store bootstrap of auxiliary store")
+        return errors.New("missing cluster ID during initialization of additional store")
     }

     {
-        // Bootstrap all waiting stores by allocating a new store id for
-        // each and invoking storage.Bootstrap() to persist it and the cluster
-        // version and to create stores. The -1 comes from the fact that our
-        // first store ID has already been pre-allocated for us.
-        storeIDAlloc := int64(len(emptyEngines)) - 1
+        // Initialize all waiting stores by allocating a new store id for each
+        // and invoking kvserver.InitEngine() to persist it. We'll then
+        // construct a new store out of the initialized engine and attach it to
+        // ourselves. The -1 comes from the fact that our first store ID has
+        // already been pre-allocated for us.
+        storeIDAlloc := int64(len(engines)) - 1
         if firstStoreID == 0 {
             // We lied, we don't have a firstStoreID; we'll need to allocate for
             // that too.
@@ -616,7 +620,7 @@ func (n *Node) bootstrapStores(
             NodeID:  n.Descriptor.NodeID,
             StoreID: firstStoreID,
         }
-        for _, eng := range emptyEngines {
+        for _, eng := range engines {
             if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
                 return err
             }
@@ -625,8 +629,10 @@
             if err := s.Start(ctx, stopper); err != nil {
                 return err
             }
+
             n.addStore(ctx, s)
-            log.Infof(ctx, "bootstrapped store %s", s)
+            log.Infof(ctx, "initialized store s%s", s.StoreID())
+
             // Done regularly in Node.startGossiping, but this cuts down the time
             // until this store is used for range allocations.
             if err := s.GossipStore(ctx, false /* useCached */); err != nil {
@@ -637,7 +643,7 @@
         }
     }

-    // write a new status summary after all stores have been bootstrapped; this
+    // Write a new status summary after all stores have been initialized; this
     // helps the UI remain responsive when new nodes are added.
     if err := n.writeNodeStatus(ctx, 0 /* alertTTL */); err != nil {
         log.Warningf(ctx, "error writing node summary after store bootstrap: %s", err)
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 5a1ee9c3de78..ef352811737c 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -755,7 +755,7 @@ func inspectEngines(
     for _, eng := range engines {
         storeIdent, err := kvserver.ReadStoreIdent(ctx, eng)
         if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) {
-            state.newEngines = append(state.newEngines, eng)
+            state.uninitializedEngines = append(state.uninitializedEngines, eng)
             continue
         } else if err != nil {
             return nil, err
         }
@@ -1503,7 +1503,7 @@ func (s *Server) PreStart(ctx context.Context) error {
         }
     })

-    // NB: if this store is freshly bootstrapped (or no upper bound was
+    // NB: if this store is freshly initialized (or no upper bound was
     // persisted), hlcUpperBound will be zero.
     hlcUpperBound, err := kvserver.ReadMaxHLCUpperBound(ctx, s.engines)
     if err != nil {
@@ -1533,7 +1533,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     startGossipFn()

     // Now that we have a monotonic HLC wrt previous incarnations of the process,
-    // init all the replicas. At this point *some* store has been bootstrapped or
+    // init all the replicas. At this point *some* store has been initialized or
    // we're joining an existing cluster for the first time.
     advSQLAddrU := util.NewUnresolvedAddr("tcp", s.cfg.SQLAdvertiseAddr)
     if err := s.node.start(
@@ -1612,16 +1612,17 @@ func (s *Server) PreStart(ctx context.Context) error {
     })

     // After setting modeOperational, we can block until all stores are fully
-    // bootstrapped.
+    // initialized.
     s.grpc.setMode(modeOperational)

-    // We'll block here until all stores are fully bootstrapped. We do this here for
-    // two reasons:
-    // - some of the components below depend on all stores being fully bootstrapped
-    //   (like the debug server registration for e.g.)
-    // - we'll need to do it after having opened up the RPC floodgates (due to the
-    //   hazard described in Node.start, around bootstrapping additional stores)
-    s.node.waitForBootstrapNewStores()
+    // We'll block here until all stores are fully initialized. We do this here
+    // for two reasons:
+    // - some of the components below depend on all stores being fully
+    //   initialized (like the debug server registration for e.g.)
+    // - we'll need to do it after having opened up the RPC floodgates (due to
+    //   the hazard described in Node.start, around initializing additional
+    //   stores)
+    s.node.waitForAdditionalStoreInit()

     log.Infof(ctx, "starting %s server at %s (use: %s)",
         redact.Safe(s.cfg.HTTPRequestScheme()), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
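The change above hinges on a small concurrency pattern: additional stores are initialized in a background task, a channel is closed once that task finishes, and the server only blocks on that channel after the RPC layer is operational. The standalone Go sketch below illustrates the handshake under stated assumptions; the stripped-down node type, the string slice standing in for engines, and the sleep standing in for kvserver.InitEngine and Store.Start are invented for the example, while the channel field and wait method mirror the names introduced in the patch. It is an illustration only, not part of the change.

package main

import (
	"fmt"
	"time"
)

// node is a stripped-down stand-in for server.Node, keeping only what is
// needed to show the additionalStoreInitCh handshake.
type node struct {
	// Nil when there are no uninitialized engines; otherwise closed once the
	// async initialization task has finished.
	additionalStoreInitCh chan struct{}
}

// start mirrors the shape of Node.start in the patch: if any engines still
// need initializing, spawn a background task and close the channel when it
// completes.
func (n *node) start(uninitializedEngines []string) {
	if len(uninitializedEngines) == 0 {
		return // nothing to initialize; the wait below will not block
	}
	n.additionalStoreInitCh = make(chan struct{})
	go func() {
		defer close(n.additionalStoreInitCh)
		for _, eng := range uninitializedEngines {
			// Stand-in for allocating a store ID, initializing the engine,
			// and starting/attaching the new store.
			time.Sleep(10 * time.Millisecond)
			fmt.Printf("initialized store on engine %s\n", eng)
		}
	}()
}

// waitForAdditionalStoreInit blocks until all additional stores, if any,
// have been initialized.
func (n *node) waitForAdditionalStoreInit() {
	if n.additionalStoreInitCh != nil {
		<-n.additionalStoreInitCh
	}
}

func main() {
	n := &node{}
	n.start([]string{"eng2", "eng3"})
	// In the server this blocking wait happens only after the RPC layer is
	// operational, which is what avoids the store-ID allocation deadlock.
	n.waitForAdditionalStoreInit()
	fmt.Println("all additional stores initialized")
}

Closing the channel, rather than sending on it, lets any number of waiters unblock, and leaving it nil when there is nothing to initialize keeps the wait a no-op.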