server: refactor the sticky engine registry into a VFS registry
The 'StickyInMemEngineRegistry' has historically been the method of preserving
local physical state within tests across node/TestCluster restarts. This
registry predates Pebble and its pure-Go VFS interface. This commit refactors
the sticky engine registry into a registry of virtual filesystems. This
improves test coverage by exercising storage engine teardown and recovery code.

The previous behavior of reusing the storage Engine without closing and
re-opening is still supported with the server.ReuseEngines option.

Close #107177.
Epic: None
Release note: None
jbowens committed Aug 2, 2023
1 parent 602bc79 commit 6ca7d2f
Showing 29 changed files with 522 additions and 527 deletions.
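For orientation before the file-by-file diff, here is a minimal sketch (not part of this commit) of how a test wires up the new registry, assembled from the call sites visible in the hunks below: server.NewStickyVFSRegistry, the StickyVFSRegistry testing knob, StoreSpec.StickyVFSID, and the registry's Get method. The test name, package name, single-node cluster, and the testcluster.StartTestCluster/Stopper plumbing are illustrative assumptions rather than code taken from this change.

package kvtest_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestStickyVFSSketch is a hypothetical test illustrating the new wiring:
// the registry hands out in-memory virtual filesystems keyed by StickyVFSID,
// so a node restarted within a test closes its storage engine and re-opens
// it on the same VFS, preserving the store's contents across the restart.
func TestStickyVFSSketch(t *testing.T) {
	reg := server.NewStickyVFSRegistry()

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Server: &server.TestingKnobs{
					// Hand the registry to the server via the testing knob so
					// the in-memory store's VFS outlives the engine.
					StickyVFSRegistry: reg,
				},
			},
			StoreSpecs: []base.StoreSpec{
				{InMemory: true, StickyVFSID: "1"},
			},
		},
	})
	defer tc.Stopper().Stop(context.Background())

	// The preserved filesystem can also be inspected directly, as the
	// loss-of-quorum recovery test below does.
	fs, err := reg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"})
	if err != nil {
		t.Fatal(err)
	}
	_ = fs
}

The client_metrics_test.go hunk below also shows the escape hatch mentioned in the commit message: constructing the registry with server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) retains the old behavior of reusing the still-open Engine rather than only its virtual filesystem.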
11 changes: 5 additions & 6 deletions pkg/base/store_spec.go
@@ -249,12 +249,11 @@ type StoreSpec struct {
BallastSize *SizeSpec
InMemory bool
Attributes roachpb.Attributes
// StickyInMemoryEngineID is a unique identifier associated with a given
// store which will remain in memory even after the default Engine close
// until it has been explicitly cleaned up by CleanupStickyInMemEngine[s]
// or the process has been terminated.
// This only applies to in-memory storage engine.
StickyInMemoryEngineID string
// StickyVFSID is a unique identifier associated with a given store which
// will preserve the in-memory virtual file system (VFS) even after the
// storage engine has been closed. This only applies to in-memory storage
// engine.
StickyVFSID string
// UseFileRegistry is true if the "file registry" store version is desired.
// This is set by CCL code when encryption-at-rest is in use.
UseFileRegistry bool
14 changes: 5 additions & 9 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -328,19 +328,18 @@ func TestStageVersionCheck(t *testing.T) {
})
defer c.Cleanup()

storeReg := server.NewStickyInMemEnginesRegistry()
defer storeReg.CloseAllStickyInMemEngines()
storeReg := server.NewStickyVFSRegistry()
tc := testcluster.NewTestCluster(t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: map[int]base.TestServerArgs{
0: {
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: storeReg,
StickyVFSRegistry: storeReg,
},
},
StoreSpecs: []base.StoreSpec{
{InMemory: true, StickyInMemoryEngineID: "1"},
{InMemory: true, StickyVFSID: "1"},
},
},
},
@@ -378,7 +377,7 @@ func TestStageVersionCheck(t *testing.T) {
})
require.NoError(t, err, "force local must fix incorrect version")
// Check that stored plan has version matching cluster version.
fs, err := storeReg.GetUnderlyingFS(base.StoreSpec{InMemory: true, StickyInMemoryEngineID: "1"})
fs, err := storeReg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"})
require.NoError(t, err, "failed to get shared store fs")
ps := loqrecovery.NewPlanStore("", fs)
p, ok, err := ps.LoadPlan()
@@ -434,9 +433,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
listenerReg := listenerutil.NewListenerRegistry()
defer listenerReg.Close()

storeReg := server.NewStickyInMemEnginesRegistry()
defer storeReg.CloseAllStickyInMemEngines()

// Test cluster contains 3 nodes that we would turn into a single node
// cluster using loss of quorum recovery. To do that, we will terminate
// two nodes and run recovery on remaining one. Restarting node should
@@ -452,7 +448,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) {
sa[i] = base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: storeReg,
StickyVFSRegistry: server.NewStickyVFSRegistry(),
},
},
StoreSpecs: []base.StoreSpec{
16 changes: 8 additions & 8 deletions pkg/cli/democluster/demo_cluster.go
@@ -87,7 +87,7 @@ type transientCluster struct {
adminPassword string
adminUser username.SQLUsername

stickyEngineRegistry server.StickyInMemEnginesRegistry
stickyVFSRegistry server.StickyVFSRegistry

drainAndShutdown func(ctx context.Context, adminClient serverpb.AdminClient) error

@@ -210,7 +210,7 @@ func NewDemoCluster(
}
}

c.stickyEngineRegistry = server.NewStickyInMemEnginesRegistry()
c.stickyVFSRegistry = server.NewStickyVFSRegistry()
return c, nil
}

@@ -314,7 +314,7 @@ func (c *transientCluster) Start(ctx context.Context) (err error) {
// individual servers' Stop() methods have been registered
// via createAndAddNode() above.
c.stopper.AddCloser(stop.CloserFn(func() {
c.stickyEngineRegistry.CloseAllStickyInMemEngines()
c.stickyVFSRegistry.CloseAllEngines()
}))

// Start the remaining nodes asynchronously.
@@ -600,7 +600,7 @@ func (c *transientCluster) createAndAddNode(
}
args := c.demoCtx.testServerArgsForTransientCluster(
socketDetails, idx, joinAddr, c.demoDir,
c.stickyEngineRegistry,
c.stickyVFSRegistry,
)
if idx == 0 {
// The first node also auto-inits the cluster.
@@ -878,11 +878,11 @@ func (demoCtx *Context) testServerArgsForTransientCluster(
serverIdx int,
joinAddr string,
demoDir string,
stickyEngineRegistry server.StickyInMemEnginesRegistry,
stickyVFSRegistry server.StickyVFSRegistry,
) base.TestServerArgs {
// Assign a path to the store spec, to be saved.
storeSpec := base.DefaultTestStoreSpec
storeSpec.StickyInMemoryEngineID = fmt.Sprintf("demo-server%d", serverIdx)
storeSpec.StickyVFSID = fmt.Sprintf("demo-server%d", serverIdx)

args := base.TestServerArgs{
SocketFile: sock.filename(),
@@ -902,7 +902,7 @@

Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: stickyEngineRegistry,
StickyVFSRegistry: stickyVFSRegistry,
},
JobsTestingKnobs: &jobs.TestingKnobs{
// Allow the scheduler daemon to start earlier in demo.
@@ -1160,7 +1160,7 @@ func (c *transientCluster) startServerInternal(
socketDetails,
serverIdx,
c.firstServer.AdvRPCAddr(), c.demoDir,
c.stickyEngineRegistry)
c.stickyVFSRegistry)
srv, err := server.TestServerFactory.New(args)
if err != nil {
return 0, err
38 changes: 19 additions & 19 deletions pkg/cli/democluster/demo_cluster_test.go
@@ -51,7 +51,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

stickyEnginesRegistry := server.NewStickyInMemEnginesRegistry()
stickyVFSRegistry := server.NewStickyVFSRegistry()

testCases := []struct {
serverIdx int
@@ -81,7 +81,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
EnableDemoLoginEndpoint: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: stickyEnginesRegistry,
StickyVFSRegistry: stickyVFSRegistry,
},
},
},
@@ -106,7 +106,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
EnableDemoLoginEndpoint: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: stickyEnginesRegistry,
StickyVFSRegistry: stickyVFSRegistry,
},
},
},
@@ -120,15 +120,15 @@ func TestTestServerArgsForTransientCluster(t *testing.T) {
demoCtx.CacheSize = tc.cacheSize
demoCtx.SQLPort = 1234
demoCtx.HTTPPort = 4567
actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyEnginesRegistry)
actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyVFSRegistry)
stopper := actual.Stopper
defer stopper.Stop(context.Background())

assert.Len(t, actual.StoreSpecs, 1)
assert.Equal(
t,
fmt.Sprintf("demo-server%d", tc.serverIdx),
actual.StoreSpecs[0].StickyInMemoryEngineID,
actual.StoreSpecs[0].StickyVFSID,
)

// We cannot compare these.
@@ -169,13 +169,13 @@ func TestTransientClusterSimulateLatencies(t *testing.T) {

// Setup the transient cluster.
c := transientCluster{
demoCtx: demoCtx,
stopper: stop.NewStopper(),
demoDir: certsDir,
stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(),
infoLog: log.Infof,
warnLog: log.Warningf,
shoutLog: log.Ops.Shoutf,
demoCtx: demoCtx,
stopper: stop.NewStopper(),
demoDir: certsDir,
stickyVFSRegistry: server.NewStickyVFSRegistry(),
infoLog: log.Infof,
warnLog: log.Warningf,
shoutLog: log.Ops.Shoutf,
}
// Stop the cluster when the test exits, including when it fails.
// This also calls the Stop() method on the stopper, and thus
@@ -281,13 +281,13 @@ func TestTransientClusterMultitenant(t *testing.T) {

// Setup the transient cluster.
c := transientCluster{
demoCtx: demoCtx,
stopper: stop.NewStopper(),
demoDir: certsDir,
stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(),
infoLog: log.Infof,
warnLog: log.Warningf,
shoutLog: log.Ops.Shoutf,
demoCtx: demoCtx,
stopper: stop.NewStopper(),
demoDir: certsDir,
stickyVFSRegistry: server.NewStickyVFSRegistry(),
infoLog: log.Infof,
warnLog: log.Warningf,
shoutLog: log.Ops.Shoutf,
}
// Stop the cluster when the test exits, including when it fails.
// This also calls the Stop() method on the stopper, and thus
29 changes: 13 additions & 16 deletions pkg/kv/kvserver/client_lease_test.go
@@ -783,8 +783,7 @@ func TestLeasePreferencesRebalance(t *testing.T) {
func TestLeaseholderRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
stickyRegistry := server.NewStickyVFSRegistry()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()

@@ -809,14 +808,14 @@
Locality: localities[i],
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
StickyEngineRegistry: stickyRegistry,
WallClock: manualClock,
StickyVFSRegistry: stickyRegistry,
},
},
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
InMemory: true,
StickyVFSID: strconv.FormatInt(int64(i), 10),
},
},
}
@@ -919,8 +918,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
skip.WithIssue(t, 88769, "flaky test")
defer log.Scope(t).Close(t)

stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
stickyRegistry := server.NewStickyVFSRegistry()
ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
// Place all the leases in the us.
@@ -956,7 +954,7 @@
Server: &server.TestingKnobs{
WallClock: manualClock,
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
StickyVFSRegistry: stickyRegistry,
},
Store: &kvserver.StoreTestingKnobs{
// The Raft leadership may not end up on the eu node, but it needs to
Expand All @@ -966,8 +964,8 @@ func TestLeasePreferencesDuringOutage(t *testing.T) {
},
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
InMemory: true,
StickyVFSID: strconv.FormatInt(int64(i), 10),
},
},
}
@@ -1128,8 +1126,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) {
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism

// Speed up lease transfers.
stickyRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyRegistry.CloseAllStickyInMemEngines()
stickyRegistry := server.NewStickyVFSRegistry()
manualClock := hlc.NewHybridManualClock()
serverArgs := make(map[int]base.TestServerArgs)
numNodes := 4
@@ -1141,13 +1138,13 @@
Server: &server.TestingKnobs{
WallClock: manualClock,
DefaultZoneConfigOverride: &zcfg,
StickyEngineRegistry: stickyRegistry,
StickyVFSRegistry: stickyRegistry,
},
},
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
InMemory: true,
StickyVFSID: strconv.FormatInt(int64(i), 10),
},
},
}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/client_metrics_test.go
@@ -265,17 +265,17 @@ func TestStoreMetrics(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
stickyEngineRegistry := server.NewStickyInMemEnginesRegistry()
defer stickyEngineRegistry.CloseAllStickyInMemEngines()
stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated)
defer stickyVFSRegistry.CloseAllEngines()
const numServers int = 3
stickyServerArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numServers; i++ {
stickyServerArgs[i] = base.TestServerArgs{
CacheSize: 1 << 20, /* 1 MiB */
StoreSpecs: []base.StoreSpec{
{
InMemory: true,
StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10),
InMemory: true,
StickyVFSID: strconv.FormatInt(int64(i), 10),
// Specify a size to trigger the BlockCache in Pebble.
Size: base.SizeSpec{
InBytes: 512 << 20, /* 512 MiB */
@@ -284,7 +284,7 @@
},
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
StickyEngineRegistry: stickyEngineRegistry,
StickyVFSRegistry: stickyVFSRegistry,
},
Store: &kvserver.StoreTestingKnobs{
DisableRaftLogQueue: true,