diff --git a/pkg/base/store_spec.go b/pkg/base/store_spec.go index 400f68ee8442..d121f3ecc78f 100644 --- a/pkg/base/store_spec.go +++ b/pkg/base/store_spec.go @@ -249,12 +249,11 @@ type StoreSpec struct { BallastSize *SizeSpec InMemory bool Attributes roachpb.Attributes - // StickyInMemoryEngineID is a unique identifier associated with a given - // store which will remain in memory even after the default Engine close - // until it has been explicitly cleaned up by CleanupStickyInMemEngine[s] - // or the process has been terminated. - // This only applies to in-memory storage engine. - StickyInMemoryEngineID string + // StickyVFSID is a unique identifier associated with a given store which + // will preserve the in-memory virtual file system (VFS) even after the + // storage engine has been closed. This only applies to in-memory storage + // engine. + StickyVFSID string // UseFileRegistry is true if the "file registry" store version is desired. // This is set by CCL code when encryption-at-rest is in use. UseFileRegistry bool diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go index aed90be50b5c..845854d050d4 100644 --- a/pkg/cli/debug_recover_loss_of_quorum_test.go +++ b/pkg/cli/debug_recover_loss_of_quorum_test.go @@ -332,19 +332,18 @@ func TestStageVersionCheck(t *testing.T) { }) defer c.Cleanup() - storeReg := server.NewStickyInMemEnginesRegistry() - defer storeReg.CloseAllStickyInMemEngines() + storeReg := server.NewStickyVFSRegistry() tc := testcluster.NewTestCluster(t, 4, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgsPerNode: map[int]base.TestServerArgs{ 0: { Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: storeReg, + StickyVFSRegistry: storeReg, }, }, StoreSpecs: []base.StoreSpec{ - {InMemory: true, StickyInMemoryEngineID: "1"}, + {InMemory: true, StickyVFSID: "1"}, }, }, }, @@ -385,7 +384,7 @@ func TestStageVersionCheck(t *testing.T) { }) require.NoError(t, err, "force local must fix incorrect version") // Check that stored plan has version matching cluster version. - fs, err := storeReg.GetUnderlyingFS(base.StoreSpec{InMemory: true, StickyInMemoryEngineID: "1"}) + fs, err := storeReg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"}) require.NoError(t, err, "failed to get shared store fs") ps := loqrecovery.NewPlanStore("", fs) p, ok, err := ps.LoadPlan() @@ -441,9 +440,6 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() - storeReg := server.NewStickyInMemEnginesRegistry() - defer storeReg.CloseAllStickyInMemEngines() - // Test cluster contains 3 nodes that we would turn into a single node // cluster using loss of quorum recovery. To do that, we will terminate // two nodes and run recovery on remaining one. Restarting node should @@ -459,7 +455,7 @@ func TestHalfOnlineLossOfQuorumRecovery(t *testing.T) { sa[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: storeReg, + StickyVFSRegistry: server.NewStickyVFSRegistry(), }, }, StoreSpecs: []base.StoreSpec{ diff --git a/pkg/cli/democluster/demo_cluster.go b/pkg/cli/democluster/demo_cluster.go index d4bb253d5f05..77d6a0c094de 100644 --- a/pkg/cli/democluster/demo_cluster.go +++ b/pkg/cli/democluster/demo_cluster.go @@ -86,7 +86,7 @@ type transientCluster struct { adminPassword string adminUser username.SQLUsername - stickyEngineRegistry server.StickyInMemEnginesRegistry + stickyVFSRegistry server.StickyVFSRegistry getAdminClient func(ctx context.Context, cfg server.Config) (serverpb.AdminClient, func(), error) drainAndShutdown func(ctx context.Context, adminClient serverpb.AdminClient) error @@ -212,7 +212,7 @@ func NewDemoCluster( } } - c.stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + c.stickyVFSRegistry = server.NewStickyVFSRegistry() return c, nil } @@ -316,7 +316,7 @@ func (c *transientCluster) Start(ctx context.Context) (err error) { // individual servers' Stop() methods have been registered // via createAndAddNode() above. c.stopper.AddCloser(stop.CloserFn(func() { - c.stickyEngineRegistry.CloseAllStickyInMemEngines() + c.stickyVFSRegistry.CloseAllEngines() })) // Start the remaining nodes asynchronously. @@ -600,7 +600,7 @@ func (c *transientCluster) createAndAddNode( } args := c.demoCtx.testServerArgsForTransientCluster( socketDetails, idx, joinAddr, c.demoDir, - c.stickyEngineRegistry, + c.stickyVFSRegistry, ) if idx == 0 { // The first node also auto-inits the cluster. @@ -871,11 +871,11 @@ func (demoCtx *Context) testServerArgsForTransientCluster( serverIdx int, joinAddr string, demoDir string, - stickyEngineRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, ) base.TestServerArgs { // Assign a path to the store spec, to be saved. storeSpec := base.DefaultTestStoreSpec - storeSpec.StickyInMemoryEngineID = fmt.Sprintf("demo-server%d", serverIdx) + storeSpec.StickyVFSID = fmt.Sprintf("demo-server%d", serverIdx) args := base.TestServerArgs{ SocketFile: sock.filename(), @@ -895,7 +895,7 @@ func (demoCtx *Context) testServerArgsForTransientCluster( Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, JobsTestingKnobs: &jobs.TestingKnobs{ // Allow the scheduler daemon to start earlier in demo. @@ -1142,7 +1142,7 @@ func (c *transientCluster) startServerInternal( socketDetails, serverIdx, c.firstServer.AdvRPCAddr(), c.demoDir, - c.stickyEngineRegistry) + c.stickyVFSRegistry) s, err := server.TestServerFactory.New(args) if err != nil { return 0, err diff --git a/pkg/cli/democluster/demo_cluster_test.go b/pkg/cli/democluster/demo_cluster_test.go index 6a594c94b6a7..bb5e4e0eb728 100644 --- a/pkg/cli/democluster/demo_cluster_test.go +++ b/pkg/cli/democluster/demo_cluster_test.go @@ -51,7 +51,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEnginesRegistry := server.NewStickyInMemEnginesRegistry() + stickyVFSRegistry := server.NewStickyVFSRegistry() testCases := []struct { serverIdx int @@ -81,7 +81,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { EnableDemoLoginEndpoint: true, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -106,7 +106,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { EnableDemoLoginEndpoint: true, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -120,7 +120,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { demoCtx.CacheSize = tc.cacheSize demoCtx.SQLPort = 1234 demoCtx.HTTPPort = 4567 - actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyEnginesRegistry) + actual := demoCtx.testServerArgsForTransientCluster(unixSocketDetails{}, tc.serverIdx, tc.joinAddr, "", stickyVFSRegistry) stopper := actual.Stopper defer stopper.Stop(context.Background()) @@ -128,7 +128,7 @@ func TestTestServerArgsForTransientCluster(t *testing.T) { assert.Equal( t, fmt.Sprintf("demo-server%d", tc.serverIdx), - actual.StoreSpecs[0].StickyInMemoryEngineID, + actual.StoreSpecs[0].StickyVFSID, ) // We cannot compare these. @@ -169,13 +169,13 @@ func TestTransientClusterSimulateLatencies(t *testing.T) { // Setup the transient cluster. c := transientCluster{ - demoCtx: demoCtx, - stopper: stop.NewStopper(), - demoDir: certsDir, - stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(), - infoLog: log.Infof, - warnLog: log.Warningf, - shoutLog: log.Ops.Shoutf, + demoCtx: demoCtx, + stopper: stop.NewStopper(), + demoDir: certsDir, + stickyVFSRegistry: server.NewStickyVFSRegistry(), + infoLog: log.Infof, + warnLog: log.Warningf, + shoutLog: log.Ops.Shoutf, } // Stop the cluster when the test exits, including when it fails. // This also calls the Stop() method on the stopper, and thus @@ -281,13 +281,13 @@ func TestTransientClusterMultitenant(t *testing.T) { // Setup the transient cluster. c := transientCluster{ - demoCtx: demoCtx, - stopper: stop.NewStopper(), - demoDir: certsDir, - stickyEngineRegistry: server.NewStickyInMemEnginesRegistry(), - infoLog: log.Infof, - warnLog: log.Warningf, - shoutLog: log.Ops.Shoutf, + demoCtx: demoCtx, + stopper: stop.NewStopper(), + demoDir: certsDir, + stickyVFSRegistry: server.NewStickyVFSRegistry(), + infoLog: log.Infof, + warnLog: log.Warningf, + shoutLog: log.Ops.Shoutf, } // Stop the cluster when the test exits, including when it fails. // This also calls the Stop() method on the stopper, and thus diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index a85dbd5364d9..999d4d9eb3c8 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -783,8 +783,7 @@ func TestLeasePreferencesRebalance(t *testing.T) { func TestLeaseholderRelocate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() ctx := context.Background() manualClock := hlc.NewHybridManualClock() @@ -809,14 +808,14 @@ func TestLeaseholderRelocate(t *testing.T) { Locality: localities[i], Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manualClock, - StickyEngineRegistry: stickyRegistry, + WallClock: manualClock, + StickyVFSRegistry: stickyRegistry, }, }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -919,8 +918,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { skip.WithIssue(t, 88769, "flaky test") defer log.Scope(t).Close(t) - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() ctx := context.Background() manualClock := hlc.NewHybridManualClock() // Place all the leases in the us. @@ -956,7 +954,7 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { Server: &server.TestingKnobs{ WallClock: manualClock, DefaultZoneConfigOverride: &zcfg, - StickyEngineRegistry: stickyRegistry, + StickyVFSRegistry: stickyRegistry, }, Store: &kvserver.StoreTestingKnobs{ // The Raft leadership may not end up on the eu node, but it needs to @@ -966,8 +964,8 @@ func TestLeasePreferencesDuringOutage(t *testing.T) { }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -1128,8 +1126,7 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism // Speed up lease transfers. - stickyRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyRegistry.CloseAllStickyInMemEngines() + stickyRegistry := server.NewStickyVFSRegistry() manualClock := hlc.NewHybridManualClock() serverArgs := make(map[int]base.TestServerArgs) numNodes := 4 @@ -1141,13 +1138,13 @@ func TestLeasesDontThrashWhenNodeBecomesSuspect(t *testing.T) { Server: &server.TestingKnobs{ WallClock: manualClock, DefaultZoneConfigOverride: &zcfg, - StickyEngineRegistry: stickyRegistry, + StickyVFSRegistry: stickyRegistry, }, }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 85333117b59a..470e4e22ea98 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -266,8 +266,8 @@ func TestStoreMetrics(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { @@ -275,8 +275,8 @@ func TestStoreMetrics(t *testing.T) { CacheSize: 1 << 20, /* 1 MiB */ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), // Specify a size to trigger the BlockCache in Pebble. Size: base.SizeSpec{ InBytes: 512 << 20, /* 512 MiB */ @@ -285,7 +285,7 @@ func TestStoreMetrics(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableRaftLogQueue: true, diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 47e7283edf13..40fd62a36604 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -95,8 +95,7 @@ func TestStoreRecoverFromEngine(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -108,13 +107,13 @@ func TestStoreRecoverFromEngine(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -204,8 +203,7 @@ func TestStoreRecoverWithErrors(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -220,7 +218,7 @@ func TestStoreRecoverWithErrors(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ @@ -236,8 +234,8 @@ func TestStoreRecoverWithErrors(t *testing.T) { }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, }, @@ -350,8 +348,7 @@ func TestRestoreReplicas(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -361,13 +358,13 @@ func TestRestoreReplicas(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -673,8 +670,10 @@ func TestSnapshotAfterTruncation(t *testing.T) { name = "differentTerm" } t.Run(name, func(t *testing.T) { - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -684,13 +683,13 @@ func TestSnapshotAfterTruncation(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1722,8 +1721,10 @@ func TestConcurrentRaftSnapshots(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1731,13 +1732,13 @@ func TestConcurrentRaftSnapshots(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1808,8 +1809,7 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -1817,13 +1817,13 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Disable the replica GC queue so that it doesn't accidentally pick up the @@ -1890,8 +1890,7 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() raftConfig := base.RaftConfig{ // Drop the raft tick interval so the Raft group is ticked more. @@ -1913,14 +1912,14 @@ func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, RaftConfig: raftConfig, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Disable leader transfers during leaseholder changes so that we @@ -2213,8 +2212,10 @@ func TestProgressWithDownNode(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -2222,13 +2223,13 @@ func TestProgressWithDownNode(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -2298,22 +2299,22 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) for i := 0; i < numServers; i++ { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, }, RaftConfig: base.RaftConfig{ @@ -2403,8 +2404,10 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -2412,14 +2415,14 @@ func testReplicaAddRemove(t *testing.T, addFirst bool) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ // We're gonna want to validate the state of the store before and @@ -3505,8 +3508,10 @@ func TestReplicateRogueRemovedNode(t *testing.T) { ctx := context.Background() manualClock := hlc.NewHybridManualClock() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3514,14 +3519,14 @@ func TestReplicateRogueRemovedNode(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, - WallClock: manualClock, + StickyVFSRegistry: stickyVFSRegistry, + WallClock: manualClock, }, Store: &kvserver.StoreTestingKnobs{ // Newly-started stores (including the "rogue" one) should not GC @@ -3852,8 +3857,10 @@ func TestReplicaTooOldGC(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 4 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3861,13 +3868,13 @@ func TestReplicaTooOldGC(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableScanner: true, @@ -3952,8 +3959,10 @@ func TestReplicateReAddAfterDown(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -3961,13 +3970,13 @@ func TestReplicateReAddAfterDown(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -4834,8 +4843,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -4884,14 +4892,14 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing Settings: st, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - ContextTestingKnobs: knobs, - StickyEngineRegistry: stickyEngineRegistry, + ContextTestingKnobs: knobs, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -5230,9 +5238,9 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { keyA, keyB roachpb.Key, lhsID roachpb.RangeID, lhsPartition *testClusterPartitionedRange, - stickyEngineRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, ) { - stickyEngineRegistry = server.NewStickyInMemEnginesRegistry() + stickyVFSRegistry = server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() const numServers int = 3 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -5240,13 +5248,13 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ // Newly-started stores (including the "rogue" one) should not GC @@ -5300,7 +5308,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { increment(t, db, keyA, 1) tc.WaitForValues(t, keyA, []int64{6, 7, 7}) - return tc, db, keyA, keyB, lhsID, lhsPartition, stickyEngineRegistry + return tc, db, keyA, keyB, lhsID, lhsPartition, stickyVFSRegistry } // In this case we only have the LHS partitioned. The RHS will learn about its @@ -5309,8 +5317,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // partition the RHS and ensure that the split does not clobber the RHS's hard // state. t.Run("(1) no RHS partition", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5362,8 +5370,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // This case is like the previous case except the store crashes after // laying down a tombstone. t.Run("(2) no RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) tc.SplitRangeOrFatal(t, keyB) @@ -5435,8 +5443,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // the split is processed. We partition the RHS's new replica ID before // processing the split to ensure that the RHS doesn't get initialized. t.Run("(3) initial replica RHS partition, no restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange partitionReplicaOnSplit(t, tc, keyB, lhsPartition, &rhsPartition) @@ -5497,8 +5505,8 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) { // its higher replica ID the store crashes and forgets. The RHS replica gets // initialized by the split. t.Run("(4) initial replica RHS partition, with restart", func(t *testing.T) { - tc, db, keyA, keyB, _, lhsPartition, stickyEngineRegistry := setup(t) - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + tc, db, keyA, keyB, _, lhsPartition, stickyVFSRegistry := setup(t) + defer stickyVFSRegistry.CloseAllEngines() defer tc.Stopper().Stop(ctx) var rhsPartition *testClusterPartitionedRange @@ -5616,9 +5624,7 @@ func TestElectionAfterRestart(t *testing.T) { const electionTimeoutTicks = 30 const raftTickInterval = 200 * time.Millisecond - r := server.NewStickyInMemEnginesRegistry() - defer r.CloseAllStickyInMemEngines() - + r := server.NewStickyVFSRegistry() newTCArgs := func(parallel bool, replMode base.TestClusterReplicationMode, onTimeoutCampaign func(roachpb.RangeID)) base.TestClusterArgs { return base.TestClusterArgs{ ReplicationMode: replMode, @@ -5630,7 +5636,7 @@ func TestElectionAfterRestart(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: r, + StickyVFSRegistry: r, }, Store: &kvserver.StoreTestingKnobs{ OnRaftTimeoutCampaign: onTimeoutCampaign, diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 44e6667e419a..be42f2167dab 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -802,7 +802,7 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { raftCfg.SetDefaults() raftCfg.RaftHeartbeatIntervalTicks = 1 raftCfg.RaftElectionTimeoutTicks = 2 - reg := server.NewStickyInMemEnginesRegistry() + reg := server.NewStickyVFSRegistry() args := base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ @@ -810,15 +810,15 @@ func setupCircuitBreakerTest(t *testing.T) *circuitBreakerTest { RaftConfig: raftCfg, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manualClock, - StickyEngineRegistry: reg, + WallClock: manualClock, + StickyVFSRegistry: reg, }, Store: storeKnobs, }, }, } tc := testcluster.StartTestCluster(t, 2, args) - tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllStickyInMemEngines)) + tc.Stopper().AddCloser(stop.CloserFn(reg.CloseAllEngines)) _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.replica_circuit_breaker.slow_replication_threshold = '45s'`) require.NoError(t, err) diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 9837ffc9c4cc..3ca5bfcade23 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2157,8 +2157,7 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -2172,14 +2171,14 @@ func TestLeaseNotUsedAfterRestart(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - WallClock: manual, - StickyEngineRegistry: stickyEngineRegistry, + WallClock: manual, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ LeaseRequestEvent: func(ts hlc.Timestamp, _ roachpb.StoreID, _ roachpb.RangeID) *kvpb.Error { @@ -4683,8 +4682,7 @@ func TestTenantID(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() ctx := context.Background() // Create a config with a sticky-in-mem engine so we can restart the server. // We also configure the settings to be as robust as possible to problems @@ -4694,13 +4692,13 @@ func TestTenantID(t *testing.T) { Insecure: true, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 36ff72934853..de17aa19e080 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -1391,7 +1391,7 @@ func TestStoreRangeSplitBackpressureWrites(t *testing.T) { // See https://github.com/cockroachdb/cockroach/issues/1644. func runSetupSplitSnapshotRace( t *testing.T, - stickyEnginesRegistry server.StickyInMemEnginesRegistry, + stickyVFSRegistry server.StickyVFSRegistry, testFn func(*testcluster.TestCluster, roachpb.Key, roachpb.Key), ) { const numServers int = 6 @@ -1400,13 +1400,13 @@ func runSetupSplitSnapshotRace( stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEnginesRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ DisableGCQueue: true, @@ -1541,10 +1541,12 @@ func TestSplitSnapshotRace_SplitWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() - runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the left range up first so that the split happens before it sees a snapshot. for i := 1; i <= 3; i++ { require.NoError(t, tc.RestartServer(i)) @@ -1578,10 +1580,12 @@ func TestSplitSnapshotRace_SnapshotWins(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): Currently this test uses ReuseEngines because + // `tc.WaitForValues` will try to read from a closed Engine otherwise; fix. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() - runSetupSplitSnapshotRace(t, stickyEngineRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { + runSetupSplitSnapshotRace(t, stickyVFSRegistry, func(tc *testcluster.TestCluster, leftKey, rightKey roachpb.Key) { // Bring the right range up first. for i := 3; i <= 5; i++ { require.NoError(t, tc.RestartServer(i)) diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index 48deba083f7b..d9918f70aafe 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -119,8 +119,7 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { defer log.Scope(t).Close(t) testutils.RunTrueAndFalse(t, "kv.expiration_leases_only.enabled", func(t *testing.T, expOnly bool) { - storeReg := server.NewStickyInMemEnginesRegistry() - defer storeReg.CloseAllStickyInMemEngines() + storeReg := server.NewStickyVFSRegistry() listenerReg := listenerutil.NewListenerRegistry() defer listenerReg.Close() @@ -138,7 +137,7 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: storeReg, + StickyVFSRegistry: storeReg, }, Store: &kvserver.StoreTestingKnobs{ DisableScanner: true, @@ -146,8 +145,8 @@ func TestStoreLoadReplicaQuiescent(t *testing.T) { }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "test", + InMemory: true, + StickyVFSID: "test", }, }, }, diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index bd53ed544dc1..1ba17a49c3b1 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -248,8 +248,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // Test uses sticky registry to have persistent pebble state that could // be analyzed for existence of snapshots and to verify snapshot content // after failures. - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() // The cluster has 3 nodes, one store per node. The test writes a few KVs to a // range, which gets replicated to all 3 stores. Then it manually replaces an @@ -270,11 +269,11 @@ func TestCheckConsistencyInconsistent(t *testing.T) { serverArgsPerNode[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &testKnobs, - Server: &server.TestingKnobs{StickyEngineRegistry: stickyEngineRegistry}, + Server: &server.TestingKnobs{StickyVFSRegistry: stickyVFSRegistry}, }, StoreSpecs: []base.StoreSpec{{ - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }}, } } @@ -307,8 +306,8 @@ func TestCheckConsistencyInconsistent(t *testing.T) { } onDiskCheckpointPaths := func(nodeIdx int) []string { - fs, err := stickyEngineRegistry.GetUnderlyingFS( - base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(nodeIdx), 10)}) + fs, err := stickyVFSRegistry.Get( + base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(nodeIdx), 10)}) require.NoError(t, err) store := tc.GetFirstStoreFromServer(t, nodeIdx) checkpointPath := filepath.Join(store.TODOEngine().GetAuxiliaryDir(), "checkpoints") @@ -388,7 +387,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // Create a new store on top of checkpoint location inside existing in-mem // VFS to verify its contents. - fs, err := stickyEngineRegistry.GetUnderlyingFS(base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10)}) + fs, err := stickyVFSRegistry.Get(base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(i), 10)}) require.NoError(t, err) cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], cluster.MakeClusterSettings(), storage.ForTesting, storage.MustExist, storage.ReadOnly, storage.CacheSize(1<<20)) diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index 0ff1c2f63e65..ae08f62c231e 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -763,8 +763,10 @@ func TestFlowControlRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + // TODO(jackson): It's unclear if this test can be refactor to mit the + // ReuseEngines option. + stickyVFSRegistry := server.NewStickyVFSRegistry(server.ReuseEngines) + defer stickyVFSRegistry.CloseAllEngines() const numServers int = 5 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -783,8 +785,8 @@ func TestFlowControlRaftSnapshot(t *testing.T) { Settings: st, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, RaftConfig: base.RaftConfig{ @@ -794,7 +796,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) { }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, Store: &kvserver.StoreTestingKnobs{ FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{ diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go index 3326d6f1a500..3475c670e5f5 100644 --- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go +++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go @@ -198,8 +198,7 @@ func TestGetPlanStagingState(t *testing.T) { ctx := context.Background() - tc, reg, planStores := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, planStores := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -260,8 +259,7 @@ func TestStageRecoveryPlans(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -301,8 +299,7 @@ func TestStageBadVersions(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 1) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 1) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -330,8 +327,7 @@ func TestStageConflictingPlans(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -370,8 +366,7 @@ func TestForcePlanUpdate(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -412,8 +407,7 @@ func TestNodeDecommissioned(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -445,8 +439,7 @@ func TestRejectDecommissionReachableNode(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -466,8 +459,7 @@ func TestStageRecoveryPlansToWrongCluster(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -498,8 +490,7 @@ func TestRetrieveRangeStatus(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 5) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 5) defer tc.Stopper().Stop(ctx) // Use scratch range to ensure we have a range that loses quorum. @@ -554,8 +545,7 @@ func TestRetrieveApplyStatus(t *testing.T) { ctx := context.Background() - tc, reg, _ := prepTestCluster(t, 5) - defer reg.CloseAllStickyInMemEngines() + tc, _, _ := prepTestCluster(t, 5) defer tc.Stopper().Stop(ctx) // Use scratch range to ensure we have a range that loses quorum. @@ -651,8 +641,7 @@ func TestRejectBadVersionApplication(t *testing.T) { ctx := context.Background() - tc, reg, pss := prepTestCluster(t, 3) - defer reg.CloseAllStickyInMemEngines() + tc, _, pss := prepTestCluster(t, 3) defer tc.Stopper().Stop(ctx) adm, err := tc.GetAdminClient(ctx, t, 0) @@ -689,10 +678,10 @@ func TestRejectBadVersionApplication(t *testing.T) { func prepTestCluster( t *testing.T, nodes int, -) (*testcluster.TestCluster, server.StickyInMemEnginesRegistry, map[int]loqrecovery.PlanStore) { +) (*testcluster.TestCluster, server.StickyVFSRegistry, map[int]loqrecovery.PlanStore) { skip.UnderStressRace(t, "cluster frequently fails to start under stress race") - reg := server.NewStickyInMemEnginesRegistry() + reg := server.NewStickyVFSRegistry() lReg := listenerutil.NewListenerRegistry() @@ -704,7 +693,7 @@ func prepTestCluster( args.ServerArgsPerNode[i] = base.TestServerArgs{ Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, @@ -712,8 +701,8 @@ func prepTestCluster( }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, } @@ -729,8 +718,8 @@ func prepInMemPlanStores( ) map[int]loqrecovery.PlanStore { pss := make(map[int]loqrecovery.PlanStore) for id, args := range serverArgs { - reg := args.Knobs.Server.(*server.TestingKnobs).StickyEngineRegistry - store, err := reg.GetUnderlyingFS(args.StoreSpecs[0]) + reg := args.Knobs.Server.(*server.TestingKnobs).StickyVFSRegistry + store, err := reg.Get(args.StoreSpecs[0]) require.NoError(t, err, "can't create loq recovery plan store") pss[id] = loqrecovery.NewPlanStore(".", store) } diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go index a4ee4e75cd2d..656def51b891 100644 --- a/pkg/kv/kvserver/node_liveness_test.go +++ b/pkg/kv/kvserver/node_liveness_test.go @@ -151,8 +151,7 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -164,13 +163,13 @@ func TestNodeLivenessInitialIncrement(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, }, @@ -465,8 +464,7 @@ func TestNodeLivenessRestart(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() const numServers int = 2 stickyServerArgs := make(map[int]base.TestServerArgs) @@ -474,13 +472,13 @@ func TestNodeLivenessRestart(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -834,8 +832,7 @@ func TestNodeLivenessSetDraining(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -845,13 +842,13 @@ func TestNodeLivenessSetDraining(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } @@ -1168,8 +1165,7 @@ func verifyNodeIsDecommissioning(t *testing.T, tc *testcluster.TestCluster, node } func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -1179,13 +1175,13 @@ func testNodeLivenessSetDecommissioning(t *testing.T, decommissionNodeIdx int) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 5be58113966b..72d8adb178a9 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -2089,8 +2089,6 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { st := cluster.MakeTestingClusterSettings() kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, false) // override metamorphism - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -2109,7 +2107,7 @@ func TestReplicateQueueAcquiresInvalidLeases(t *testing.T) { ScanMaxIdleTime: time.Millisecond, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: server.NewStickyVFSRegistry(), DefaultZoneConfigOverride: &zcfg, }, }, diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index eea4f5485b35..b481a2c1e462 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -69,7 +69,7 @@ go_library( "statements.go", "status.go", "status_local_file_retrieval.go", - "sticky_engine.go", + "sticky_vfs.go", "stop_trigger.go", "tcp_keepalive_manager.go", "tenant.go", @@ -451,7 +451,7 @@ go_test( "statements_test.go", "stats_test.go", "status_ext_test.go", - "sticky_engine_test.go", + "sticky_vfs_test.go", "tenant_range_lookup_test.go", "testserver_test.go", "user_test.go", diff --git a/pkg/server/config.go b/pkg/server/config.go index 76438579cba0..8a29c24d4510 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -747,19 +747,19 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { for i, spec := range cfg.Stores.Specs { log.Eventf(ctx, "initializing %+v", spec) - if spec.InMemory && spec.StickyInMemoryEngineID != "" { + if spec.InMemory && spec.StickyVFSID != "" { if cfg.TestingKnobs.Server == nil { return Engines{}, errors.AssertionFailedf("Could not create a sticky " + "engine no server knobs available to get a registry. " + - "Please use Knobs.Server.StickyEngineRegistry to provide one.") + "Please use Knobs.Server.StickyVFSRegistry to provide one.") } knobs := cfg.TestingKnobs.Server.(*TestingKnobs) - if knobs.StickyEngineRegistry == nil { + if knobs.StickyVFSRegistry == nil { return Engines{}, errors.Errorf("Could not create a sticky " + "engine no registry available. Please use " + - "Knobs.Server.StickyEngineRegistry to provide one.") + "Knobs.Server.StickyVFSRegistry to provide one.") } - eng, err := knobs.StickyEngineRegistry.GetOrCreateStickyInMemEngine(ctx, cfg, spec) + eng, err := knobs.StickyVFSRegistry.Open(ctx, cfg, spec) if err != nil { return Engines{}, err } diff --git a/pkg/server/loss_of_quorum.go b/pkg/server/loss_of_quorum.go index 050cb266893c..25b568392965 100644 --- a/pkg/server/loss_of_quorum.go +++ b/pkg/server/loss_of_quorum.go @@ -39,20 +39,20 @@ func newPlanStore(cfg Config) (loqrecovery.PlanStore, error) { path := spec.Path if spec.InMemory { path = "" - if spec.StickyInMemoryEngineID != "" { + if spec.StickyVFSID != "" { if cfg.TestingKnobs.Server == nil { return loqrecovery.PlanStore{}, errors.AssertionFailedf("Could not create a sticky " + "engine no server knobs available to get a registry. " + - "Please use Knobs.Server.StickyEngineRegistry to provide one.") + "Please use Knobs.Server.StickyVFSRegistry to provide one.") } knobs := cfg.TestingKnobs.Server.(*TestingKnobs) - if knobs.StickyEngineRegistry == nil { + if knobs.StickyVFSRegistry == nil { return loqrecovery.PlanStore{}, errors.Errorf("Could not create a sticky " + "engine no registry available. Please use " + - "Knobs.Server.StickyEngineRegistry to provide one.") + "Knobs.Server.StickyVFSRegistry to provide one.") } var err error - fs, err = knobs.StickyEngineRegistry.GetUnderlyingFS(spec) + fs, err = knobs.StickyVFSRegistry.Get(spec) if err != nil { return loqrecovery.PlanStore{}, err } diff --git a/pkg/server/multi_store_test.go b/pkg/server/multi_store_test.go index 3006504f5db5..a5f5ea1c84e3 100644 --- a/pkg/server/multi_store_test.go +++ b/pkg/server/multi_store_test.go @@ -46,8 +46,7 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { ctx := context.Background() - ser := server.NewStickyInMemEnginesRegistry() - defer ser.CloseAllStickyInMemEngines() + ser := server.NewStickyVFSRegistry() const ( numNodes = 3 @@ -68,12 +67,12 @@ func TestAddNewStoresToExistingNodes(t *testing.T) { serverArgs := base.TestServerArgs{ DefaultTestTenant: base.TODOTestTenantDisabled, } - serverArgs.Knobs.Server = &server.TestingKnobs{StickyEngineRegistry: ser} + serverArgs.Knobs.Server = &server.TestingKnobs{StickyVFSRegistry: ser} for storeIdx := 0; storeIdx < numStoresPerNode; storeIdx++ { id := fmt.Sprintf("s%d.%d", srvIdx+1, storeIdx+1) serverArgs.StoreSpecs = append( serverArgs.StoreSpecs, - base.StoreSpec{InMemory: true, StickyInMemoryEngineID: id}, + base.StoreSpec{InMemory: true, StickyVFSID: id}, ) } tcArgs.ServerArgsPerNode[srvIdx] = serverArgs diff --git a/pkg/server/server_startup_test.go b/pkg/server/server_startup_test.go index 363d21c016b6..91fef928954e 100644 --- a/pkg/server/server_startup_test.go +++ b/pkg/server/server_startup_test.go @@ -40,8 +40,7 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { rng, seed := randutil.NewLockedTestRand() t.Log("TestStartupInjectedFailure random seed", seed) - reg := server.NewStickyInMemEnginesRegistry() - defer reg.CloseAllStickyInMemEngines() + reg := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -51,13 +50,13 @@ func TestStartupInjectedFailureSingleNode(t *testing.T) { ServerArgs: base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "1", + InMemory: true, + StickyVFSID: "1", }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ // Ensure that scratch range has proper zone config, otherwise it is diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go index 185e0fea812a..bb0e35fde520 100644 --- a/pkg/server/settings_cache_test.go +++ b/pkg/server/settings_cache_test.go @@ -64,16 +64,15 @@ func TestCachedSettingsServerRestart(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - stickyEngineRegistry := NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := NewStickyVFSRegistry() serverArgs := base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ - {InMemory: true, StickyInMemoryEngineID: "1"}, + {InMemory: true, StickyVFSID: "1"}, }, Knobs: base.TestingKnobs{ Server: &TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/server/sticky_engine.go b/pkg/server/sticky_engine.go deleted file mode 100644 index 9532b631658d..000000000000 --- a/pkg/server/sticky_engine.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright 2019 The Cockroach Authors. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package server - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" - "github.com/cockroachdb/pebble/vfs" -) - -// stickyInMemEngine extends a normal engine, but does not allow them to be -// closed using the normal Close() method, instead keeping the engines in -// memory until CloseAllStickyInMemEngines is called, hence being "sticky". -// This prevents users of the in memory engine from having to special -// case "sticky" engines on every instance of "Close". -// It is intended for use in demos and/or tests, where we want in-memory -// storage nodes to persist between killed nodes. -type stickyInMemEngine struct { - // id is the unique identifier for this sticky engine. - id string - // closed indicates whether the current engine has been closed. - closed bool - - // Engine extends the Engine interface. - storage.Engine - - // Underlying in-mem filesystem backing the engine - fs vfs.FS -} - -// StickyEngineRegistryConfigOption is a config option for a sticky engine -// registry that can be passed to NewStickyInMemEnginesRegistry. -type StickyEngineRegistryConfigOption func(cfg *stickyEngineRegistryConfig) - -// ReplaceEngines configures a sticky engine registry to return a new engine -// with the same underlying in-memory FS instead of simply reopening it in -// the case where it already exists. -var ReplaceEngines StickyEngineRegistryConfigOption = func(cfg *stickyEngineRegistryConfig) { - cfg.replaceEngines = true -} - -// StickyInMemEnginesRegistry manages the lifecycle of sticky engines. -type StickyInMemEnginesRegistry interface { - // GetOrCreateStickyInMemEngine returns an engine associated with the given id. - // It will create a new in-memory engine if one does not already exist. - // At most one engine with a given id can be active in - // "GetOrCreateStickyInMemEngine" at any given time. - // Note that if you re-create an existing sticky engine the new attributes - // and cache size will be ignored. - // One must Close() on the sticky engine before another can be fetched. - GetOrCreateStickyInMemEngine(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error) - // GetUnderlyingFS returns FS backing in mem engine. If engine was not created - // error is returned. - GetUnderlyingFS(spec base.StoreSpec) (vfs.FS, error) - // CloseAllStickyInMemEngines closes all sticky in memory engines that were - // created by this registry. - CloseAllStickyInMemEngines() -} - -// stickyInMemEngine implements Engine. -var _ storage.Engine = &stickyInMemEngine{} - -// Close overwrites the default Engine interface to not close the underlying -// engine if called. We mark the state as closed to reflect a correct result -// in Closed(). -func (e *stickyInMemEngine) Close() { - e.closed = true -} - -// Closed overwrites the default Engine interface. -func (e *stickyInMemEngine) Closed() bool { - return e.closed -} - -// SetStoreID implements the StoreIDSetter interface. -func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error { - return e.Engine.SetStoreID(ctx, storeID) -} - -// stickyInMemEnginesRegistryImpl is the bookkeeper for all active -// sticky engines, keyed by their id. It implements the -// StickyInMemEnginesRegistry interface. -type stickyInMemEnginesRegistryImpl struct { - entries map[string]*stickyInMemEngine - mu syncutil.Mutex - cfg stickyEngineRegistryConfig -} - -// NewStickyInMemEnginesRegistry creates a new StickyInMemEnginesRegistry. -func NewStickyInMemEnginesRegistry( - opts ...StickyEngineRegistryConfigOption, -) StickyInMemEnginesRegistry { - var cfg stickyEngineRegistryConfig - for _, opt := range opts { - opt(&cfg) - } - return &stickyInMemEnginesRegistryImpl{ - entries: map[string]*stickyInMemEngine{}, - cfg: cfg, - } -} - -// GetOrCreateStickyInMemEngine implements the StickyInMemEnginesRegistry interface. -func (registry *stickyInMemEnginesRegistryImpl) GetOrCreateStickyInMemEngine( - ctx context.Context, cfg *Config, spec base.StoreSpec, -) (storage.Engine, error) { - registry.mu.Lock() - defer registry.mu.Unlock() - - var fs vfs.FS - if engine, ok := registry.entries[spec.StickyInMemoryEngineID]; ok { - if !engine.closed { - return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyInMemoryEngineID) - } - if !registry.cfg.replaceEngines { - log.Infof(ctx, "re-using sticky in-mem engine %s", spec.StickyInMemoryEngineID) - engine.closed = false - return engine, nil - } - fs = engine.fs - registry.deleteEngine(spec.StickyInMemoryEngineID) - } else { - fs = vfs.NewMem() - } - options := []storage.ConfigOption{ - storage.Attributes(spec.Attributes), - storage.CacheSize(cfg.CacheSize), - storage.MaxSize(spec.Size.InBytes), - storage.EncryptionAtRest(spec.EncryptionOptions), - storage.ForStickyEngineTesting, - } - - if s := cfg.TestingKnobs.Store; s != nil { - stk := s.(*kvserver.StoreTestingKnobs) - if stk.SmallEngineBlocks { - options = append(options, storage.BlockSize(1)) - } - if len(stk.EngineKnobs) > 0 { - options = append(options, stk.EngineKnobs...) - } - } - - log.Infof(ctx, "creating new sticky in-mem engine %s", spec.StickyInMemoryEngineID) - engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...) - - engineEntry := &stickyInMemEngine{ - id: spec.StickyInMemoryEngineID, - closed: false, - Engine: engine, - fs: fs, - } - registry.entries[spec.StickyInMemoryEngineID] = engineEntry - return engineEntry, nil -} - -func (registry *stickyInMemEnginesRegistryImpl) GetUnderlyingFS( - spec base.StoreSpec, -) (vfs.FS, error) { - registry.mu.Lock() - defer registry.mu.Unlock() - - if engine, ok := registry.entries[spec.StickyInMemoryEngineID]; ok { - return engine.fs, nil - } - return nil, errors.Errorf("engine '%s' was not created", spec.StickyInMemoryEngineID) -} - -// CloseAllStickyInMemEngines closes and removes all sticky in memory engines. -func (registry *stickyInMemEnginesRegistryImpl) CloseAllStickyInMemEngines() { - registry.mu.Lock() - defer registry.mu.Unlock() - - for id := range registry.entries { - registry.deleteEngine(id) - } -} - -func (registry *stickyInMemEnginesRegistryImpl) deleteEngine(id string) { - engine, ok := registry.entries[id] - if !ok { - return - } - engine.closed = true - engine.Engine.Close() - delete(registry.entries, id) -} - -type stickyEngineRegistryConfig struct { - // replaceEngines is true if a sticky engine registry should return a new - // engine with the same underlying in-memory FS instead of simply reopening - // it in the case where it already exists. - replaceEngines bool -} diff --git a/pkg/server/sticky_vfs.go b/pkg/server/sticky_vfs.go new file mode 100644 index 000000000000..aa1bf3d48278 --- /dev/null +++ b/pkg/server/sticky_vfs.go @@ -0,0 +1,219 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/vfs" +) + +// StickyVFSOption is a config option for a sticky engine +// registry that can be passed to NewStickyVFSRegistry. +type StickyVFSOption func(cfg *stickyConfig) + +// ReuseEngines configures a sticky VFS registry to return exactly the same +// Engine without closing and re-opening from the underlying in-memory FS. +var ReuseEngines StickyVFSOption = func(cfg *stickyConfig) { + cfg.reuseEngines = true +} + +type stickyConfig struct { + // reuseEngines is true if a sticky engine registry should return an existing + // engine instead of reopening it from the underlying in-memory FS. + reuseEngines bool +} + +// StickyVFSRegistry manages the lifecycle of sticky in-memory filesystems. It +// is intended for use in demos and/or tests, where we want in-memory storage +// nodes to persist between killed nodes. +type StickyVFSRegistry interface { + // Open returns an engine associated with the given id. It will create a new + // in-memory filesystem if one does not already exist. Otherwise, the new + // Engine will re-open the existing store held within the in-memory + // filesystem. + // + // If the registry was created with the ReuseEngines option, Open will + // return the previous Engine on a subsequent Open. With this option, at + // most one engine with a given id can be active at any given time. Note + // that if you re-create an existing sticky engine the new attributes and + // cache size will be ignored. One must Close() on the sticky engine before + // another can be fetched. When using the ReuseEngines option, the caller + // must call CloseAllEngines when they're finished to close the underlying + // engines. + Open(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error) + // Get returns the named in-memory FS. + Get(spec base.StoreSpec) (vfs.FS, error) + // CloseAllEngines closes all open sticky in-memory engines that were + // created by this registry. Calling this method is required when using the + // ReuseEngines option. + CloseAllEngines() +} + +// stickyVFSRegistryImpl is the bookkeeper for all active sticky filesystems, +// keyed by their id. It implements the StickyVFSRegistry interface. +type stickyVFSRegistryImpl struct { + entries map[string]*vfs.MemFS + engines map[string]*stickyInMemEngine + mu syncutil.Mutex + cfg stickyConfig +} + +// NewStickyVFSRegistry creates a new StickyVFSRegistry. +func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry { + registry := &stickyVFSRegistryImpl{ + entries: map[string]*vfs.MemFS{}, + engines: map[string]*stickyInMemEngine{}, + } + for _, opt := range opts { + opt(®istry.cfg) + } + return registry +} + +// Open implements the StickyInMemEnginesRegistry interface. +func (registry *stickyVFSRegistryImpl) Open( + ctx context.Context, cfg *Config, spec base.StoreSpec, +) (storage.Engine, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + // Look up the VFS. + fs, ok := registry.entries[spec.StickyVFSID] + + // If the registry is configured to reuse whole Engines (eg, skipping + // shutdown and recovery of the storage engine), then check if we already + // have an open Engine. + if ok && registry.cfg.reuseEngines { + if engine, engineOk := registry.engines[spec.StickyVFSID]; engineOk { + if !engine.closed { + return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyVFSID) + } + log.Infof(ctx, "re-using existing sticky in-mem engine %s", spec.StickyVFSID) + engine.closed = false + return engine, nil + } + } + + // If the VFS doesn't yet exist, construct a new one and save the + // association. + if !ok { + fs = vfs.NewMem() + registry.entries[spec.StickyVFSID] = fs + } + + // Create a new engine. + options := []storage.ConfigOption{ + storage.Attributes(spec.Attributes), + storage.CacheSize(cfg.CacheSize), + storage.MaxSize(spec.Size.InBytes), + storage.EncryptionAtRest(spec.EncryptionOptions), + storage.ForStickyEngineTesting, + } + + if s := cfg.TestingKnobs.Store; s != nil { + stk := s.(*kvserver.StoreTestingKnobs) + if stk.SmallEngineBlocks { + options = append(options, storage.BlockSize(1)) + } + if len(stk.EngineKnobs) > 0 { + options = append(options, stk.EngineKnobs...) + } + } + + log.Infof(ctx, "creating engine with sticky in-memory VFS %s", spec.StickyVFSID) + engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...) + + // If this registry is configured to keep Engines open rather recovering + // from filesystem state, wrap the Engine within a *stickyInMemEngine + // wrapper so that calls to Close do not Close the underlying Engine. + if registry.cfg.reuseEngines { + wrappedEngine := &stickyInMemEngine{ + id: spec.StickyVFSID, + closed: false, + Engine: engine, + fs: fs, + } + registry.engines[spec.StickyVFSID] = wrappedEngine + return wrappedEngine, nil + } + return engine, nil +} + +func (registry *stickyVFSRegistryImpl) Get(spec base.StoreSpec) (vfs.FS, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if fs, ok := registry.entries[spec.StickyVFSID]; ok { + return fs, nil + } else { + fs = vfs.NewMem() + registry.entries[spec.StickyVFSID] = fs + return fs, nil + } +} + +// CloseAllEngines closes and removes all sticky in-memory engines. +func (registry *stickyVFSRegistryImpl) CloseAllEngines() { + registry.mu.Lock() + defer registry.mu.Unlock() + + for id, engine := range registry.engines { + engine.closed = true + engine.Engine.Close() + delete(registry.entries, id) + } +} + +// stickyInMemEngine extends a normal engine, but holds on to the Engine between +// Opens to allow subsequent Opens to reuse a previous Engine instance. This +// type is used only when a the StickyVFSRegistry is created with the +// ReuseEngines option. +// +// Engine.Close does not close the underlying Engine. The engine is kept open +// until CloseAllEngines is called, hence being "sticky". +type stickyInMemEngine struct { + // id is the unique identifier for this sticky engine. + id string + // closed indicates whether the current engine has been closed. + closed bool + // Engine extends the Engine interface. + storage.Engine + // Underlying in-mem filesystem backing the engine. + fs vfs.FS +} + +// stickyInMemEngine implements Engine. +var _ storage.Engine = &stickyInMemEngine{} + +// Close overwrites the default Engine interface to not close the underlying +// engine if called. We mark the state as closed to reflect a correct result +// in Closed(). +func (e *stickyInMemEngine) Close() { + e.closed = true +} + +// Closed overwrites the default Engine interface. +func (e *stickyInMemEngine) Closed() bool { + return e.closed +} + +// SetStoreID implements the StoreIDSetter interface. +func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error { + return e.Engine.SetStoreID(ctx, storeID) +} diff --git a/pkg/server/sticky_engine_test.go b/pkg/server/sticky_vfs_test.go similarity index 65% rename from pkg/server/sticky_engine_test.go rename to pkg/server/sticky_vfs_test.go index f3e5053b1a48..8d1915491db2 100644 --- a/pkg/server/sticky_engine_test.go +++ b/pkg/server/sticky_vfs_test.go @@ -32,32 +32,32 @@ func TestStickyEngines(t *testing.T) { cacheSize := int64(1 << 20) /* 1 MiB */ storeSize := int64(512 << 20) /* 512 MiB */ - registry := NewStickyInMemEnginesRegistry() + registry := NewStickyVFSRegistry(ReuseEngines) cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg1.CacheSize = cacheSize spec1 := base.StoreSpec{ - StickyInMemoryEngineID: "engine1", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine1", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine1, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) require.False(t, engine1.Closed()) cfg2 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg2.CacheSize = cacheSize spec2 := base.StoreSpec{ - StickyInMemoryEngineID: "engine2", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine2", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine2, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg2, spec2) + engine2, err := registry.Open(ctx, &cfg2, spec2) require.NoError(t, err) require.False(t, engine2.Closed()) // Regetting the engine whilst it is not closed will fail. - _, err = registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + _, err = registry.Open(ctx, &cfg1, spec1) require.EqualError(t, err, "sticky engine engine1 has not been closed") // Close the engine, which allows it to be refetched. @@ -66,20 +66,20 @@ func TestStickyEngines(t *testing.T) { require.False(t, engine1.(*stickyInMemEngine).Engine.Closed()) // Refetching the engine should give back the same engine. - engine1Refetched, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1Refetched, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) require.Equal(t, engine1, engine1Refetched) require.False(t, engine1.Closed()) // Cleaning up everything asserts everything is closed. - registry.CloseAllStickyInMemEngines() + registry.CloseAllEngines() for _, engine := range []storage.Engine{engine1, engine2} { require.True(t, engine.Closed()) require.True(t, engine.(*stickyInMemEngine).Engine.Closed()) } } -func TestStickyEnginesReplaceEngines(t *testing.T) { +func TestStickyVFS(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -88,36 +88,35 @@ func TestStickyEnginesReplaceEngines(t *testing.T) { cacheSize := int64(1 << 20) /* 1 MiB */ storeSize := int64(512 << 20) /* 512 MiB */ - registry := NewStickyInMemEnginesRegistry(ReplaceEngines) + registry := NewStickyVFSRegistry() cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings()) cfg1.CacheSize = cacheSize spec1 := base.StoreSpec{ - StickyInMemoryEngineID: "engine1", - Attributes: attrs, - Size: base.SizeSpec{InBytes: storeSize}, + StickyVFSID: "engine1", + Attributes: attrs, + Size: base.SizeSpec{InBytes: storeSize}, } - engine1, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine1, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) - fs1, err := registry.GetUnderlyingFS(spec1) + fs1, err := registry.Get(spec1) require.NoError(t, err) require.False(t, engine1.Closed()) engine1.Close() // Refetching the engine should give back a different engine with the same // underlying fs. - engine1Refetched, err := registry.GetOrCreateStickyInMemEngine(ctx, &cfg1, spec1) + engine2, err := registry.Open(ctx, &cfg1, spec1) require.NoError(t, err) - fs1Refetched, err := registry.GetUnderlyingFS(spec1) + fs2, err := registry.Get(spec1) require.NoError(t, err) - require.NotEqual(t, engine1, engine1Refetched) - require.Equal(t, fs1, fs1Refetched) + require.NotEqual(t, engine1, engine2) + require.Equal(t, fs1, fs2) require.True(t, engine1.Closed()) - require.False(t, engine1Refetched.Closed()) + require.False(t, engine2.Closed()) + engine2.Close() - registry.CloseAllStickyInMemEngines() - for _, engine := range []storage.Engine{engine1, engine1Refetched} { + for _, engine := range []storage.Engine{engine1, engine2} { require.True(t, engine.Closed()) - require.True(t, engine.(*stickyInMemEngine).Engine.Closed()) } } diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go index 27f2712b64db..ccb3fbd9a935 100644 --- a/pkg/server/testing_knobs.go +++ b/pkg/server/testing_knobs.go @@ -102,12 +102,12 @@ type TestingKnobs struct { // An (additional) callback invoked whenever a // node is permanently removed from the cluster. OnDecommissionedCallback func(id roachpb.NodeID) - // StickyEngineRegistry manages the lifecycle of sticky in memory engines, - // which can be enabled via base.StoreSpec.StickyInMemoryEngineID. + // StickyVFSRegistry manages the lifecycle of sticky in memory engines, + // which can be enabled via base.StoreSpec.StickyVFSID. // // When supplied to a TestCluster, StickyEngineIDs will be associated auto- // matically to the StoreSpecs used. - StickyEngineRegistry StickyInMemEnginesRegistry + StickyVFSRegistry StickyVFSRegistry // WallClock is used to inject a custom clock for testing the server. It is // typically either an hlc.HybridManualClock or hlc.ManualClock. WallClock hlc.WallClock diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 26dbe23152f6..edf2c18246f2 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -311,10 +311,10 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) * if len(serverArgs.StoreSpecs) == 0 { serverArgs.StoreSpecs = []base.StoreSpec{base.DefaultTestStoreSpec} } - if knobs, ok := serverArgs.Knobs.Server.(*server.TestingKnobs); ok && knobs.StickyEngineRegistry != nil { + if knobs, ok := serverArgs.Knobs.Server.(*server.TestingKnobs); ok && knobs.StickyVFSRegistry != nil { for j := range serverArgs.StoreSpecs { - if serverArgs.StoreSpecs[j].StickyInMemoryEngineID == "" { - serverArgs.StoreSpecs[j].StickyInMemoryEngineID = fmt.Sprintf("auto-node%d-store%d", i+1, j+1) + if serverArgs.StoreSpecs[j].StickyVFSID == "" { + serverArgs.StoreSpecs[j].StickyVFSID = fmt.Sprintf("auto-node%d-store%d", i+1, j+1) } } } @@ -1708,7 +1708,7 @@ func (tc *TestCluster) RestartServerWithInspect(idx int, inspect func(s *server. } for i, specs := range serverArgs.StoreSpecs { - if specs.InMemory && specs.StickyInMemoryEngineID == "" { + if specs.InMemory && specs.StickyVFSID == "" { return errors.Errorf("failed to restart Server %d, because a restart can only be used on a server with a sticky engine", i) } } diff --git a/pkg/testutils/testcluster/testcluster_test.go b/pkg/testutils/testcluster/testcluster_test.go index ecffba83bc42..e10a941c63be 100644 --- a/pkg/testutils/testcluster/testcluster_test.go +++ b/pkg/testutils/testcluster/testcluster_test.go @@ -274,8 +274,7 @@ func TestStopServer(t *testing.T) { func TestRestart(t *testing.T) { defer leaktest.AfterTest(t)() - stickyEngineRegistry := server.NewStickyInMemEnginesRegistry() - defer stickyEngineRegistry.CloseAllStickyInMemEngines() + stickyVFSRegistry := server.NewStickyVFSRegistry() lisReg := listenerutil.NewListenerRegistry() defer lisReg.Close() @@ -285,13 +284,13 @@ func TestRestart(t *testing.T) { stickyServerArgs[i] = base.TestServerArgs{ StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: "TestRestart" + strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: "TestRestart" + strconv.FormatInt(int64(i), 10), }, }, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: stickyEngineRegistry, + StickyVFSRegistry: stickyVFSRegistry, }, }, } diff --git a/pkg/util/startup/startup_test.go b/pkg/util/startup/startup_test.go index 9a4512665570..7c305e6b0502 100644 --- a/pkg/util/startup/startup_test.go +++ b/pkg/util/startup/startup_test.go @@ -139,8 +139,7 @@ func runCircuitBreakerTestForKey( lReg := listenerutil.NewListenerRegistry() defer lReg.Close() - reg := server.NewStickyInMemEnginesRegistry() - defer reg.CloseAllStickyInMemEngines() + reg := server.NewStickyVFSRegistry() // TODO: Disable expiration based leases metamorphism since it currently // breaks closed timestamps and prevent rangefeeds from advancing checkpoint @@ -158,7 +157,7 @@ func runCircuitBreakerTestForKey( Settings: st, Knobs: base.TestingKnobs{ Server: &server.TestingKnobs{ - StickyEngineRegistry: reg, + StickyVFSRegistry: reg, }, SpanConfig: &spanconfig.TestingKnobs{ ConfigureScratchRange: true, @@ -166,8 +165,8 @@ func runCircuitBreakerTestForKey( }, StoreSpecs: []base.StoreSpec{ { - InMemory: true, - StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10), + InMemory: true, + StickyVFSID: strconv.FormatInt(int64(i), 10), }, }, Listener: lReg.MustGetOrCreate(t, i),