diff --git a/pkg/server/multi_store_test.go b/pkg/server/multi_store_test.go index 79a81ac703c5..bb8067209d85 100644 --- a/pkg/server/multi_store_test.go +++ b/pkg/server/multi_store_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -134,6 +135,106 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { }) } +// TestAddNewStoresToRunningNodes tests database behavior with +// multiple stores per node, in particular when new stores are +// added while nodes are shut down. This test starts a cluster with +// three nodes, shuts down all nodes and adds a store to each node, +// and ensures nodes start back up successfully. See #39415. +func TestAddNewStoresToRunningNode(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Nine stores is a lot of goroutines. + skip.UnderStress(t, "too many new stores and nodes for stress") + skip.UnderRace(t, "too many new stores and nodes for race") + skip.UnderDeadlock(t, "too many new stores and nodes for deadlock") + + ctx := context.Background() + + ser := fs.NewStickyRegistry() + + const ( + numNodes = 3 + numStoresPerNodeInitially = 1 + numStoresPerNodeAfter = 3 + ) + + mkClusterArgs := func(numNodes, numStoresPerNode int) base.TestClusterArgs { + tcArgs := base.TestClusterArgs{ + // NB: it's important that this test wait for full replication. Otherwise, + // with only a single voter on the range that allocates store IDs, it can + // pass erroneously. StartTestCluster already calls it, but we call it + // again explicitly. + ReplicationMode: base.ReplicationAuto, + ServerArgsPerNode: map[int]base.TestServerArgs{}, + ServerArgs: base.TestServerArgs{ + DefaultTestTenant: base.TODOTestTenantDisabled, + }, + } + for srvIdx := 0; srvIdx < numNodes; srvIdx++ { + serverArgs := base.TestServerArgs{} + serverArgs.Knobs.Server = &server.TestingKnobs{StickyVFSRegistry: ser} + for storeIdx := 0; storeIdx < numStoresPerNode; storeIdx++ { + id := fmt.Sprintf("s%d.%d", srvIdx+1, storeIdx+1) + serverArgs.StoreSpecs = append( + serverArgs.StoreSpecs, + base.StoreSpec{InMemory: true, StickyVFSID: id}, + ) + } + tcArgs.ServerArgsPerNode[srvIdx] = serverArgs + } + return tcArgs + } + + tc := testcluster.StartTestCluster(t, numNodes, mkClusterArgs(numNodes, numStoresPerNodeInitially)) + defer tc.Stopper().Stop(ctx) + for i, s := range tc.Servers { + n := s.Node().(*server.Node) + transport := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().Transport + for i := numStoresPerNodeInitially; i < numStoresPerNodeAfter; i++ { + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + storeCfg := kvserver.TestStoreConfig(s.SystemLayer().Clock()) + storeCfg.Transport = transport + require.NoError(t, n.AddStore(ctx, eng, storeCfg)) + } + } + + // Ensure all nodes have all stores available, and each store has a unique + // store ID. + testutils.SucceedsSoon(t, func() error { + var storeIDs []roachpb.StoreID + for _, server := range tc.Servers { + var storeCount = 0 + if err := server.GetStores().(*kvserver.Stores).VisitStores( + func(s *kvserver.Store) error { + storeCount++ + storeIDs = append(storeIDs, s.StoreID()) + return nil + }, + ); err != nil { + return errors.Wrap(err, "failed to visit all nodes") + } + + if storeCount != 3 { + return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount) + } + } + + sort.Slice(storeIDs, func(i, j int) bool { + return storeIDs[i] < storeIDs[j] + }) + for i := range storeIDs { + expStoreID := roachpb.StoreID(i + 1) + if storeIDs[i] != expStoreID { + t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i]) + } + } + + return nil + }) +} + // TestMultiStoreIDAlloc validates that we don't accidentally re-use or // skip-over allocated store IDs in multi-store setups. func TestMultiStoreIDAlloc(t *testing.T) { diff --git a/pkg/server/node.go b/pkg/server/node.go index 0d1b8e422020..411d77cff22f 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -790,7 +790,7 @@ func (n *Node) start( // sequence ID generator stored in a system key. n.additionalStoreInitCh = make(chan struct{}) if err := n.stopper.RunAsyncTask(workersCtx, "initialize-additional-stores", func(ctx context.Context) { - if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil { + if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines); err != nil { log.Fatalf(ctx, "while initializing additional stores: %v", err) } close(n.additionalStoreInitCh) @@ -989,13 +989,46 @@ func (n *Node) validateStores(ctx context.Context) error { }) } +// AddStore adds a store to a running node. The store will be started and +// shutdown when the nodes stopper is stopped. +func (n *Node) AddStore(ctx context.Context, eng storage.Engine, cfg kvserver.StoreConfig) error { + if n.clusterID.Get() == uuid.Nil { + return errors.New("missing cluster ID during initialization of additional store") + } + // Initialize all waiting stores by allocating a new store id for each + // and invoking kvserver.InitEngine() to persist it. We'll then + // construct a new store out of the initialized engine and attach it to + // ourselves. + // TODO: What is the best way to get an id? + storeID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, 1, n.storeCfg.DB) + if err != nil { + return errors.Wrap(err, "error allocating store ids") + } + sIdent := roachpb.StoreIdent{ + ClusterID: n.clusterID.Get(), + NodeID: n.Descriptor.NodeID, + StoreID: storeID, + } + if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil { + return err + } + + s := kvserver.NewStore(ctx, n.storeCfg, eng, &n.Descriptor) + if err := s.Start(ctx, n.stopper); err != nil { + return err + } + + n.addStore(ctx, s) + log.Infof(ctx, "initialized new store after startup s%s", s.StoreID()) + + return nil +} + // initializeAdditionalStores initializes the given set of engines once the // cluster and node ID have been established for this node. Store IDs are // allocated via a sequence id generator stored at a system key per node. The // new stores are added to n.stores. -func (n *Node) initializeAdditionalStores( - ctx context.Context, engines []storage.Engine, stopper *stop.Stopper, -) error { +func (n *Node) initializeAdditionalStores(ctx context.Context, engines []storage.Engine) error { if n.clusterID.Get() == uuid.Nil { return errors.New("missing cluster ID during initialization of additional store") } @@ -1022,7 +1055,7 @@ func (n *Node) initializeAdditionalStores( } s := kvserver.NewStore(ctx, n.storeCfg, eng, &n.Descriptor) - if err := s.Start(ctx, stopper); err != nil { + if err := s.Start(ctx, n.stopper); err != nil { return err }