From ef1045b2224ea22a7ee40cabc06c94b3cdb5a4ee Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 28 Jul 2023 16:42:29 -0400 Subject: [PATCH] server: refactor the sticky engine registry into a VFS registry The 'StickyInMemEngineRegistry' has historically been the method of preserving local physical state within tests across node/TestCluster restarts. This registry predates Pebble and its pure-Go VFS interface. This commit refactors the sticky engine registry into a registry of virtual filesystems. This improves test coverage by exercising storage engine teardown and recovery code. The previous behavior of reusing the storage Engine without closing and re-opening is still supported with the server.ReuseEngines option. Close #107177. Epic: None Release note: None --- pkg/base/store_spec.go | 11 +- pkg/cli/debug_recover_loss_of_quorum_test.go | 15 +- pkg/cli/democluster/demo_cluster.go | 16 +- pkg/cli/democluster/demo_cluster_test.go | 38 +-- pkg/kv/kvserver/client_lease_test.go | 29 +-- pkg/kv/kvserver/client_metrics_test.go | 10 +- pkg/kv/kvserver/client_raft_test.go | 208 ++++++++-------- .../client_replica_circuit_breaker_test.go | 8 +- pkg/kv/kvserver/client_replica_test.go | 20 +- pkg/kv/kvserver/client_split_test.go | 24 +- pkg/kv/kvserver/consistency_queue_test.go | 15 +- .../loqrecovery/server_integration_test.go | 47 ++-- pkg/kv/kvserver/node_liveness_test.go | 36 ++- pkg/kv/kvserver/replicate_queue_test.go | 4 +- pkg/server/BUILD.bazel | 4 +- pkg/server/config.go | 10 +- pkg/server/loss_of_quorum.go | 10 +- pkg/server/server_startup_test.go | 9 +- pkg/server/settings_cache_test.go | 7 +- pkg/server/sticky_engine.go | 208 ---------------- pkg/server/sticky_vfs.go | 226 ++++++++++++++++++ ...icky_engine_test.go => sticky_vfs_test.go} | 53 ++-- pkg/server/testing_knobs.go | 6 +- pkg/testutils/testcluster/testcluster.go | 8 +- pkg/testutils/testcluster/testcluster_test.go | 9 +- pkg/util/startup/startup_test.go | 9 +- 26 files changed, 516 insertions(+), 524 deletions(-) delete mode 100644 pkg/server/sticky_engine.go create mode 100644 pkg/server/sticky_vfs.go rename pkg/server/{sticky_engine_test.go => sticky_vfs_test.go} (65%) diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go index 400f68ee8442..d121f3ecc78f 100644 --- a/pkg/base/store_spec.go +++ b/pkg/base/store_spec.go @@ -249,12 +249,11 @@ type StoreSpec struct { BallastSize *SizeSpec InMemory bool Attributes roachpb.Attributes - // StickyInMemoryEngineID is a unique identifier associated with a given - // store which will remain in memory even after the default Engine close - // until it has been explicitly cleaned up by CleanupStickyInMemEngine[s] - // or the process has been terminated. - // This only applies to in-memory storage engine. - StickyInMemoryEngineID string + // StickyVFSID is a unique identifier associated with a given store which + // will preserve the in-memory virtual file system (VFS) even after the + // storage engine has been closed. This only applies to in-memory storage + // engine. + StickyVFSID string // UseFileRegistry is true if the "file registry" store version is desired. // This is set by CCL code when encryption-at-rest is in use. UseFileRegistry bool diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index 2bc8dd151cd8..e19a3ed7a911 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -334,20 +334,18 @@ func TestStageVersionCheck(t *testing.T) { listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() - - storeReg := server.NewStickyInMemEnginesRegistry() - defer storeReg.CloseAllStickyInMemEngines() + storeReg := server.NewStickyVFSRegistry() tc := testcluster.NewTestCluster(t, 4, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgsPerNode: map[int]base.TestServerArgs{ 0: { Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: storeReg, + StickyVFSRegistry: storeReg, }, }, StoreSpecs: []base.StoreSpec{ - {InMemory: true, StickyInMemoryEngineID: "1"}, + {InMemory: true, StickyVFSID: "1"}, }, }, }, @@ -389,7 +387,7 @@ func TestStageVersionCheck(t *testing.T) { }) require.NoError(t, err, "force local must fix incorrect version") // Check that stored plan has version matching cluster version. - fs, err := storeReg.GetUnderlyingFS(base.StoreSpec{InMemory: true, StickyInMemoryEngineID: "1"}) + fs, err := storeReg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"}) require.NoError(t, err, "failed to get shared store fs") ps := loqrecovery.NewPlanStore("", fs) p, ok, err := ps.LoadPlan() @@ -445,9 +443,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() - storeReg := server.NewStickyInMemEnginesRegistry() - defer storeReg.CloseAllStickyInMemEngines() - // Test cluster contains 3 nodes that we would turn into a single node // cluster using loss of quorum recovery. To do that, we will terminate // two nodes and run recovery on remaining one. Restarting node should @@ -463,7 +458,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { sa[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: storeReg, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, StoreSpecs: []base.StoreSpec{ diff --git a/pkg/cli/democluster/demo_cluster.go b/pkg/cli/democluster/demo_cluster.go index 4caf9242d240..a17993f33457 100644 --- a/pkg/cli/democluster/demo_cluster.go +++ b/pkg/cli/democluster/demo_cluster.go @@ -86,7 +86,7 @@ type transientCluster struct { adminPassword string adminUser username.SQLUsername - stickyEngineRegistry server.StickyInMemEnginesRegistry + stickyVFSRegistry server.StickyVFSRegistry getAdminClient func(ctx context.Context, cfg server.Config) (serverpb.AdminClient, func(), error) drainAndShutdown func(ctx context.Context, adminClient serverpb.AdminClient) error @@ -212,7 +212,7 @@ func NewDemoCluster( } } - c.stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + c.stickyVFSRegistry = server.NewStickyVFSRegistry() return c, nil } @@ -316,7 +316,7 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { // individual servers' Stop() methods have been registered // via createAndAddNode() above. c.stopper.AddCloser(stop.CloserFn(func() { - c.stickyEngineRegistry.CloseAllStickyInMemEngines() + c.stickyVFSRegistry.CloseAllEngines() })) // Start the remaining nodes asynchronously. @@ -618,7 +618,7 @@ func (c *transientCluster) createAndAddNode( } args := c.demoCtx.testServerArgsForTransientCluster( socketDetails, idx, joinAddr, c.demoDir, - c.stickyEngineRegistry, + c.stickyVFSRegistry, ) if idx == 0 { // The first node also auto-inits the cluster. @@ -889,11 +889,11 @@ func (demoCtx *Context) testServerArgsForTransientCluster( serverIdx int, joinAddr string, demoDir string, - stickyEngineRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, ) base.TestServerArgs { // Assign a path to the store spec, to be saved. storeSpec := base.DefaultTestStoreSpec - storeSpec.StickyInMemoryEngineID = fmt.Sprintf("demo-server%d", serverIdx) + storeSpec.StickyVFSID = fmt.Sprintf("demo-server%d", serverIdx) args := base.TestServerArgs{ SocketFile: sock.filename(), @@ -913,7 +913,7 @@ func (demoCtx *Context) testServerArgsForTransientCluster( Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, JobsTestingKnobs: &jobs.TestingKnobs{ // Allow the scheduler daemon to start earlier in demo. @@ -1167,7 +1167,7 @@ func (c *transientCluster) startServerInternal( socketDetails, serverIdx, c.firstServer.ServingRPCAddr(), c.demoDir, - c.stickyEngineRegistry) + c.stickyVFSRegistry) s, err := server.TestServerFactory.New(args) if err != nil { return 0, err diff --git a/pkg/cli/democluster/demo_cluster_test.go b/pkg/cli/democluster/demo_cluster_test.go index 2228078bb082..7c66baf618ab 100644 --- a/pkg/cli/democluster/demo_cluster_test.go +++ b/pkg/cli/democluster/demo_cluster_test.go @@ -51,7 +51,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEnginesRegistry := server.NewStickyInMemEnginesRegistry() + stickyVFSRegistry := server.NewStickyVFSRegistry() testCases := []struct { serverIdx int @@ -81,7 +81,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { EnableDemoLoginEndpoint: true, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -106,7 +106,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { EnableDemoLoginEndpoint: true, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -120,7 +120,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { demoCtx.CacheSize = tc.cacheSize demoCtx.SQLPort = 1234 demoCtx.HTTPPort = 4567 - actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyEnginesRegistry) + actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyVFSRegistry) stopper := actual.Stopper defer stopper.Stop(context.Background()) @@ -128,7 +128,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { assert.Equal( t, fmt.Sprintf("demo-server%d", tc.serverIdx), - actual.StoreSpecs[0].StickyInMemoryEngineID, + actual.StoreSpecs[0].StickyVFSID, ) // We cannot compare these. @@ -169,13 +169,13 @@ func TestTransientClusterSimulateLatencies(t *testing.T) { // Setup the transient cluster. c := transientCluster{ - demoCtx: demoCtx, - stopper: stop.NewStopper(), - demoDir: certsDir, - stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(), - infoLog: log.Infof, - warnLog: log.Warningf, - shoutLog: log.Ops.Shoutf, + demoCtx: demoCtx, + stopper: stop.NewStopper(), + demoDir: certsDir, + stickyVFSRegistry: server.NewStickyVFSRegistry(), + infoLog: log.Infof, + warnLog: log.Warningf, + shoutLog: log.Ops.Shoutf, } // Stop the cluster when the test exits, including when it fails. // This also calls the Stop() method on the stopper, and thus @@ -281,13 +281,13 @@ func TestTransientClusterMultitenant(t *testing.T) { // Setup the transient cluster. c := transientCluster{ - demoCtx: demoCtx, - stopper: stop.NewStopper(), - demoDir: certsDir, - stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(), - infoLog: log.Infof, - warnLog: log.Warningf, - shoutLog: log.Ops.Shoutf, + demoCtx: demoCtx, + stopper: stop.NewStopper(), + demoDir: certsDir, + stickyVFSRegistry: server.NewStickyVFSRegistry(), + infoLog: log.Infof, + warnLog: log.Warningf, + shoutLog: log.Ops.Shoutf, } // Stop the cluster when the test exits, including when it fails. // This also calls the Stop() method on the stopper, and thus diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index eb8c8b6c4479..29adb1e2c53a 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -784,8 +784,7 @@ func TestLeasePreferencesRebalance(t *testing.T) { func TestLeaseholderRelocate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() ctx := context.Background() manualClock := hlc.NewHybridManualClock() @@ -810,14 +809,14 @@ func TestLeaseholderRelocate(t *testing.T) { Locality: localities[i], Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manualClock, - StickyEngineRegistry: stickyRegistry, + WallClock: manualClock, + StickyVFSRegistry: stickyRegistry, }, }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -920,8 +919,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { skip.WithIssue(t, 88769, "flaky test") defer log.Scope(t).Close(t) - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() ctx := context.Background() manualClock := hlc.NewHybridManualClock() // Place all the leases in the us. @@ -964,7 +962,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { Server: &server.TestingKnobs{ WallClock: manualClock, DefaultZoneConfigOverride: &zcfg, - StickyEngineRegistry: stickyRegistry, + StickyVFSRegistry: stickyRegistry, }, Store: &kvserver.StoreTestingKnobs{ // The Raft leadership may not end up on the eu node, but it needs to @@ -974,8 +972,8 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -1136,8 +1134,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism // Speed up lease transfers. - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() manualClock := hlc.NewHybridManualClock() serverArgs := make(map[int]base.TestServerArgs) numNodes := 4 @@ -1149,13 +1146,13 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { Server: &server.TestingKnobs{ WallClock: manualClock, DefaultZoneConfigOverride: &zcfg, - StickyEngineRegistry: stickyRegistry, + StickyVFSRegistry: stickyRegistry, }, }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 7527ed2c9d73..fa28e4d68422 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -263,8 +263,8 @@ func TestStoreMetrics(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -272,8 +272,8 @@ func TestStoreMetrics(t *testing.T) { CacheSize: 1 << 20, /* 1 MiB */ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), // Specify a size to trigger the BlockCache in Pebble. Size: base.SizeSpec{ InBytes: 512 << 20, /* 512 MiB */ @@ -282,7 +282,7 @@ func TestStoreMetrics(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableRaftLogQueue: true, diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 0d9d9dce8472..cc921074b9c0 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -88,8 +88,7 @@ func TestStoreRecoverFromEngine(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -101,13 +100,13 @@ func TestStoreRecoverFromEngine(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -197,8 +196,7 @@ func TestStoreRecoverWithErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -213,7 +211,7 @@ func TestStoreRecoverWithErrors(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ @@ -229,8 +227,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, }, @@ -343,8 +341,7 @@ func TestRestoreReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -354,13 +351,13 @@ func TestRestoreReplicas(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -666,8 +663,10 @@ func TestSnapshotAfterTruncation(t *testing.T) { name = "differentTerm" } t.Run(name, func(t *testing.T) { - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -677,13 +676,13 @@ func TestSnapshotAfterTruncation(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1513,8 +1512,10 @@ func TestConcurrentRaftSnapshots(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1522,13 +1523,13 @@ func TestConcurrentRaftSnapshots(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1599,8 +1600,7 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1608,13 +1608,13 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Disable the replica GC queue so that it doesn't accidentally pick up the @@ -1681,8 +1681,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() raftConfig := base.RaftConfig{ // Drop the raft tick interval so the Raft group is ticked more. @@ -1704,14 +1703,14 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, RaftConfig: raftConfig, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Disable leader transfers during leaseholder changes so that we @@ -2004,8 +2003,10 @@ func TestProgressWithDownNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -2013,13 +2014,13 @@ func TestProgressWithDownNode(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -2089,22 +2090,22 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, }, RaftConfig: base.RaftConfig{ @@ -2194,8 +2195,10 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -2203,14 +2206,14 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ // We're gonna want to validate the state of the store before and @@ -3397,8 +3400,10 @@ func TestReplicateRogueRemovedNode(t *testing.T) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3406,14 +3411,14 @@ func TestReplicateRogueRemovedNode(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ // Newly-started stores (including the "rogue" one) should not GC @@ -3740,8 +3745,10 @@ func TestReplicaTooOldGC(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3749,13 +3756,13 @@ func TestReplicaTooOldGC(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableScanner: true, @@ -3840,14 +3847,13 @@ func TestReplicaLazyLoad(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() stickyServerArgs := base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, RaftConfig: base.RaftConfig{ @@ -3855,7 +3861,7 @@ func TestReplicaLazyLoad(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableScanner: true, @@ -3904,8 +3910,10 @@ func TestReplicateReAddAfterDown(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3913,13 +3921,13 @@ func TestReplicateReAddAfterDown(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -4346,8 +4354,7 @@ func TestInitRaftGroupOnRequest(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -4357,13 +4364,13 @@ func TestInitRaftGroupOnRequest(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -4860,8 +4867,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -4910,14 +4916,14 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing Settings: st, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - ContextTestingKnobs: knobs, - StickyEngineRegistry: stickyEngineRegistry, + ContextTestingKnobs: knobs, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -5256,9 +5262,9 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { keyA, keyB roachpb.Key, lhsID roachpb.RangeID, lhsPartition *testClusterPartitionedRange, - stickyEngineRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, ) { - stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + stickyVFSRegistry = server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -5266,13 +5272,13 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Newly-started stores (including the "rogue" one) should not GC @@ -5326,7 +5332,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { increment(t, db, keyA, 1) tc.WaitForValues(t, keyA, []int64{6, 7, 7}) - return tc, db, keyA, keyB, lhsID, lhsPartition, stickyEngineRegistry + return tc, db, keyA, keyB, lhsID, lhsPartition, stickyVFSRegistry } // In this case we only have the LHS partitioned. The RHS will learn about its @@ -5335,8 +5341,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // partition the RHS and ensure that the split does not clobber the RHS's hard // state. t.Run("(1) no RHS partition", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5388,8 +5394,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // This case is like the previous case except the store crashes after // laying down a tombstone. t.Run("(2) no RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5461,8 +5467,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // the split is processed. We partition the RHS's new replica ID before // processing the split to ensure that the RHS doesn't get initialized. t.Run("(3) initial replica RHS partition, no restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition) @@ -5523,8 +5529,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // its higher replica ID the store crashes and forgets. The RHS replica gets // initialized by the split. t.Run("(4) initial replica RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange @@ -5642,9 +5648,7 @@ func TestElectionAfterRestart(t *testing.T) { const electionTimeoutTicks = 30 const raftTickInterval = 200 * time.Millisecond - r := server.NewStickyInMemEnginesRegistry() - defer r.CloseAllStickyInMemEngines() - + r := server.NewStickyVFSRegistry() newTCArgs := func(parallel bool, replMode base.TestClusterReplicationMode, onTimeoutCampaign func(roachpb.RangeID)) base.TestClusterArgs { return base.TestClusterArgs{ ReplicationMode: replMode, @@ -5656,7 +5660,7 @@ func TestElectionAfterRestart(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: r, + StickyVFSRegistry: r, }, Store: &kvserver.StoreTestingKnobs{ OnRaftTimeoutCampaign: onTimeoutCampaign, diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index de7c576c9f67..1f4994a2a1fb 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -804,7 +804,7 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { raftCfg.SetDefaults() raftCfg.RaftHeartbeatIntervalTicks = 1 raftCfg.RaftElectionTimeoutTicks = 2 - reg := server.NewStickyInMemEnginesRegistry() + reg := server.NewStickyVFSRegistry() args := base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ @@ -812,15 +812,15 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { RaftConfig: raftCfg, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manualClock, - StickyEngineRegistry: reg, + WallClock: manualClock, + StickyVFSRegistry: reg, }, Store: storeKnobs, }, }, } tc := testcluster.StartTestCluster(t, 2, args) - tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllStickyInMemEngines)) + tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllEngines)) _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '45s'`) require.NoError(t, err) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 3a9049fd1eec..7d2119805ec9 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2172,8 +2172,7 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -2187,14 +2186,14 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manual, - StickyEngineRegistry: stickyEngineRegistry, + WallClock: manual, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ LeaseRequestEvent: func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *kvpb.Error { @@ -4800,8 +4799,7 @@ func TestTenantID(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() ctx := context.Background() // Create a config with a sticky-in-mem engine so we can restart the server. // We also configure the settings to be as robust as possible to problems @@ -4811,13 +4809,13 @@ func TestTenantID(t *testing.T) { Insecure: true, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index d7a3adde4a9b..f583066e7554 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -1392,7 +1392,7 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { // See https://github.com/cockroachdb/cockroach/issues/1644. func runSetupSplitSnapshotRace( t *testing.T, - stickyEnginesRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, testFn func(*testcluster.TestCluster, roachpb.Key, roachpb.Key), ) { const numServers int = 6 @@ -1401,13 +1401,13 @@ func runSetupSplitSnapshotRace( stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableGCQueue: true, @@ -1542,10 +1542,12 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEnginesDeprecated because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() - runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the left range up first so that the split happens before it sees a snapshot. for i := 1; i <= 3; i++ { require.NoError(t, tc.RestartServer(i)) @@ -1579,10 +1581,12 @@ func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEnginesDeprecated) + defer stickyVFSRegistry.CloseAllEngines() - runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the right range up first. for i := 3; i <= 5; i++ { require.NoError(t, tc.RestartServer(i)) diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index 8019bd27fd32..4710fd54fd38 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -249,8 +249,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // Test uses sticky registry to have persistent pebble state that could // be analyzed for existence of snapshots and to verify snapshot content // after failures. - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() // The cluster has 3 nodes, one store per node. The test writes a few KVs to a // range, which gets replicated to all 3 stores. Then it manually replaces an @@ -271,11 +270,11 @@ func TestCheckConsistencyInconsistent(t *testing.T) { serverArgsPerNode[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &testKnobs, - Server: &server.TestingKnobs{StickyEngineRegistry: stickyEngineRegistry}, + Server: &server.TestingKnobs{StickyVFSRegistry: stickyVFSRegistry}, }, StoreSpecs: []base.StoreSpec{{ - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }}, } } @@ -308,8 +307,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) { } onDiskCheckpointPaths := func(nodeIdx int) []string { - fs, err := stickyEngineRegistry.GetUnderlyingFS( - base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(nodeIdx), 10)}) + fs, err := stickyVFSRegistry.Get( + base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(nodeIdx), 10)}) require.NoError(t, err) store := tc.GetFirstStoreFromServer(t, nodeIdx) checkpointPath := filepath.Join(store.TODOEngine().GetAuxiliaryDir(), "checkpoints") @@ -389,7 +388,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // Create a new store on top of checkpoint location inside existing in-mem // VFS to verify its contents. - fs, err := stickyEngineRegistry.GetUnderlyingFS(base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10)}) + fs, err := stickyVFSRegistry.Get(base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(i), 10)}) require.NoError(t, err) // Copy the min-version file so we can open the checkpoint as a store. require.NoError(t, vfs.Copy(fs, storage.MinVersionFilename, fs.PathJoin(cps[0], storage.MinVersionFilename))) diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index e19adfaab35f..01d10b536760 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -198,8 +198,7 @@ func TestGetPlanStagingState(t *testing.T) { ctx := context.Background() - tc, reg, planStores := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, planStores := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -260,8 +259,7 @@ func TestStageRecoveryPlans(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -301,8 +299,7 @@ func TestStageBadVersions(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 1) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 1) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -330,8 +327,7 @@ func TestStageConflictingPlans(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -370,8 +366,7 @@ func TestForcePlanUpdate(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -412,8 +407,7 @@ func TestNodeDecommissioned(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -445,8 +439,7 @@ func TestRejectDecommissionReachableNode(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -466,8 +459,7 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -498,8 +490,7 @@ func TestRetrieveRangeStatus(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 5) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 5) defer tc.Stopper().Stop(ctx) // Use scratch range to ensure we have a range that loses quorum. @@ -555,8 +546,7 @@ func TestRetrieveApplyStatus(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 5) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 5) defer tc.Stopper().Stop(ctx) // Use scratch range to ensure we have a range that loses quorum. @@ -652,8 +642,7 @@ func TestRejectBadVersionApplication(t *testing.T) { ctx := context.Background() - tc, reg, pss := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, pss := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -690,10 +679,10 @@ func TestRejectBadVersionApplication(t *testing.T) { func prepTestCluster( t *testing.T, nodes int, -) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { +) (*testcluster.TestCluster, server.StickyVFSRegistry, map[int]loqrecovery.PlanStore) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") - reg := server.NewStickyInMemEnginesRegistry() + reg := server.NewStickyVFSRegistry() lReg := listenerutil.NewListenerRegistry() @@ -705,7 +694,7 @@ func prepTestCluster( args.ServerArgsPerNode[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, @@ -713,8 +702,8 @@ func prepTestCluster( }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -730,8 +719,8 @@ func prepInMemPlanStores( ) map[int]loqrecovery.PlanStore { pss := make(map[int]loqrecovery.PlanStore) for id, args := range serverArgs { - reg := args.Knobs.Server.(*server.TestingKnobs).StickyEngineRegistry - store, err := reg.GetUnderlyingFS(args.StoreSpecs[0]) + reg := args.Knobs.Server.(*server.TestingKnobs).StickyVFSRegistry + store, err := reg.Get(args.StoreSpecs[0]) require.NoError(t, err, "can't create loq recovery plan store") pss[id] = loqrecovery.NewPlanStore(".", store) } diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index c639b45e6abb..2bd43f966dd3 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -157,8 +157,7 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -170,13 +169,13 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -502,8 +501,7 @@ func TestNodeLivenessRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -511,13 +509,13 @@ func TestNodeLivenessRestart(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -889,8 +887,7 @@ func TestNodeLivenessSetDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -900,13 +897,13 @@ func TestNodeLivenessSetDraining(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1227,8 +1224,7 @@ func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, node } func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -1238,13 +1234,13 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index edf66247ad23..ed08e9751b94 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -2082,8 +2082,6 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { st := cluster.MakeTestingClusterSettings() kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -2102,7 +2100,7 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { ScanMaxIdleTime: time.Millisecond, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), DefaultZoneConfigOverride: &zcfg, }, }, diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index a216cac10cee..45b665246998 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -71,7 +71,7 @@ go_library( "statements.go", "status.go", "status_local_file_retrieval.go", - "sticky_engine.go", + "sticky_vfs.go", "stop_trigger.go", "tcp_keepalive_manager.go", "tenant.go", @@ -441,7 +441,7 @@ go_test( "stats_test.go", "status_ext_test.go", "status_test.go", - "sticky_engine_test.go", + "sticky_vfs_test.go", "tenant_delayed_id_set_test.go", "tenant_range_lookup_test.go", "testserver_test.go", diff --git a/pkg/server/config.go b/pkg/server/config.go index 01f1b7bec0c9..6541d3e073ff 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -743,19 +743,19 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { for i, spec := range cfg.Stores.Specs { log.Eventf(ctx, "initializing %+v", spec) - if spec.InMemory && spec.StickyInMemoryEngineID != "" { + if spec.InMemory && spec.StickyVFSID != "" { if cfg.TestingKnobs.Server == nil { return Engines{}, errors.AssertionFailedf("Could not create a sticky " + "engine no server knobs available to get a registry. " + - "Please use Knobs.Server.StickyEngineRegistry to provide one.") + "Please use Knobs.Server.StickyVFSRegistry to provide one.") } knobs := cfg.TestingKnobs.Server.(*TestingKnobs) - if knobs.StickyEngineRegistry == nil { + if knobs.StickyVFSRegistry == nil { return Engines{}, errors.Errorf("Could not create a sticky " + "engine no registry available. Please use " + - "Knobs.Server.StickyEngineRegistry to provide one.") + "Knobs.Server.StickyVFSRegistry to provide one.") } - eng, err := knobs.StickyEngineRegistry.GetOrCreateStickyInMemEngine(ctx, cfg, spec) + eng, err := knobs.StickyVFSRegistry.Open(ctx, cfg, spec) if err != nil { return Engines{}, err } diff --git a/pkg/server/loss_of_quorum.go b/pkg/server/loss_of_quorum.go index 050cb266893c..25b568392965 100644 --- a/pkg/server/loss_of_quorum.go +++ b/pkg/server/loss_of_quorum.go @@ -39,20 +39,20 @@ func newPlanStore(cfg Config) (loqrecovery.PlanStore, error) { path := spec.Path if spec.InMemory { path = "" - if spec.StickyInMemoryEngineID != "" { + if spec.StickyVFSID != "" { if cfg.TestingKnobs.Server == nil { return loqrecovery.PlanStore{}, errors.AssertionFailedf("Could not create a sticky " + "engine no server knobs available to get a registry. " + - "Please use Knobs.Server.StickyEngineRegistry to provide one.") + "Please use Knobs.Server.StickyVFSRegistry to provide one.") } knobs := cfg.TestingKnobs.Server.(*TestingKnobs) - if knobs.StickyEngineRegistry == nil { + if knobs.StickyVFSRegistry == nil { return loqrecovery.PlanStore{}, errors.Errorf("Could not create a sticky " + "engine no registry available. Please use " + - "Knobs.Server.StickyEngineRegistry to provide one.") + "Knobs.Server.StickyVFSRegistry to provide one.") } var err error - fs, err = knobs.StickyEngineRegistry.GetUnderlyingFS(spec) + fs, err = knobs.StickyVFSRegistry.Get(spec) if err != nil { return loqrecovery.PlanStore{}, err } diff --git a/pkg/server/server_startup_test.go b/pkg/server/server_startup_test.go index 363d21c016b6..91fef928954e 100644 --- a/pkg/server/server_startup_test.go +++ b/pkg/server/server_startup_test.go @@ -40,8 +40,7 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { rng, seed := randutil.NewLockedTestRand() t.Log("TestStartupInjectedFailure random seed", seed) - reg := server.NewStickyInMemEnginesRegistry() - defer reg.CloseAllStickyInMemEngines() + reg := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -51,13 +50,13 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ // Ensure that scratch range has proper zone config, otherwise it is diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go index ea82dbf922e7..2e9ad9ca165e 100644 --- a/pkg/server/settings_cache_test.go +++ b/pkg/server/settings_cache_test.go @@ -65,16 +65,15 @@ func TestCachedSettingsServerRestart(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stickyEngineRegistry := NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := NewStickyVFSRegistry() serverArgs := base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ - {InMemory: true, StickyInMemoryEngineID: "1"}, + {InMemory: true, StickyVFSID: "1"}, }, Knobs: base.TestingKnobs{ Server: &TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/server/sticky_engine.go b/pkg/server/sticky_engine.go deleted file mode 100644 index 9532b631658d..000000000000 --- a/pkg/server/sticky_engine.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2019 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package server - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" - "github.com/cockroachdb/pebble/vfs" -) - -// stickyInMemEngine extends a normal engine, but does not allow them to be -// closed using the normal Close() method, instead keeping the engines in -// memory until CloseAllStickyInMemEngines is called, hence being "sticky". -// This prevents users of the in memory engine from having to special -// case "sticky" engines on every instance of "Close". -// It is intended for use in demos and/or tests, where we want in-memory -// storage nodes to persist between killed nodes. -type stickyInMemEngine struct { - // id is the unique identifier for this sticky engine. - id string - // closed indicates whether the current engine has been closed. - closed bool - - // Engine extends the Engine interface. - storage.Engine - - // Underlying in-mem filesystem backing the engine - fs vfs.FS -} - -// StickyEngineRegistryConfigOption is a config option for a sticky engine -// registry that can be passed to NewStickyInMemEnginesRegistry. -type StickyEngineRegistryConfigOption func(cfg *stickyEngineRegistryConfig) - -// ReplaceEngines configures a sticky engine registry to return a new engine -// with the same underlying in-memory FS instead of simply reopening it in -// the case where it already exists. -var ReplaceEngines StickyEngineRegistryConfigOption = func(cfg *stickyEngineRegistryConfig) { - cfg.replaceEngines = true -} - -// StickyInMemEnginesRegistry manages the lifecycle of sticky engines. -type StickyInMemEnginesRegistry interface { - // GetOrCreateStickyInMemEngine returns an engine associated with the given id. - // It will create a new in-memory engine if one does not already exist. - // At most one engine with a given id can be active in - // "GetOrCreateStickyInMemEngine" at any given time. - // Note that if you re-create an existing sticky engine the new attributes - // and cache size will be ignored. - // One must Close() on the sticky engine before another can be fetched. - GetOrCreateStickyInMemEngine(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error) - // GetUnderlyingFS returns FS backing in mem engine. If engine was not created - // error is returned. - GetUnderlyingFS(spec base.StoreSpec) (vfs.FS, error) - // CloseAllStickyInMemEngines closes all sticky in memory engines that were - // created by this registry. - CloseAllStickyInMemEngines() -} - -// stickyInMemEngine implements Engine. -var _ storage.Engine = &stickyInMemEngine{} - -// Close overwrites the default Engine interface to not close the underlying -// engine if called. We mark the state as closed to reflect a correct result -// in Closed(). -func (e *stickyInMemEngine) Close() { - e.closed = true -} - -// Closed overwrites the default Engine interface. -func (e *stickyInMemEngine) Closed() bool { - return e.closed -} - -// SetStoreID implements the StoreIDSetter interface. -func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error { - return e.Engine.SetStoreID(ctx, storeID) -} - -// stickyInMemEnginesRegistryImpl is the bookkeeper for all active -// sticky engines, keyed by their id. It implements the -// StickyInMemEnginesRegistry interface. -type stickyInMemEnginesRegistryImpl struct { - entries map[string]*stickyInMemEngine - mu syncutil.Mutex - cfg stickyEngineRegistryConfig -} - -// NewStickyInMemEnginesRegistry creates a new StickyInMemEnginesRegistry. -func NewStickyInMemEnginesRegistry( - opts ...StickyEngineRegistryConfigOption, -) StickyInMemEnginesRegistry { - var cfg stickyEngineRegistryConfig - for _, opt := range opts { - opt(&cfg) - } - return &stickyInMemEnginesRegistryImpl{ - entries: map[string]*stickyInMemEngine{}, - cfg: cfg, - } -} - -// GetOrCreateStickyInMemEngine implements the StickyInMemEnginesRegistry interface. -func (registry *stickyInMemEnginesRegistryImpl) GetOrCreateStickyInMemEngine( - ctx context.Context, cfg *Config, spec base.StoreSpec, -) (storage.Engine, error) { - registry.mu.Lock() - defer registry.mu.Unlock() - - var fs vfs.FS - if engine, ok := registry.entries[spec.StickyInMemoryEngineID]; ok { - if !engine.closed { - return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyInMemoryEngineID) - } - if !registry.cfg.replaceEngines { - log.Infof(ctx, "re-using sticky in-mem engine %s", spec.StickyInMemoryEngineID) - engine.closed = false - return engine, nil - } - fs = engine.fs - registry.deleteEngine(spec.StickyInMemoryEngineID) - } else { - fs = vfs.NewMem() - } - options := []storage.ConfigOption{ - storage.Attributes(spec.Attributes), - storage.CacheSize(cfg.CacheSize), - storage.MaxSize(spec.Size.InBytes), - storage.EncryptionAtRest(spec.EncryptionOptions), - storage.ForStickyEngineTesting, - } - - if s := cfg.TestingKnobs.Store; s != nil { - stk := s.(*kvserver.StoreTestingKnobs) - if stk.SmallEngineBlocks { - options = append(options, storage.BlockSize(1)) - } - if len(stk.EngineKnobs) > 0 { - options = append(options, stk.EngineKnobs...) - } - } - - log.Infof(ctx, "creating new sticky in-mem engine %s", spec.StickyInMemoryEngineID) - engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...) - - engineEntry := &stickyInMemEngine{ - id: spec.StickyInMemoryEngineID, - closed: false, - Engine: engine, - fs: fs, - } - registry.entries[spec.StickyInMemoryEngineID] = engineEntry - return engineEntry, nil -} - -func (registry *stickyInMemEnginesRegistryImpl) GetUnderlyingFS( - spec base.StoreSpec, -) (vfs.FS, error) { - registry.mu.Lock() - defer registry.mu.Unlock() - - if engine, ok := registry.entries[spec.StickyInMemoryEngineID]; ok { - return engine.fs, nil - } - return nil, errors.Errorf("engine '%s' was not created", spec.StickyInMemoryEngineID) -} - -// CloseAllStickyInMemEngines closes and removes all sticky in memory engines. -func (registry *stickyInMemEnginesRegistryImpl) CloseAllStickyInMemEngines() { - registry.mu.Lock() - defer registry.mu.Unlock() - - for id := range registry.entries { - registry.deleteEngine(id) - } -} - -func (registry *stickyInMemEnginesRegistryImpl) deleteEngine(id string) { - engine, ok := registry.entries[id] - if !ok { - return - } - engine.closed = true - engine.Engine.Close() - delete(registry.entries, id) -} - -type stickyEngineRegistryConfig struct { - // replaceEngines is true if a sticky engine registry should return a new - // engine with the same underlying in-memory FS instead of simply reopening - // it in the case where it already exists. - replaceEngines bool -} diff --git a/pkg/server/sticky_vfs.go b/pkg/server/sticky_vfs.go new file mode 100644 index 000000000000..7265c25dd89b --- /dev/null +++ b/pkg/server/sticky_vfs.go @@ -0,0 +1,226 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" +) + +// StickyVFSOption is a config option for a sticky engine +// registry that can be passed to NewStickyVFSRegistry. +type StickyVFSOption func(cfg *stickyConfig) + +// ReuseEnginesDeprecated configures a sticky VFS registry to return exactly the +// same Engine without closing and re-opening from the underlying in-memory FS. +// This option is deprecated. Callers should refactor their usage to not depend +// on ephemeral, non-persisted state of an Engine. +var ReuseEnginesDeprecated StickyVFSOption = func(cfg *stickyConfig) { + cfg.reuseEngines = true +} + +type stickyConfig struct { + // reuseEngines is true if a sticky engine registry should return an existing + // engine instead of reopening it from the underlying in-memory FS. + reuseEngines bool +} + +// StickyVFSRegistry manages the lifecycle of sticky in-memory filesystems. It +// is intended for use in demos and/or tests, where we want in-memory storage +// nodes to persist between killed nodes. +type StickyVFSRegistry interface { + // Open returns an engine associated with the given spec's StickyVFSID. If + // neither Open nor Get has been called with the given spec's StickyVFSID + // before, Open will create a new in-memory filesystem. Otherwise, the new + // Engine will re-open the existing store held within the in-memory + // filesystem. + // + // If the registry was created with the ReuseEnginesDeprecated option, Open + // will return an Engine whose Close method does not close the underlying + // engine. Subsequent Open calls will return the same Engine. With this + // option, at most one engine with a given id can be active at any given + // time. Note that with the ReuseEnginesDeprecated option, an Open call that + // returns an existing sticky engine will not respect modified configuration + // or attributes. When using the ReuseEnginesDeprecated option, the caller + // must call CloseAllEngines when they're finished to close the underlying + // engines. + Open(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error) + // Get returns the named in-memory FS. + Get(spec base.StoreSpec) (vfs.FS, error) + // CloseAllEngines closes all open sticky in-memory engines that were + // created by this registry. Calling this method is required when using the + // ReuseEnginesDeprecated option. + CloseAllEngines() +} + +// stickyVFSRegistryImpl is the bookkeeper for all active sticky filesystems, +// keyed by their id. It implements the StickyVFSRegistry interface. +type stickyVFSRegistryImpl struct { + entries map[string]*vfs.MemFS + engines map[string]*stickyInMemEngine + mu syncutil.Mutex + cfg stickyConfig +} + +// NewStickyVFSRegistry creates a new StickyVFSRegistry. +func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry { + registry := &stickyVFSRegistryImpl{ + entries: map[string]*vfs.MemFS{}, + engines: map[string]*stickyInMemEngine{}, + } + for _, opt := range opts { + opt(®istry.cfg) + } + return registry +} + +// Open implements the StickyVFSRegistry interface. +func (registry *stickyVFSRegistryImpl) Open( + ctx context.Context, cfg *Config, spec base.StoreSpec, +) (storage.Engine, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + // Look up the VFS. + fs, ok := registry.entries[spec.StickyVFSID] + + // If the registry is configured to reuse whole Engines (eg, skipping + // shutdown and recovery of the storage engine), then check if we already + // have an open Engine. + if ok && registry.cfg.reuseEngines { + if engine, engineOk := registry.engines[spec.StickyVFSID]; engineOk { + if !engine.closed { + return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyVFSID) + } + log.Infof(ctx, "re-using existing sticky in-mem engine %s", spec.StickyVFSID) + engine.closed = false + return engine, nil + } + } + + // If the VFS doesn't yet exist, construct a new one and save the + // association. + if !ok { + fs = vfs.NewMem() + registry.entries[spec.StickyVFSID] = fs + } + + // Create a new engine. + options := []storage.ConfigOption{ + storage.Attributes(spec.Attributes), + storage.CacheSize(cfg.CacheSize), + storage.MaxSize(spec.Size.InBytes), + storage.EncryptionAtRest(spec.EncryptionOptions), + storage.ForStickyEngineTesting, + } + + if s := cfg.TestingKnobs.Store; s != nil { + stk := s.(*kvserver.StoreTestingKnobs) + if stk.SmallEngineBlocks { + options = append(options, storage.BlockSize(1)) + } + if len(stk.EngineKnobs) > 0 { + options = append(options, stk.EngineKnobs...) + } + } + + log.Infof(ctx, "creating engine with sticky in-memory VFS %s", spec.StickyVFSID) + engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...) + + // If this registry is configured to keep Engines open rather recovering + // from filesystem state, wrap the Engine within a *stickyInMemEngine + // wrapper so that calls to Close do not Close the underlying Engine. + if registry.cfg.reuseEngines { + wrappedEngine := &stickyInMemEngine{ + id: spec.StickyVFSID, + closed: false, + Engine: engine, + fs: fs, + } + registry.engines[spec.StickyVFSID] = wrappedEngine + return wrappedEngine, nil + } + return engine, nil +} + +func (registry *stickyVFSRegistryImpl) Get(spec base.StoreSpec) (vfs.FS, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if fs, ok := registry.entries[spec.StickyVFSID]; ok { + return fs, nil + } else { + fs = vfs.NewMem() + registry.entries[spec.StickyVFSID] = fs + return fs, nil + } +} + +// CloseAllEngines closes all open sticky in-memory engines that were +// created by this registry. Calling this method is required when using the +// ReuseEnginesDeprecated option. +func (registry *stickyVFSRegistryImpl) CloseAllEngines() { + registry.mu.Lock() + defer registry.mu.Unlock() + + for id, engine := range registry.engines { + engine.closed = true + engine.Engine.Close() + delete(registry.entries, id) + delete(registry.engines, id) + } +} + +// stickyInMemEngine extends a normal engine, but holds on to the Engine between +// Opens to allow subsequent Opens to reuse a previous Engine instance. This +// type is used only when a the StickyVFSRegistry is created with the +// ReuseEnginesDeprecated option. +// +// Engine.Close does not close the underlying Engine. The engine is kept open +// until CloseAllEngines is called, hence being "sticky". +type stickyInMemEngine struct { + // id is the unique identifier for this sticky engine. + id string + // closed indicates whether the current engine has been closed. + closed bool + // Engine extends the Engine interface. + storage.Engine + // Underlying in-mem filesystem backing the engine. + fs vfs.FS +} + +// stickyInMemEngine implements Engine. +var _ storage.Engine = &stickyInMemEngine{} + +// Close overwrites the default Engine interface to not close the underlying +// engine if called. We mark the state as closed to reflect a correct result +// in Closed(). +func (e *stickyInMemEngine) Close() { + e.closed = true +} + +// Closed overwrites the default Engine interface. +func (e *stickyInMemEngine) Closed() bool { + return e.closed +} + +// SetStoreID implements the StoreIDSetter interface. +func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error { + return e.Engine.SetStoreID(ctx, storeID) +} diff --git a/pkg/server/sticky_engine_test.go b/pkg/server/sticky_vfs_test.go similarity index 65% rename from pkg/server/sticky_engine_test.go rename to pkg/server/sticky_vfs_test.go index f3e5053b1a48..7c4263d70a74 100644 --- a/pkg/server/sticky_engine_test.go +++ b/pkg/server/sticky_vfs_test.go @@ -32,32 +32,32 @@ func TestStickyEngines(t *testing.T) { cacheSize := int64(1 << 20) /* 1 MiB */ storeSize := int64(512 << 20) /* 512 MiB */ - registry := NewStickyInMemEnginesRegistry() + registry := NewStickyVFSRegistry(ReuseEnginesDeprecated) cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg1.CacheSize = cacheSize spec1 := base.StoreSpec{ - StickyInMemoryEngineID: "engine1", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine1", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine1, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) require.False(t, engine1.Closed()) cfg2 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg2.CacheSize = cacheSize spec2 := base.StoreSpec{ - StickyInMemoryEngineID: "engine2", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine2", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine2, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg2, spec2) + engine2, err := registry.Open(ctx, &cfg2, spec2) require.NoError(t, err) require.False(t, engine2.Closed()) // Regetting the engine whilst it is not closed will fail. - _, err = registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + _, err = registry.Open(ctx, &cfg1, spec1) require.EqualError(t, err, "sticky engine engine1 has not been closed") // Close the engine, which allows it to be refetched. @@ -66,20 +66,20 @@ func TestStickyEngines(t *testing.T) { require.False(t, engine1.(*stickyInMemEngine).Engine.Closed()) // Refetching the engine should give back the same engine. - engine1Refetched, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1Refetched, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) require.Equal(t, engine1, engine1Refetched) require.False(t, engine1.Closed()) // Cleaning up everything asserts everything is closed. - registry.CloseAllStickyInMemEngines() + registry.CloseAllEngines() for _, engine := range []storage.Engine{engine1, engine2} { require.True(t, engine.Closed()) require.True(t, engine.(*stickyInMemEngine).Engine.Closed()) } } -func TestStickyEnginesReplaceEngines(t *testing.T) { +func TestStickyVFS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -88,36 +88,35 @@ func TestStickyEnginesReplaceEngines(t *testing.T) { cacheSize := int64(1 << 20) /* 1 MiB */ storeSize := int64(512 << 20) /* 512 MiB */ - registry := NewStickyInMemEnginesRegistry(ReplaceEngines) + registry := NewStickyVFSRegistry() cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg1.CacheSize = cacheSize spec1 := base.StoreSpec{ - StickyInMemoryEngineID: "engine1", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine1", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine1, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) - fs1, err := registry.GetUnderlyingFS(spec1) + fs1, err := registry.Get(spec1) require.NoError(t, err) require.False(t, engine1.Closed()) engine1.Close() // Refetching the engine should give back a different engine with the same // underlying fs. - engine1Refetched, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine2, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) - fs1Refetched, err := registry.GetUnderlyingFS(spec1) + fs2, err := registry.Get(spec1) require.NoError(t, err) - require.NotEqual(t, engine1, engine1Refetched) - require.Equal(t, fs1, fs1Refetched) + require.NotEqual(t, engine1, engine2) + require.Equal(t, fs1, fs2) require.True(t, engine1.Closed()) - require.False(t, engine1Refetched.Closed()) + require.False(t, engine2.Closed()) + engine2.Close() - registry.CloseAllStickyInMemEngines() - for _, engine := range []storage.Engine{engine1, engine1Refetched} { + for _, engine := range []storage.Engine{engine1, engine2} { require.True(t, engine.Closed()) - require.True(t, engine.(*stickyInMemEngine).Engine.Closed()) } } diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 059b2437c0f7..d96453bf75e2 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -104,12 +104,12 @@ type TestingKnobs struct { // An (additional) callback invoked whenever a // node is permanently removed from the cluster. OnDecommissionedCallback func(livenesspb.Liveness) - // StickyEngineRegistry manages the lifecycle of sticky in memory engines, - // which can be enabled via base.StoreSpec.StickyInMemoryEngineID. + // StickyVFSRegistry manages the lifecycle of sticky in memory engines, + // which can be enabled via base.StoreSpec.StickyVFSID. // // When supplied to a TestCluster, StickyEngineIDs will be associated auto- // matically to the StoreSpecs used. - StickyEngineRegistry StickyInMemEnginesRegistry + StickyVFSRegistry StickyVFSRegistry // WallClock is used to inject a custom clock for testing the server. It is // typically either an hlc.HybridManualClock or hlc.ManualClock. WallClock hlc.WallClock diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 2ceda1aac04b..2b7ef21b61be 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -300,10 +300,10 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) * if len(serverArgs.StoreSpecs) == 0 { serverArgs.StoreSpecs = []base.StoreSpec{base.DefaultTestStoreSpec} } - if knobs, ok := serverArgs.Knobs.Server.(*server.TestingKnobs); ok && knobs.StickyEngineRegistry != nil { + if knobs, ok := serverArgs.Knobs.Server.(*server.TestingKnobs); ok && knobs.StickyVFSRegistry != nil { for j := range serverArgs.StoreSpecs { - if serverArgs.StoreSpecs[j].StickyInMemoryEngineID == "" { - serverArgs.StoreSpecs[j].StickyInMemoryEngineID = fmt.Sprintf("auto-node%d-store%d", i+1, j+1) + if serverArgs.StoreSpecs[j].StickyVFSID == "" { + serverArgs.StoreSpecs[j].StickyVFSID = fmt.Sprintf("auto-node%d-store%d", i+1, j+1) } } } @@ -1666,7 +1666,7 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } for i, specs := range serverArgs.StoreSpecs { - if specs.InMemory && specs.StickyInMemoryEngineID == "" { + if specs.InMemory && specs.StickyVFSID == "" { return errors.Errorf("failed to restart Server %d, because a restart can only be used on a server with a sticky engine", i) } } diff --git a/pkg/testutils/testcluster/testcluster_test.go b/pkg/testutils/testcluster/testcluster_test.go index de21245b6ff2..0dbf881dfa8a 100644 --- a/pkg/testutils/testcluster/testcluster_test.go +++ b/pkg/testutils/testcluster/testcluster_test.go @@ -274,8 +274,7 @@ func TestStopServer(t *testing.T) { func TestRestart(t *testing.T) { defer leaktest.AfterTest(t)() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -285,13 +284,13 @@ func TestRestart(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "TestRestart" + strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: "TestRestart" + strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 9a4512665570..7c305e6b0502 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -139,8 +139,7 @@ func runCircuitBreakerTestForKey( lReg := listenerutil.NewListenerRegistry() defer lReg.Close() - reg := server.NewStickyInMemEnginesRegistry() - defer reg.CloseAllStickyInMemEngines() + reg := server.NewStickyVFSRegistry() // TODO: Disable expiration based leases metamorphism since it currently // breaks closed timestamps and prevent rangefeeds from advancing checkpoint @@ -158,7 +157,7 @@ func runCircuitBreakerTestForKey( Settings: st, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, @@ -166,8 +165,8 @@ func runCircuitBreakerTestForKey( }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Listener: lReg.MustGetOrCreate(t, i),