Skip to content

Commit

Permalink
Merge #34478
Browse files Browse the repository at this point in the history
34478: storage: simplify the stores' handling of version updates r=andreimatei a=andreimatei

Don't call SynthesizeClusterVersion() when gossip updates the version.
That function does too much - it reads from all the stores, validates,
writes. Gossip updates don't need all that jazz, so do something
simpler.
Also, that call to SynthesizeClusterVersion() was in my way for some
upcoming bulldozing.

This patch also moves the istallation of the gossip callback that
persists version updates to stores, to make it more easy to reason about
what version is being used during startup.

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Feb 6, 2019
2 parents 1092e02 + 8cff2c8 commit 5d7e15a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
38 changes: 26 additions & 12 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ func (n *Node) start(
localityAddress []roachpb.LocalityAddress,
nodeDescriptorCallback func(descriptor roachpb.NodeDescriptor),
) error {
n.storeCfg.Settings.Version.OnChange(n.onClusterVersionChange)
if err := n.storeCfg.Settings.InitializeVersion(cv); err != nil {
return errors.Wrap(err, "while initializing cluster version")
}
Expand Down Expand Up @@ -522,6 +521,15 @@ func (n *Node) start(
}

n.startComputePeriodicMetrics(n.stopper, DefaultMetricsSampleInterval)

// Now that we've created all our stores, install the gossip version update
// handler to write version updates to them.
n.storeCfg.Settings.Version.OnChange(n.onClusterVersionChange)
// Invoke the callback manually once so that we persist the updated value that
// gossip might have already received.
clusterVersion := n.storeCfg.Settings.Version.Version()
n.onClusterVersionChange(clusterVersion)

// Be careful about moving this line above `startStores`; store migrations rely
// on the fact that the cluster version has not been updated via Gossip (we
// have migrations that want to run only if the server starts with a given
Expand Down Expand Up @@ -584,6 +592,15 @@ func (n *Node) initStores(
}

func (n *Node) addStore(store *storage.Store) {
cv, err := store.GetClusterVersion(context.TODO())
if err != nil {
log.Fatal(context.TODO(), err)
}
if cv == (cluster.ClusterVersion{}) {
// The store should have had a version written to it during the store
// bootstrap process.
log.Fatal(context.TODO(), "attempting to add a store without a version")
}
n.stores.AddStore(store)
n.recorder.AddStore(store)
}
Expand All @@ -603,7 +620,7 @@ func (n *Node) validateStores(ctx context.Context) error {
// bootstrapStores bootstraps uninitialized stores once the cluster
// and node IDs have been established for this node. Store IDs are
// allocated via a sequence id generator stored at a system key per
// node.
// node. The new stores are added to n.stores.
func (n *Node) bootstrapStores(
ctx context.Context, emptyEngines []engine.Engine, stopper *stop.Stopper,
) error {
Expand All @@ -615,20 +632,20 @@ func (n *Node) bootstrapStores(
// is joining an existing cluster for the first time, it doesn't have any engines
// set up yet, and cv below will be the MinSupportedVersion. At the same time,
// the Gossip update which notifies us about the real cluster version won't
// persist it to any engines (because none of them are bootstrapped). The correct
// version is likely in Settings.Version.Version(), but what if the callback fires
// too late? In that case that too is the MinSupportedVersion. So we just accept
// that we won't use the correct version here, but post-bootstrapping will invoke
// the callback manually, which will disseminate the correct version to all engines
// that still need it.
// persist it to any engines (because we haven't installed the gossip update
// handler yet and also because none of the stores are bootstrapped). So we
// just accept that we won't use the correct version here, but
// post-bootstrapping will invoke the callback manually, which will
// disseminate the correct version to all engines.
cv, err := n.stores.SynthesizeClusterVersion(ctx)
if err != nil {
return errors.Errorf("error retrieving cluster version for bootstrap: %s", err)
}

{
// Bootstrap all waiting stores by allocating a new store id for
// each and invoking store.Bootstrap() to persist.
// each and invoking storage.Bootstrap() to persist it and the cluster
// version.
inc := int64(len(emptyEngines))
firstID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, inc, n.storeCfg.DB)
if err != nil {
Expand Down Expand Up @@ -665,9 +682,6 @@ func (n *Node) bootstrapStores(
}
}

clusterVersion := n.storeCfg.Settings.Version.Version()
n.onClusterVersionChange(clusterVersion)

// write a new status summary after all stores have been bootstrapped; this
// helps the UI remain responsive when new nodes are added.
if err := n.writeNodeStatus(ctx, 0 /* alertTTL */); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4499,6 +4499,12 @@ func (s *Store) ManuallyEnqueue(
return collect(), "", nil
}

// GetClusterVersion reads the the cluster version from the store-local version
// key. Returns an empty version if the key is not found.
func (s *Store) GetClusterVersion(ctx context.Context) (cluster.ClusterVersion, error) {
return ReadClusterVersion(ctx, s.engine)
}

// WriteClusterVersion writes the given cluster version to the store-local cluster version key.
func WriteClusterVersion(
ctx context.Context, writer engine.ReadWriter, cv cluster.ClusterVersion,
Expand Down
21 changes: 16 additions & 5 deletions pkg/storage/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,8 @@ func SynthesizeClusterVersionFromEngines(
// versions across the stores, returns a version that carries the smallest
// Version.
//
// If there aren't any stores, returns a ClusterVersion with MinSupportedVersion
// and UseVersion set to the minimum supported version and server version of the
// build, respectively.
// If there aren't any stores, returns a ClusterVersion set to the minimum
// supported version of the binary.
func (ls *Stores) SynthesizeClusterVersion(ctx context.Context) (cluster.ClusterVersion, error) {
var engines []engine.Engine
ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
Expand Down Expand Up @@ -447,9 +446,21 @@ func (ls *Stores) OnClusterVersionChange(ctx context.Context, cv cluster.Cluster
// this method that result in clobbering of an update.
ls.mu.Lock()
defer ls.mu.Unlock()
synthCV, err := ls.SynthesizeClusterVersion(ctx)

// We're going to read the cluster version from any engine - all the engines
// are always kept in sync so it doesn't matter which one we read from.
var someEngine engine.Engine
ls.storeMap.Range(func(_ int64, v unsafe.Pointer) bool {
someEngine = (*Store)(v).engine
return false // don't iterate any more
})
if someEngine == nil {
// If we haven't bootstrapped any engines yet, there's nothing for us to do.
return nil
}
synthCV, err := ReadClusterVersion(ctx, someEngine)
if err != nil {
return errors.Wrap(err, "reading persisted cluster version")
return errors.Wrap(err, "error reading persisted cluster version")
}
// If the update downgrades the version, ignore it. Must be a
// reordering (this method is called from multiple goroutines via
Expand Down

0 comments on commit 5d7e15a

Please sign in to comment.