From ef35ee5ba59852086d65b69f48c726ea8600c99b Mon Sep 17 00:00:00 2001
From: irfan sharif
Date: Wed, 4 Nov 2020 16:37:03 -0500
Subject: [PATCH] server: initialize first store within the init server

...when joining an existing cluster. This diff adds some sanity around
how we bootstrap stores for nodes joining an existing cluster, where
the cluster informs us what our first store ID should be. Previously
we bootstrapped the first store asynchronously, and that is not what
we want. We first observed the implications of doing so in #56263,
which attempted to remove the use of gossip in cluster/node ID
distribution. There we noticed that our somewhat haphazard structure
around store initialization could lead to doubly allocating store IDs
(#56272). We were inadvertently safeguarded against this, as described
in #56271, but the structure was still confusing and needed cleanup.

The new store initialization structure is the following (a short
illustrative sketch follows the first init.go hunk below):

```
- In the init code:
  - If we're being bootstrapped:
    - Initialize all the stores
  - If we're joining an existing cluster:
    - Only initialize the first store; leave the remaining stores to
      the start-up code later
- Later, when initializing additional new stores:
  - Allocate len(auxiliary engines) store IDs, and initialize them
    asynchronously
```

This lets us avoid threading the first store ID through the start-up
code, and instead always rely on the KV increment operation to tell us
what the store ID of the first additional store should be. We update
TestAddNewStoresToExistingNodes to test allocation behaviour with more
than just two stores.

Eventually we could simplify the init code to only initialize the
first store when we're bootstrapping (there's a longstanding TODO from
Andrei to that effect), but it's not strictly needed. This PR unblocks
#56263.

Release note: None
---
 pkg/server/BUILD.bazel         |  1 +
 pkg/server/init.go             | 45 +++++++++++++++++++++++++++-------
 pkg/server/multi_store_test.go | 43 +++++++++++++++++++++++---------
 pkg/server/node.go             | 27 +++++---------------
 4 files changed, 75 insertions(+), 41 deletions(-)

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index be51b504b44d..ce40bafcce27 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -307,6 +307,7 @@ go_test(
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "//vendor/github.com/cockroachdb/errors",
+        "//vendor/github.com/dustin/go-humanize",
         "//vendor/github.com/gogo/protobuf/jsonpb",
         "//vendor/github.com/gogo/protobuf/proto",
         "//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime",
diff --git a/pkg/server/init.go b/pkg/server/init.go
index 7200c50590a3..98d4e8fb06d3 100644
--- a/pkg/server/init.go
+++ b/pkg/server/init.go
@@ -129,8 +129,6 @@ type initDiskState struct {
 // same we do when `cockroach init`-ialized).
 type initState struct {
 	initDiskState
-
-	firstStoreID roachpb.StoreID
 }
 
 // NeedsInit is like needsInitLocked, except it acquires the necessary locks.
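The next hunk implements the join path. As a hedged illustration of the
bootstrap-vs-join split from the commit message, here is a minimal,
self-contained Go sketch; the names below (initStores, storeIdent) are
illustrative stand-ins, not the server's actual API, which operates on
storage.Engines and roachpb.StoreIdent:

```
package main

import "fmt"

// storeIdent stands in for roachpb.StoreIdent: the identity persisted
// to an engine when a store is initialized.
type storeIdent struct {
	clusterID string
	nodeID    int
	storeID   int
}

// initStores sketches the split described in the commit message. On the
// bootstrap path every engine is initialized up front. On the join path
// only the first engine is initialized, using the store ID handed back
// by the existing cluster; the rest are left uninitialized for the
// asynchronous start-up code to pick up later.
func initStores(bootstrapped bool, engines []string, joinStoreID int) (initialized, uninitialized []string) {
	if bootstrapped {
		// `cockroach init` path: all stores are initialized here.
		return engines, nil
	}
	// Join path: persist the ident on the first engine only.
	ident := storeIdent{clusterID: "<cluster-uuid>", nodeID: 2, storeID: joinStoreID}
	fmt.Printf("join path: initializing %s with %+v\n", engines[0], ident)
	return engines[:1], engines[1:]
}

func main() {
	inited, waiting := initStores(false /* joining */, []string{"eng0", "eng1", "eng2"}, 4)
	fmt.Println("initialized engines:", inited)    // [eng0]
	fmt.Println("left for start-up code:", waiting) // [eng1 eng2]
}
```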
@@ -531,18 +529,47 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState
 		return nil, err
 	}
 
+	nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID)
+	clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
+
 	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	s.mu.inspectState.clusterID = clusterID
-	s.mu.inspectState.nodeID = roachpb.NodeID(resp.NodeID)
-	s.mu.inspectState.clusterVersion = clusterversion.ClusterVersion{Version: *resp.ActiveVersion}
-	diskState := *s.mu.inspectState
-	s.mu.Unlock()
+	s.mu.inspectState.nodeID = nodeID
+	s.mu.inspectState.clusterVersion = clusterVersion
 
-	state := &initState{
-		initDiskState: diskState,
-		firstStoreID:  roachpb.StoreID(resp.StoreID),
+	if len(s.mu.inspectState.uninitializedEngines) < 1 {
+		log.Fatal(ctx, "expected to find at least one uninitialized engine")
 	}
 
+	// We initialize the very first store here, using the store ID handed to us.
+	sIdent := roachpb.StoreIdent{
+		ClusterID: clusterID,
+		NodeID:    nodeID,
+		StoreID:   storeID,
+	}
+
+	firstEngine := s.mu.inspectState.uninitializedEngines[0]
+	if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil {
+		return nil, err
+	}
+
+	// We construct the appropriate initState to indicate that we've initialized
+	// the first engine. We similarly trim it off the uninitializedEngines list
+	// so that when initializing auxiliary stores, if any, we know to avoid
+	// re-initializing the first store.
+	initializedEngines := []storage.Engine{firstEngine}
+	uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:]
+	state := &initState{
+		initDiskState: initDiskState{
+			nodeID:               nodeID,
+			clusterID:            clusterID,
+			clusterVersion:       clusterVersion,
+			initializedEngines:   initializedEngines,
+			uninitializedEngines: uninitializedEngines,
+		},
+	}
 	return state, nil
 }
diff --git a/pkg/server/multi_store_test.go b/pkg/server/multi_store_test.go
index 6fbaa4a82276..cbaecbde56d4 100644
--- a/pkg/server/multi_store_test.go
+++ b/pkg/server/multi_store_test.go
@@ -12,16 +12,19 @@ package server_test
 
 import (
 	"context"
+	"sort"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
+	"github.com/dustin/go-humanize"
 	"github.com/stretchr/testify/require"
 )
 
@@ -71,7 +74,7 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
 	clusterID := tc.Server(0).ClusterID()
 	tc.Stopper().Stop(ctx)
 
-	// Add an additional store to each node.
+	// Add two additional stores to each node.
 	n1s2, n1cleanup2 := testutils.TempDir(t)
 	defer n1cleanup2()
 	n2s2, n2cleanup2 := testutils.TempDir(t)
@@ -79,6 +82,13 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
 	n3s2, n3cleanup2 := testutils.TempDir(t)
 	defer n3cleanup2()
 
+	n1s3, n1cleanup3 := testutils.TempDir(t)
+	defer n1cleanup3()
+	n2s3, n2cleanup3 := testutils.TempDir(t)
+	defer n2cleanup3()
+	n3s3, n3cleanup3 := testutils.TempDir(t)
+	defer n3cleanup3()
+
 	tcArgs = base.TestClusterArgs{
 		// We need ParallelStart since this is an existing cluster. If
 		// we started sequentially, then the first node would hang forever
@@ -89,17 +99,17 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
 		ServerArgsPerNode: map[int]base.TestServerArgs{
 			0: {
 				StoreSpecs: []base.StoreSpec{
-					{Path: n1s1}, {Path: n1s2},
+					{Path: n1s1}, {Path: n1s2}, {Path: n1s3},
 				},
 			},
 			1: {
 				StoreSpecs: []base.StoreSpec{
-					{Path: n2s1}, {Path: n2s2},
+					{Path: n2s1}, {Path: n2s2}, {Path: n2s3},
 				},
 			},
 			2: {
 				StoreSpecs: []base.StoreSpec{
-					{Path: n3s1}, {Path: n3s2},
+					{Path: n3s1}, {Path: n3s2}, {Path: n3s3},
 				},
 			},
 		},
@@ -115,23 +125,34 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
 		require.Equal(t, clusterID, srv.ClusterID())
 	}
 
-	// Ensure all nodes have 2 stores available.
+	// Ensure all nodes have all stores available, and each store has a unique
+	// store ID.
 	testutils.SucceedsSoon(t, func() error {
+		var storeIDs []roachpb.StoreID
 		for _, server := range tc.Servers {
 			var storeCount = 0
-
-			err := server.GetStores().(*kvserver.Stores).VisitStores(
+			if err := server.GetStores().(*kvserver.Stores).VisitStores(
 				func(s *kvserver.Store) error {
 					storeCount++
+					storeIDs = append(storeIDs, s.StoreID())
 					return nil
 				},
-			)
-			if err != nil {
+			); err != nil {
 				return errors.Errorf("failed to visit all nodes, got %v", err)
 			}
 
-			if storeCount != 2 {
-				return errors.Errorf("expected two stores to be available on node %v, got %v stores instead", server.NodeID(), storeCount)
+			if storeCount != 3 {
+				return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount)
+			}
+		}
+
+		sort.Slice(storeIDs, func(i, j int) bool {
+			return storeIDs[i] < storeIDs[j]
+		})
+		for i := range storeIDs {
+			expStoreID := roachpb.StoreID(i + 1)
+			if storeIDs[i] != expStoreID {
+				t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i])
 			}
 		}
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 395a09451a23..fc9a7ead4cf8 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -276,7 +276,6 @@ func bootstrapCluster(
 			initializedEngines:   engines,
 			uninitializedEngines: nil,
 		},
-		firstStoreID: firstStoreID,
 	}
 	return state, nil
 }
@@ -480,7 +479,7 @@ func (n *Node) start(
 	// sequence ID generator stored in a system key.
 	n.additionalStoreInitCh = make(chan struct{})
 	if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
-		if err := n.initializeAdditionalStores(ctx, state.firstStoreID, state.uninitializedEngines, n.stopper); err != nil {
+		if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil {
 			log.Fatalf(ctx, "while initializing additional stores: %v", err)
 		}
 		close(n.additionalStoreInitCh)
@@ -584,10 +583,7 @@ func (n *Node) validateStores(ctx context.Context) error {
 // allocated via a sequence id generator stored at a system key per node. The
 // new stores are added to n.stores.
 func (n *Node) initializeAdditionalStores(
-	ctx context.Context,
-	firstStoreID roachpb.StoreID,
-	engines []storage.Engine,
-	stopper *stop.Stopper,
+	ctx context.Context, engines []storage.Engine, stopper *stop.Stopper,
 ) error {
 	if n.clusterID.Get() == uuid.Nil {
 		return errors.New("missing cluster ID during initialization of additional store")
 	}
@@ -597,28 +593,17 @@ func (n *Node) initializeAdditionalStores(
 	// Initialize all waiting stores by allocating a new store id for each
 	// and invoking kvserver.InitEngine() to persist it. We'll then
 	// construct a new store out of the initialized engine and attach it to
-	// ourselves. The -1 comes from the fact that our first store ID has
-	// already been pre-allocated for us.
-	storeIDAlloc := int64(len(engines)) - 1
-	if firstStoreID == 0 {
-		// We lied, we don't have a firstStoreID; we'll need to allocate for
-		// that too.
-		//
-		// TODO(irfansharif): We get here if we're falling back to
-		// gossip-based connectivity. This can be removed in 21.1.
-		storeIDAlloc++
-	}
+	// ourselves.
+	storeIDAlloc := int64(len(engines))
 	startID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, storeIDAlloc, n.storeCfg.DB)
-	if firstStoreID == 0 {
-		firstStoreID = startID
-	}
 	if err != nil {
 		return errors.Errorf("error allocating store ids: %s", err)
 	}
+
 	sIdent := roachpb.StoreIdent{
 		ClusterID: n.clusterID.Get(),
 		NodeID:    n.Descriptor.NodeID,
-		StoreID:   firstStoreID,
+		StoreID:   startID,
 	}
 	for _, eng := range engines {
 		if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {
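The hunk above is truncated, but the shape of the change is visible:
with the first store no longer pre-allocated, the node reserves exactly
len(engines) store IDs in one KV increment and hands them out in order.
Below is a minimal sketch of that contiguous-block allocation; the
local allocateStoreIDs here is only a stand-in for the real function,
which issues an Increment against a system key:

```
package main

import "fmt"

// allocateStoreIDs models the KV increment against the store ID
// generator key: one atomic bump reserves n IDs and returns the first
// ID of the reserved block. The real function does this with a KV
// Increment on a system key; this local version is illustrative only.
func allocateStoreIDs(seq *int64, n int64) int64 {
	*seq += n
	return *seq - n + 1
}

func main() {
	// Suppose store IDs s1-s3 are taken (one per node in a three-node
	// cluster), and a node restarts with two new auxiliary engines.
	seq := int64(3)
	engines := []string{"aux-engine-a", "aux-engine-b"}

	// One increment reserves len(engines) IDs; successive engines then
	// take successive IDs from the reserved block, starting at startID.
	startID := allocateStoreIDs(&seq, int64(len(engines)))
	for i, eng := range engines {
		fmt.Printf("initializing %s as s%d\n", eng, startID+int64(i))
	}
	// Output:
	// initializing aux-engine-a as s4
	// initializing aux-engine-b as s5
}
```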