diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 81984380950f..78d647ec362e 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -308,6 +308,7 @@ go_test( "//pkg/util/timeutil", "//pkg/util/uuid", "//vendor/github.com/cockroachdb/errors", + "//vendor/github.com/dustin/go-humanize", "//vendor/github.com/gogo/protobuf/jsonpb", "//vendor/github.com/gogo/protobuf/proto", "//vendor/github.com/grpc-ecosystem/grpc-gateway/runtime", diff --git a/pkg/server/init.go b/pkg/server/init.go index 7200c50590a3..98d4e8fb06d3 100644 --- a/pkg/server/init.go +++ b/pkg/server/init.go @@ -129,8 +129,6 @@ type initDiskState struct { // same we do when `cockroach init`-ialized). type initState struct { initDiskState - - firstStoreID roachpb.StoreID } // NeedsInit is like needsInitLocked, except it acquires the necessary locks. @@ -531,18 +529,47 @@ func (s *initServer) attemptJoinTo(ctx context.Context, addr string) (*initState return nil, err } + nodeID, storeID := roachpb.NodeID(resp.NodeID), roachpb.StoreID(resp.StoreID) + clusterVersion := clusterversion.ClusterVersion{Version: *resp.ActiveVersion} + s.mu.Lock() + defer s.mu.Unlock() + s.mu.inspectState.clusterID = clusterID - s.mu.inspectState.nodeID = roachpb.NodeID(resp.NodeID) - s.mu.inspectState.clusterVersion = clusterversion.ClusterVersion{Version: *resp.ActiveVersion} - diskState := *s.mu.inspectState - s.mu.Unlock() + s.mu.inspectState.nodeID = nodeID + s.mu.inspectState.clusterVersion = clusterVersion - state := &initState{ - initDiskState: diskState, - firstStoreID: roachpb.StoreID(resp.StoreID), + if len(s.mu.inspectState.uninitializedEngines) < 1 { + log.Fatal(ctx, "expected to find at least one uninitialized engine") } + // We initialize the very first store here, using the store ID handed to us. + sIdent := roachpb.StoreIdent{ + ClusterID: clusterID, + NodeID: nodeID, + StoreID: storeID, + } + + firstEngine := s.mu.inspectState.uninitializedEngines[0] + if err := kvserver.InitEngine(ctx, firstEngine, sIdent); err != nil { + return nil, err + } + + // We construct the appropriate initState to indicate that we've initialized + // the first engine. We similarly trim it off the uninitializedEngines list + // so that when initializing auxiliary stores, if any, we know to avoid + // re-initializing the first store. + initializedEngines := []storage.Engine{firstEngine} + uninitializedEngines := s.mu.inspectState.uninitializedEngines[1:] + state := &initState{ + initDiskState: initDiskState{ + nodeID: nodeID, + clusterID: clusterID, + clusterVersion: clusterVersion, + initializedEngines: initializedEngines, + uninitializedEngines: uninitializedEngines, + }, + } return state, nil } diff --git a/pkg/server/multi_store_test.go b/pkg/server/multi_store_test.go index 6fbaa4a82276..cbaecbde56d4 100644 --- a/pkg/server/multi_store_test.go +++ b/pkg/server/multi_store_test.go @@ -12,16 +12,19 @@ package server_test import ( "context" + "sort" "testing" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" ) @@ -71,7 +74,7 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { clusterID := tc.Server(0).ClusterID() tc.Stopper().Stop(ctx) - // Add an additional store to each node. + // Add two additional stores to each node. n1s2, n1cleanup2 := testutils.TempDir(t) defer n1cleanup2() n2s2, n2cleanup2 := testutils.TempDir(t) @@ -79,6 +82,13 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { n3s2, n3cleanup2 := testutils.TempDir(t) defer n3cleanup2() + n1s3, n1cleanup3 := testutils.TempDir(t) + defer n1cleanup3() + n2s3, n2cleanup3 := testutils.TempDir(t) + defer n2cleanup3() + n3s3, n3cleanup3 := testutils.TempDir(t) + defer n3cleanup3() + tcArgs = base.TestClusterArgs{ // We need ParallelStart since this is an existing cluster. If // we started sequentially, then the first node would hang forever @@ -89,17 +99,17 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { ServerArgsPerNode: map[int]base.TestServerArgs{ 0: { StoreSpecs: []base.StoreSpec{ - {Path: n1s1}, {Path: n1s2}, + {Path: n1s1}, {Path: n1s2}, {Path: n1s3}, }, }, 1: { StoreSpecs: []base.StoreSpec{ - {Path: n2s1}, {Path: n2s2}, + {Path: n2s1}, {Path: n2s2}, {Path: n2s3}, }, }, 2: { StoreSpecs: []base.StoreSpec{ - {Path: n3s1}, {Path: n3s2}, + {Path: n3s1}, {Path: n3s2}, {Path: n3s3}, }, }, }, @@ -115,23 +125,34 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { require.Equal(t, clusterID, srv.ClusterID()) } - // Ensure all nodes have 2 stores available. + // Ensure all nodes have all stores available, and each store has a unique + // store ID. testutils.SucceedsSoon(t, func() error { + var storeIDs []roachpb.StoreID for _, server := range tc.Servers { var storeCount = 0 - - err := server.GetStores().(*kvserver.Stores).VisitStores( + if err := server.GetStores().(*kvserver.Stores).VisitStores( func(s *kvserver.Store) error { storeCount++ + storeIDs = append(storeIDs, s.StoreID()) return nil }, - ) - if err != nil { + ); err != nil { return errors.Errorf("failed to visit all nodes, got %v", err) } - if storeCount != 2 { - return errors.Errorf("expected two stores to be available on node %v, got %v stores instead", server.NodeID(), storeCount) + if storeCount != 3 { + return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount) + } + } + + sort.Slice(storeIDs, func(i, j int) bool { + return storeIDs[i] < storeIDs[j] + }) + for i := range storeIDs { + expStoreID := roachpb.StoreID(i + 1) + if storeIDs[i] != expStoreID { + t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i]) } } diff --git a/pkg/server/node.go b/pkg/server/node.go index 395a09451a23..fc9a7ead4cf8 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -276,7 +276,6 @@ func bootstrapCluster( initializedEngines: engines, uninitializedEngines: nil, }, - firstStoreID: firstStoreID, } return state, nil } @@ -480,7 +479,7 @@ func (n *Node) start( // sequence ID generator stored in a system key. n.additionalStoreInitCh = make(chan struct{}) if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) { - if err := n.initializeAdditionalStores(ctx, state.firstStoreID, state.uninitializedEngines, n.stopper); err != nil { + if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil { log.Fatalf(ctx, "while initializing additional stores: %v", err) } close(n.additionalStoreInitCh) @@ -584,10 +583,7 @@ func (n *Node) validateStores(ctx context.Context) error { // allocated via a sequence id generator stored at a system key per node. The // new stores are added to n.stores. func (n *Node) initializeAdditionalStores( - ctx context.Context, - firstStoreID roachpb.StoreID, - engines []storage.Engine, - stopper *stop.Stopper, + ctx context.Context, engines []storage.Engine, stopper *stop.Stopper, ) error { if n.clusterID.Get() == uuid.Nil { return errors.New("missing cluster ID during initialization of additional store") @@ -597,28 +593,17 @@ func (n *Node) initializeAdditionalStores( // Initialize all waiting stores by allocating a new store id for each // and invoking kvserver.InitEngine() to persist it. We'll then // construct a new store out of the initialized engine and attach it to - // ourselves. The -1 comes from the fact that our first store ID has - // already been pre-allocated for us. - storeIDAlloc := int64(len(engines)) - 1 - if firstStoreID == 0 { - // We lied, we don't have a firstStoreID; we'll need to allocate for - // that too. - // - // TODO(irfansharif): We get here if we're falling back to - // gossip-based connectivity. This can be removed in 21.1. - storeIDAlloc++ - } + // ourselves. + storeIDAlloc := int64(len(engines)) startID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, storeIDAlloc, n.storeCfg.DB) - if firstStoreID == 0 { - firstStoreID = startID - } if err != nil { return errors.Errorf("error allocating store ids: %s", err) } + sIdent := roachpb.StoreIdent{ ClusterID: n.clusterID.Get(), NodeID: n.Descriptor.NodeID, - StoreID: firstStoreID, + StoreID: startID, } for _, eng := range engines { if err := kvserver.InitEngine(ctx, eng, sIdent); err != nil {