Skip to content

Commit

Permalink
Merge pull request #6 from mohini-crl/2024-12-13-multi-store-add
Browse files Browse the repository at this point in the history
kvserver: test to add store to running node
  • Loading branch information
mohini-crl authored Dec 19, 2024
2 parents 43ebaa0 + a18536a commit 9db8f5f
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 5 deletions.
101 changes: 101 additions & 0 deletions pkg/server/multi_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -134,6 +135,106 @@ func TestAddNewStoresToExistingNodes(t *testing.T) {
})
}

// TestAddNewStoresToRunningNodes tests database behavior with
// multiple stores per node, in particular when new stores are
// added while nodes are shut down. This test starts a cluster with
// three nodes, shuts down all nodes and adds a store to each node,
// and ensures nodes start back up successfully. See #39415.
func TestAddNewStoresToRunningNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Nine stores is a lot of goroutines.
skip.UnderStress(t, "too many new stores and nodes for stress")
skip.UnderRace(t, "too many new stores and nodes for race")
skip.UnderDeadlock(t, "too many new stores and nodes for deadlock")

ctx := context.Background()

ser := fs.NewStickyRegistry()

const (
numNodes = 3
numStoresPerNodeInitially = 1
numStoresPerNodeAfter = 3
)

mkClusterArgs := func(numNodes, numStoresPerNode int) base.TestClusterArgs {
tcArgs := base.TestClusterArgs{
// NB: it's important that this test wait for full replication. Otherwise,
// with only a single voter on the range that allocates store IDs, it can
// pass erroneously. StartTestCluster already calls it, but we call it
// again explicitly.
ReplicationMode: base.ReplicationAuto,
ServerArgsPerNode: map[int]base.TestServerArgs{},
ServerArgs: base.TestServerArgs{
DefaultTestTenant: base.TODOTestTenantDisabled,
},
}
for srvIdx := 0; srvIdx < numNodes; srvIdx++ {
serverArgs := base.TestServerArgs{}
serverArgs.Knobs.Server = &server.TestingKnobs{StickyVFSRegistry: ser}
for storeIdx := 0; storeIdx < numStoresPerNode; storeIdx++ {
id := fmt.Sprintf("s%d.%d", srvIdx+1, storeIdx+1)
serverArgs.StoreSpecs = append(
serverArgs.StoreSpecs,
base.StoreSpec{InMemory: true, StickyVFSID: id},
)
}
tcArgs.ServerArgsPerNode[srvIdx] = serverArgs
}
return tcArgs
}

tc := testcluster.StartTestCluster(t, numNodes, mkClusterArgs(numNodes, numStoresPerNodeInitially))
defer tc.Stopper().Stop(ctx)
for i, s := range tc.Servers {
n := s.Node().(*server.Node)
transport := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().Transport
for i := numStoresPerNodeInitially; i < numStoresPerNodeAfter; i++ {
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
storeCfg := kvserver.TestStoreConfig(s.SystemLayer().Clock())
storeCfg.Transport = transport
require.NoError(t, n.AddStore(ctx, eng, storeCfg))
}
}

// Ensure all nodes have all stores available, and each store has a unique
// store ID.
testutils.SucceedsSoon(t, func() error {
var storeIDs []roachpb.StoreID
for _, server := range tc.Servers {
var storeCount = 0
if err := server.GetStores().(*kvserver.Stores).VisitStores(
func(s *kvserver.Store) error {
storeCount++
storeIDs = append(storeIDs, s.StoreID())
return nil
},
); err != nil {
return errors.Wrap(err, "failed to visit all nodes")
}

if storeCount != 3 {
return errors.Errorf("expected 3 stores to be available on n%s, got %d stores instead", server.NodeID(), storeCount)
}
}

sort.Slice(storeIDs, func(i, j int) bool {
return storeIDs[i] < storeIDs[j]
})
for i := range storeIDs {
expStoreID := roachpb.StoreID(i + 1)
if storeIDs[i] != expStoreID {
t.Fatalf("expected the %s store to have storeID s%s, found s%s", humanize.Ordinal(i+1), expStoreID, storeIDs[i])
}
}

return nil
})
}

// TestMultiStoreIDAlloc validates that we don't accidentally re-use or
// skip-over allocated store IDs in multi-store setups.
func TestMultiStoreIDAlloc(t *testing.T) {
Expand Down
43 changes: 38 additions & 5 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (n *Node) start(
// sequence ID generator stored in a system key.
n.additionalStoreInitCh = make(chan struct{})
if err := n.stopper.RunAsyncTask(workersCtx, "initialize-additional-stores", func(ctx context.Context) {
if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil {
if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines); err != nil {
log.Fatalf(ctx, "while initializing additional stores: %v", err)
}
close(n.additionalStoreInitCh)
Expand Down Expand Up @@ -989,13 +989,46 @@ func (n *Node) validateStores(ctx context.Context) error {
})
}

// AddStore adds a store to a running node. The store will be started and
// shutdown when the nodes stopper is stopped.
func (n *Node) AddStore(ctx context.Context, eng storage.Engine, cfg kvserver.StoreConfig) error {
if n.clusterID.Get() == uuid.Nil {
return errors.New("missing cluster ID during initialization of additional store")
}
// Initialize all waiting stores by allocating a new store id for each
// and invoking kvserver.InitEngine() to persist it. We'll then
// construct a new store out of the initialized engine and attach it to
// ourselves.
// TODO: What is the best way to get an id?
storeID, err := allocateStoreIDs(ctx, n.Descriptor.NodeID, 1, n.storeCfg.DB)
if err != nil {
return errors.Wrap(err, "error allocating store ids")
}
sIdent := roachpb.StoreIdent{
ClusterID: n.clusterID.Get(),
NodeID: n.Descriptor.NodeID,
StoreID: storeID,
}
if err := kvstorage.InitEngine(ctx, eng, sIdent); err != nil {
return err
}

s := kvserver.NewStore(ctx, n.storeCfg, eng, &n.Descriptor)
if err := s.Start(ctx, n.stopper); err != nil {
return err
}

n.addStore(ctx, s)
log.Infof(ctx, "initialized new store after startup s%s", s.StoreID())

return nil
}

// initializeAdditionalStores initializes the given set of engines once the
// cluster and node ID have been established for this node. Store IDs are
// allocated via a sequence id generator stored at a system key per node. The
// new stores are added to n.stores.
func (n *Node) initializeAdditionalStores(
ctx context.Context, engines []storage.Engine, stopper *stop.Stopper,
) error {
func (n *Node) initializeAdditionalStores(ctx context.Context, engines []storage.Engine) error {
if n.clusterID.Get() == uuid.Nil {
return errors.New("missing cluster ID during initialization of additional store")
}
Expand All @@ -1022,7 +1055,7 @@ func (n *Node) initializeAdditionalStores(
}

s := kvserver.NewStore(ctx, n.storeCfg, eng, &n.Descriptor)
if err := s.Start(ctx, stopper); err != nil {
if err := s.Start(ctx, n.stopper); err != nil {
return err
}

Expand Down

0 comments on commit 9db8f5f

Please sign in to comment.