server: use consistent phrasing around store initialization
We alternate between saying we "{bootstrap,initialize}
{stores,engines}". Let's stick to one and use it consistently
throughout. Ditto for referring to uninitialized stores/engines as
"new" engines. We also clean up a few comments while we're here.

Release note: None
irfansharif committed Nov 4, 2020
1 parent 8cf1290 commit c738ae0
Showing 5 changed files with 97 additions and 90 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/BUILD.bazel
@@ -76,8 +76,8 @@ go_library(
"split_trigger_helper.go",
"storage_services.pb.go",
"store.go",
"store_bootstrap.go",
"store_create_replica.go",
"store_init.go",
"store_merge.go",
"store_pool.go",
"store_raft.go",
pkg/kv/kvserver/{store_bootstrap.go → store_init.go} (renamed)
@@ -34,14 +34,14 @@ import (
func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
exIdent, err := ReadStoreIdent(ctx, eng)
if err == nil {
return errors.Errorf("engine %s is already bootstrapped with ident %s", eng, exIdent.String())
return errors.Errorf("engine %s is already initialized with ident %s", eng, exIdent.String())
}
if !errors.HasType(err, (*NotBootstrappedError)(nil)) {
return err
}

if err := checkCanInitializeEngine(ctx, eng); err != nil {
return errors.Wrap(err, "while trying to initialize store")
return errors.Wrap(err, "while trying to initialize engine")
}

batch := eng.NewBatch()
@@ -58,13 +58,13 @@ func InitEngine(ctx context.Context, eng storage.Engine, ident roachpb.StoreIdent) error {
return err
}
if err := batch.Commit(true /* sync */); err != nil {
return errors.Wrap(err, "persisting bootstrap data")
return errors.Wrap(err, "persisting engine initialization data")
}

return nil
}

- // WriteInitialClusterData writes bootstrapping data to an engine. It creates
+ // WriteInitialClusterData writes initialization data to an engine. It creates
// system ranges (filling in meta1 and meta2) and the default zone config.
//
// Args:
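For readers skimming the diff: InitEngine guards against double-initialization by probing for an existing store ident before writing one. Below is a minimal, self-contained sketch of that pattern using simplified stand-in types (the names are illustrative, not CockroachDB's actual API; the real InitEngine also persists the cluster version, in a synced batch):

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for storage.Engine and roachpb.StoreIdent.
type storeIdent struct{ nodeID, storeID int32 }

type engine struct {
	ident *storeIdent // nil until the engine has been initialized
}

var errNotBootstrapped = errors.New("engine not initialized")

func readStoreIdent(e *engine) (storeIdent, error) {
	if e.ident == nil {
		return storeIdent{}, errNotBootstrapped
	}
	return *e.ident, nil
}

// initEngine mirrors the shape of kvserver.InitEngine: refuse to
// re-initialize an engine that already carries a store ident, surface any
// unexpected read error, and otherwise persist the ident.
func initEngine(e *engine, ident storeIdent) error {
	if exIdent, err := readStoreIdent(e); err == nil {
		return fmt.Errorf("engine is already initialized with ident %+v", exIdent)
	} else if !errors.Is(err, errNotBootstrapped) {
		return err
	}
	e.ident = &ident
	return nil
}

func main() {
	e := &engine{}
	fmt.Println(initEngine(e, storeIdent{nodeID: 1, storeID: 1})) // <nil>
	fmt.Println(initEngine(e, storeIdent{nodeID: 1, storeID: 2})) // error: already initialized
}
```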
16 changes: 8 additions & 8 deletions pkg/server/init.go
@@ -111,10 +111,10 @@ type initDiskState struct {
// TODO(tbg): see TODO above.
nodeID roachpb.NodeID
// All fields below are always set.
- clusterID uuid.UUID
- clusterVersion clusterversion.ClusterVersion
- initializedEngines []storage.Engine
- newEngines []storage.Engine
+ clusterID uuid.UUID
+ clusterVersion clusterversion.ClusterVersion
+ initializedEngines []storage.Engine
+ uninitializedEngines []storage.Engine
}

// initState contains the cluster and node IDs as well as the stores, from which
@@ -125,7 +125,7 @@ type initDiskState struct {
// running server will be wholly reconstructed if reloading from disk. It
// could if we always persisted any changes made to it back to disk. Right now
// when initializing after a successful join attempt, we don't persist the
- // disk state back to disk (we'd need to bootstrap the first store here, in the
+ // disk state back to disk (we'd need to initialize the first store here, in the
// same way we do when `cockroach init`-ialized).
type initState struct {
initDiskState
@@ -205,7 +205,7 @@ func (s *initServer) ServeAndWait(
}
s.mu.Unlock()

log.Info(ctx, "no stores bootstrapped")
log.Info(ctx, "no stores initialized")
log.Info(ctx, "awaiting `cockroach init` or join with an already initialized node")

joinCtx, cancelJoin := context.WithCancel(ctx)
@@ -549,11 +549,11 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState
func (s *initServer) tryBootstrapLocked(ctx context.Context) (*initState, error) {
// We use our binary version to bootstrap the cluster.
cv := clusterversion.ClusterVersion{Version: s.config.binaryVersion}
- if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.newEngines, cv); err != nil {
+ if err := kvserver.WriteClusterVersionToEngines(ctx, s.mu.inspectState.uninitializedEngines, cv); err != nil {
return nil, err
}
return bootstrapCluster(
- ctx, s.mu.inspectState.newEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
+ ctx, s.mu.inspectState.uninitializedEngines, &s.config.defaultZoneConfig, &s.config.defaultSystemZoneConfig,
)
}

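As a rough illustration of how the two initDiskState buckets get populated (mirroring the loop in inspectEngines shown later in this diff, but with simplified stand-in types of my own):

```go
package main

import (
	"errors"
	"fmt"
)

var errNotInitialized = errors.New("engine not initialized")

type engine struct {
	name  string
	ident string // empty until a store ident has been persisted
}

func readStoreIdent(e engine) (string, error) {
	if e.ident == "" {
		return "", errNotInitialized
	}
	return e.ident, nil
}

func main() {
	engines := []engine{{name: "a", ident: "s1"}, {name: "b"}, {name: "c"}}

	// Partition engines the way inspectEngines does: a "not initialized"
	// probe result routes the engine into the uninitialized bucket, any
	// other error is surfaced, and the rest are already-initialized stores.
	var initialized, uninitialized []engine
	for _, e := range engines {
		if _, err := readStoreIdent(e); errors.Is(err, errNotInitialized) {
			uninitialized = append(uninitialized, e)
			continue
		} else if err != nil {
			panic(err) // unexpected read failure
		}
		initialized = append(initialized, e)
	}
	fmt.Printf("%d initialized, %d uninitialized\n", len(initialized), len(uninitialized))
}
```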
138 changes: 72 additions & 66 deletions pkg/server/node.go
@@ -146,22 +146,24 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *roachpb.Error) {
// stores, which in turn direct the commands to specific ranges. Each
// node has access to the global, monolithic Key-Value abstraction via
// its client.DB reference. Nodes use this to allocate node and store
- // IDs for bootstrapping the node itself or new stores as they're added
- // on subsequent instantiations.
+ // IDs for bootstrapping the node itself or initializing new stores as
+ // they're added on subsequent instantiations.
type Node struct {
- stopper *stop.Stopper
- clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
- Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
- storeCfg kvserver.StoreConfig // Config to use and pass to stores
- eventLogger sql.EventLogger
- stores *kvserver.Stores // Access to node-local stores
- metrics nodeMetrics
- recorder *status.MetricsRecorder
- startedAt int64
- lastUp int64
- initialStart bool // True if this is the first time this node has started.
- txnMetrics kvcoord.TxnMetrics
- bootstrapNewStoresCh chan struct{}
+ stopper *stop.Stopper
+ clusterID *base.ClusterIDContainer // UUID for Cockroach cluster
+ Descriptor roachpb.NodeDescriptor // Node ID, network/physical topology
+ storeCfg kvserver.StoreConfig // Config to use and pass to stores
+ eventLogger sql.EventLogger
+ stores *kvserver.Stores // Access to node-local stores
+ metrics nodeMetrics
+ recorder *status.MetricsRecorder
+ startedAt int64
+ lastUp int64
+ initialStart bool // True if this is the first time this node has started.
+ txnMetrics kvcoord.TxnMetrics
+
+ // Used to signal when additional stores, if any, have been initialized.
+ additionalStoreInitCh chan struct{}

perReplicaServer kvserver.Server
}
@@ -268,11 +270,11 @@ func bootstrapCluster(

state := &initState{
initDiskState: initDiskState{
- nodeID: FirstNodeID,
- clusterID: clusterID,
- clusterVersion: bootstrapVersion,
- initializedEngines: engines,
- newEngines: nil,
+ nodeID: FirstNodeID,
+ clusterID: clusterID,
+ clusterVersion: bootstrapVersion,
+ initializedEngines: engines,
+ uninitializedEngines: nil,
},
firstStoreID: firstStoreID,
}
@@ -417,19 +419,15 @@ func (n *Node) start(
// Start the closed timestamp subsystem.
n.storeCfg.ClosedTimestamp.Start(n.Descriptor.NodeID)

- // Create stores from the engines that were already bootstrapped.
+ // Create stores from the engines that were already initialized.
for _, e := range state.initializedEngines {
s := kvserver.NewStore(ctx, n.storeCfg, e, &n.Descriptor)
if err := s.Start(ctx, n.stopper); err != nil {
return errors.Errorf("failed to start store: %s", err)
}
- capacity, err := s.Capacity(ctx, false /* useCached */)
- if err != nil {
- return errors.Errorf("could not query store capacity: %s", err)
- }
- log.Infof(ctx, "initialized store %s: %+v", s, capacity)

n.addStore(ctx, s)
+ log.Infof(ctx, "initialized store s%s", s.StoreID())
}

// Verify all initialized stores agree on cluster and node IDs.
@@ -462,27 +460,32 @@ func (n *Node) start(
return fmt.Errorf("failed to initialize the gossip interface: %s", err)
}

- // Bootstrap uninitialized stores, if any.
- if len(state.newEngines) > 0 {
- // We need to bootstrap additional stores asynchronously. Consider the range that
- // houses the store ID allocator. When restarting the set of nodes that holds a
- // quorum of these replicas, when restarting them with additional stores, those
- // additional stores will require store IDs to get fully bootstrapped. But if we're
- // gating node start (specifically opening up the RPC floodgates) on having all
- // stores fully bootstrapped, we'll simply hang when trying to allocate store IDs.
- // See TestAddNewStoresToExistingNodes and #39415 for more details.
- //
- // Instead we opt to bootstrap additional stores asynchronously, and rely on the
- // blocking function n.waitForBootstrapNewStores() to signal to the caller that
- // all stores have been fully bootstrapped.
- n.bootstrapNewStoresCh = make(chan struct{})
- if err := n.stopper.RunAsyncTask(ctx, "bootstrap-stores", func(ctx context.Context) {
- if err := n.bootstrapStores(ctx, state.firstStoreID, state.newEngines, n.stopper); err != nil {
- log.Fatalf(ctx, "while bootstrapping additional stores: %v", err)
+ // Initialize remaining stores/engines, if any.
+ if len(state.uninitializedEngines) > 0 {
+ // We need to initialize any remaining stores asynchronously.
+ // Consider the range that houses the store ID allocator. When we
+ // restart the set of nodes that holds a quorum of these replicas,
+ // specifically when we restart them with auxiliary stores, these stores
+ // will require store IDs during initialization[1]. But if we're gating
+ // node start up (specifically the opening up of RPC floodgates) on
+ // having all stores in the node fully initialized, we'll simply hang
+ // when trying to allocate store IDs. See
+ // TestAddNewStoresToExistingNodes and #39415 for more details.
+ //
+ // So instead we opt to initialize additional stores asynchronously, and
+ // rely on the blocking function n.waitForAdditionalStoreInit() to
+ // signal to the caller that all stores have been fully initialized.
+ //
+ // [1]: It's important to note that store IDs are allocated via a
+ // sequence ID generator stored in a system key.
+ n.additionalStoreInitCh = make(chan struct{})
+ if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
+ if err := n.initializeAdditionalStores(ctx, state.firstStoreID, state.uninitializedEngines, n.stopper); err != nil {
+ log.Fatalf(ctx, "while initializing additional stores: %v", err)
}
- close(n.bootstrapNewStoresCh)
+ close(n.additionalStoreInitCh)
}); err != nil {
- close(n.bootstrapNewStoresCh)
+ close(n.additionalStoreInitCh)
return err
}
}
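The channel lifecycle above (create the channel only when there is work to do, close it on both the success and error paths, nil-check it in the waiter) is worth seeing in isolation. A minimal sketch, with the stopper and error plumbing stripped out and my own toy types standing in for the real ones:

```go
package main

import (
	"fmt"
	"time"
)

type node struct {
	additionalStoreInitCh chan struct{}
}

func (n *node) start(uninitialized []string) {
	if len(uninitialized) == 0 {
		return // channel stays nil; the wait below becomes a no-op
	}
	n.additionalStoreInitCh = make(chan struct{})
	go func() {
		defer close(n.additionalStoreInitCh) // signal completion exactly once
		for _, eng := range uninitialized {
			time.Sleep(10 * time.Millisecond) // stand-in for InitEngine + Store.Start
			fmt.Println("initialized", eng)
		}
	}()
}

func (n *node) waitForAdditionalStoreInit() {
	if n.additionalStoreInitCh != nil {
		<-n.additionalStoreInitCh
	}
}

func main() {
	n := &node{}
	n.start([]string{"engine-2", "engine-3"})
	n.waitForAdditionalStoreInit()
	fmt.Println("all stores initialized")
}
```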
@@ -498,7 +501,7 @@ func (n *Node) start(
n.startGossiping(ctx, n.stopper)

allEngines := append([]storage.Engine(nil), state.initializedEngines...)
- allEngines = append(allEngines, state.newEngines...)
+ allEngines = append(allEngines, state.uninitializedEngines...)
for _, e := range allEngines {
t := e.Type()
log.Infof(ctx, "started with engine type %v", t)
return nil
}

- // waitForBootstrapNewStores blocks until all additional empty stores,
- // if any, have been bootstrapped.
- func (n *Node) waitForBootstrapNewStores() {
- if n.bootstrapNewStoresCh != nil {
- <-n.bootstrapNewStoresCh
+ // waitForAdditionalStoreInit blocks until all additional empty stores,
+ // if any, have been initialized.
+ func (n *Node) waitForAdditionalStoreInit() {
+ if n.additionalStoreInitCh != nil {
+ <-n.additionalStoreInitCh
}
}

@@ -555,7 +558,7 @@ func (n *Node) addStore(ctx context.Context, store *kvserver.Store) {
}
if cv == (clusterversion.ClusterVersion{}) {
// The store should have had a version written to it during the store
- // bootstrap process.
+ // initialization process.
log.Fatal(ctx, "attempting to add a store without a version")
}
n.stores.AddStore(store)
@@ -576,26 +579,27 @@ func (n *Node) validateStores(ctx context.Context) error {
})
}

- // bootstrapStores bootstraps uninitialized stores once the cluster
- // and node IDs have been established for this node. Store IDs are
- // allocated via a sequence id generator stored at a system key per
- // node. The new stores are added to n.stores.
- func (n *Node) bootstrapStores(
+ // initializeAdditionalStores initializes the given set of engines once the
+ // cluster and node ID have been established for this node. Store IDs are
+ // allocated via a sequence id generator stored at a system key per node. The
+ // new stores are added to n.stores.
+ func (n *Node) initializeAdditionalStores(
ctx context.Context,
firstStoreID roachpb.StoreID,
- emptyEngines []storage.Engine,
+ engines []storage.Engine,
stopper *stop.Stopper,
) error {
if n.clusterID.Get() == uuid.Nil {
return errors.New("ClusterID missing during store bootstrap of auxiliary store")
return errors.New("missing cluster ID during initialization of additional store")
}

{
- // Bootstrap all waiting stores by allocating a new store id for
- // each and invoking storage.Bootstrap() to persist it and the cluster
- // version and to create stores. The -1 comes from the fact that our
- // first store ID has already been pre-allocated for us.
- storeIDAlloc := int64(len(emptyEngines)) - 1
+ // Initialize all waiting stores by allocating a new store id for each
+ // and invoking kvserver.InitEngine() to persist it. We'll then
+ // construct a new store out of the initialized engine and attach it to
+ // ourselves. The -1 comes from the fact that our first store ID has
+ // already been pre-allocated for us.
+ storeIDAlloc := int64(len(engines)) - 1
if firstStoreID == 0 {
// We lied, we don't have a firstStoreID; we'll need to allocate for
// that too.
NodeID: n.Descriptor.NodeID,
StoreID: firstStoreID,
}
- for _, eng := range emptyEngines {
+ for _, eng := range engines {
if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
return err
}
if err := s.Start(ctx, stopper); err != nil {
return err
}

n.addStore(ctx, s)
log.Infof(ctx, "bootstrapped store %s", s)
log.Infof(ctx, "initialized store s%s", s.StoreID())

// Done regularly in Node.startGossiping, but this cuts down the time
// until this store is used for range allocations.
if err := s.GossipStore(ctx, false /* useCached */); err != nil {
}
}

- // write a new status summary after all stores have been bootstrapped; this
+ // Write a new status summary after all stores have been initialized; this
// helps the UI remain responsive when new nodes are added.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */); err != nil {
log.Warningf(ctx, "error writing node summary after store bootstrap: %s", err)
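The -1 accounting in initializeAdditionalStores is easy to misread, so here is the arithmetic pulled out into a toy function. The names and harness are mine, not the real allocation code, which draws IDs from the sequence generator stored in a system key:

```go
package main

import "fmt"

// storeIDsToReserve mirrors the counting logic described in the comment
// above: the node's first store ID may already have been pre-allocated,
// hence the -1; if it wasn't (firstStoreID == 0), we reserve one more.
func storeIDsToReserve(numEngines int, firstStoreIDKnown bool) int64 {
	alloc := int64(numEngines) - 1
	if !firstStoreIDKnown {
		alloc++ // "we lied": allocate for the first store too
	}
	return alloc
}

func main() {
	fmt.Println(storeIDsToReserve(3, true))  // 2: first ID pre-allocated, reserve the rest
	fmt.Println(storeIDsToReserve(3, false)) // 3: reserve IDs for all three engines
}
```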
23 changes: 12 additions & 11 deletions pkg/server/server.go
@@ -755,7 +755,7 @@ func inspectEngines(
for _, eng := range engines {
storeIdent, err := kvserver.ReadStoreIdent(ctx, eng)
if errors.HasType(err, (*kvserver.NotBootstrappedError)(nil)) {
- state.newEngines = append(state.newEngines, eng)
+ state.uninitializedEngines = append(state.uninitializedEngines, eng)
continue
} else if err != nil {
return nil, err
@@ -1503,7 +1503,7 @@ func (s *Server) PreStart(ctx context.Context) error {
}
})

- // NB: if this store is freshly bootstrapped (or no upper bound was
+ // NB: if this store is freshly initialized (or no upper bound was
// persisted), hlcUpperBound will be zero.
hlcUpperBound, err := kvserver.ReadMaxHLCUpperBound(ctx, s.engines)
if err != nil {
@@ -1533,7 +1533,7 @@ func (s *Server) PreStart(ctx context.Context) error {
startGossipFn()

// Now that we have a monotonic HLC wrt previous incarnations of the process,
- // init all the replicas. At this point *some* store has been bootstrapped or
+ // init all the replicas. At this point *some* store has been initialized or
// we're joining an existing cluster for the first time.
advSQLAddrU := util.NewUnresolvedAddr("tcp", s.cfg.SQLAdvertiseAddr)
if err := s.node.start(
@@ -1612,16 +1612,17 @@ func (s *Server) PreStart(ctx context.Context) error {
})

// After setting modeOperational, we can block until all stores are fully
- // bootstrapped.
+ // initialized.
s.grpc.setMode(modeOperational)

- // We'll block here until all stores are fully bootstrapped. We do this here for
- // two reasons:
- // - some of the components below depend on all stores being fully bootstrapped
- // (like the debug server registration for e.g.)
- // - we'll need to do it after having opened up the RPC floodgates (due to the
- // hazard described in Node.start, around bootstrapping additional stores)
- s.node.waitForBootstrapNewStores()
+ // We'll block here until all stores are fully initialized. We do this here
+ // for two reasons:
+ // - some of the components below depend on all stores being fully
+ // initialized (like the debug server registration for e.g.)
+ // - we'll need to do it after having opened up the RPC floodgates (due to
+ // the hazard described in Node.start, around initializing additional
+ // stores)
+ s.node.waitForAdditionalStoreInit()

log.Infof(ctx, "starting %s server at %s (use: %s)",
redact.Safe(s.cfg.HTTPRequestScheme()), s.cfg.HTTPAddr, s.cfg.HTTPAdvertiseAddr)
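To make the "wait only after opening the RPC floodgates" ordering concrete, here is a toy model of the hazard (my own construction, not the server's actual code): initializing an additional store needs an ID from a service that only answers once the server is accepting requests, so waiting before serving would deadlock.

```go
package main

import "fmt"

func main() {
	requests := make(chan chan int)      // stand-in for the RPC surface
	storeInitDone := make(chan struct{}) // analogous to additionalStoreInitCh

	// Async store initialization: blocks until the server serves its request,
	// just as allocating a real store ID blocks on the ID-allocator range.
	go func() {
		reply := make(chan int)
		requests <- reply
		fmt.Println("initialized additional store with ID", <-reply)
		close(storeInitDone)
	}()

	// Correct order: open the floodgates first (start serving)...
	go func() {
		nextID := 2
		for reply := range requests {
			reply <- nextID
			nextID++
		}
	}()

	// ...and only then block until additional stores are initialized.
	// Swapping this receive ahead of the serving goroutine would deadlock.
	<-storeInitDone
	fmt.Println("node fully started")
}
```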
