diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index b43c2d857e9e..530d7d26aa3a 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -219,7 +219,7 @@
 /pkg/server/status*go @cockroachdb/obs-inf-prs @cockroachdb/server-prs
 /pkg/server/status*go @cockroachdb/obs-inf-prs @cockroachdb/server-prs
 /pkg/server/status/ @cockroachdb/obs-inf-prs @cockroachdb/server-prs
-/pkg/server/sticky_engine* @cockroachdb/storage
+/pkg/server/sticky_vfs* @cockroachdb/storage
 /pkg/server/structlogging/ @cockroachdb/cluster-observability @cockroachdb/obs-inf-prs
 /pkg/server/systemconfigwatcher/ @cockroachdb/kv-prs @cockroachdb/multi-tenant
 /pkg/server/telemetry/ @cockroachdb/obs-inf-prs @cockroachdb/server-prs
diff --git a/pkg/cli/debug_recover_loss_of_quorum_test.go b/pkg/cli/debug_recover_loss_of_quorum_test.go
index c54a536dcf0f..ba78a9d02506 100644
--- a/pkg/cli/debug_recover_loss_of_quorum_test.go
+++ b/pkg/cli/debug_recover_loss_of_quorum_test.go
@@ -377,9 +377,7 @@ func TestStageVersionCheck(t *testing.T) {
 	})
 	require.NoError(t, err, "force local must fix incorrect version")
 	// Check that stored plan has version matching cluster version.
-	fs, err := storeReg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"})
-	require.NoError(t, err, "failed to get shared store fs")
-	ps := loqrecovery.NewPlanStore("", fs)
+	ps := loqrecovery.NewPlanStore("", storeReg.Get("1"))
 	p, ok, err := ps.LoadPlan()
 	require.NoError(t, err, "failed to read node 0 plan")
 	require.True(t, ok, "plan was not staged")
diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go
index 19868ba030b0..0e9a366cca6e 100644
--- a/pkg/kv/kvserver/consistency_queue_test.go
+++ b/pkg/kv/kvserver/consistency_queue_test.go
@@ -306,9 +306,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 	}
 
 	onDiskCheckpointPaths := func(nodeIdx int) []string {
-		fs, err := stickyVFSRegistry.Get(
-			base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(nodeIdx), 10)})
-		require.NoError(t, err)
+		fs := stickyVFSRegistry.Get(strconv.FormatInt(int64(nodeIdx), 10))
 		store := tc.GetFirstStoreFromServer(t, nodeIdx)
 		checkpointPath := filepath.Join(store.TODOEngine().GetAuxiliaryDir(), "checkpoints")
 		checkpoints, _ := fs.List(checkpointPath)
@@ -387,8 +385,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
 
 		// Create a new store on top of checkpoint location inside existing in-mem
 		// VFS to verify its contents.
-		fs, err := stickyVFSRegistry.Get(base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(i), 10)})
-		require.NoError(t, err)
+		fs := stickyVFSRegistry.Get(strconv.FormatInt(int64(i), 10))
 		cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], cluster.MakeClusterSettings(),
 			storage.ForTesting, storage.MustExist, storage.ReadOnly, storage.CacheSize(1<<20))
 		defer cpEng.Close()
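For reference, the calling convention these test updates converge on looks like the sketch below. The helper is illustrative only (it does not exist in the tree); what it shows is that `Get` is now keyed by the bare `StickyVFSID` string, cannot fail, and lazily constructs the `MemFS` on first use, so the `base.StoreSpec` wrapper and the `require.NoError` plumbing above disappear.

```go
package example

import (
	"strconv"

	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/pebble/vfs"
)

// nodeFS is a hypothetical helper mirroring the updated call sites: the
// sticky FS for node i is addressed by its ID string alone, and the lookup
// can no longer fail, so there is no error to check.
func nodeFS(reg server.StickyVFSRegistry, nodeIdx int) vfs.FS {
	return reg.Get(strconv.FormatInt(int64(nodeIdx), 10))
}
```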
diff --git a/pkg/kv/kvserver/loqrecovery/server_integration_test.go b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
index 25f367c2c9d0..b70107a499e4 100644
--- a/pkg/kv/kvserver/loqrecovery/server_integration_test.go
+++ b/pkg/kv/kvserver/loqrecovery/server_integration_test.go
@@ -706,9 +706,7 @@ func prepInMemPlanStores(
 	pss := make(map[int]loqrecovery.PlanStore)
 	for id, args := range serverArgs {
 		reg := args.Knobs.Server.(*server.TestingKnobs).StickyVFSRegistry
-		store, err := reg.Get(args.StoreSpecs[0])
-		require.NoError(t, err, "can't create loq recovery plan store")
-		pss[id] = loqrecovery.NewPlanStore(".", store)
+		pss[id] = loqrecovery.NewPlanStore(".", reg.Get(args.StoreSpecs[0].StickyVFSID))
 	}
 	return pss
 }
diff --git a/pkg/server/loss_of_quorum.go b/pkg/server/loss_of_quorum.go
index e7ba3ba4bb23..ec4b93f3a3e2 100644
--- a/pkg/server/loss_of_quorum.go
+++ b/pkg/server/loss_of_quorum.go
@@ -51,11 +51,7 @@ func newPlanStore(cfg Config) (loqrecovery.PlanStore, error) {
 				"engine no registry available. Please use " +
 					"Knobs.Server.StickyVFSRegistry to provide one.")
 		}
-		var err error
-		fs, err = knobs.StickyVFSRegistry.Get(spec)
-		if err != nil {
-			return loqrecovery.PlanStore{}, err
-		}
+		fs = knobs.StickyVFSRegistry.Get(spec.StickyVFSID)
 	} else {
 		fs = vfs.NewMem()
 	}
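The same simplification applies on the loss-of-quorum path: with an infallible `Get`, a plan store can be built in a single expression. A minimal sketch, assuming only the APIs visible in this patch (the helper name is invented):

```go
package example

import (
	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/loqrecovery"
	"github.com/cockroachdb/cockroach/pkg/server"
)

// planStoreFor condenses the pattern shared by the updated call sites above:
// resolve the sticky FS by its ID and hand it straight to NewPlanStore.
// No error propagation is needed anywhere on this path anymore.
func planStoreFor(reg server.StickyVFSRegistry, spec base.StoreSpec) loqrecovery.PlanStore {
	return loqrecovery.NewPlanStore(".", reg.Get(spec.StickyVFSID))
}
```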
diff --git a/pkg/server/sticky_vfs.go b/pkg/server/sticky_vfs.go
index 7265c25dd89b..0aa57bfaa3b1 100644
--- a/pkg/server/sticky_vfs.go
+++ b/pkg/server/sticky_vfs.go
@@ -19,7 +19,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
-	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/vfs"
 )
 
@@ -27,18 +26,9 @@ import (
 // registry that can be passed to NewStickyVFSRegistry.
 type StickyVFSOption func(cfg *stickyConfig)
 
-// ReuseEnginesDeprecated configures a sticky VFS registry to return exactly the
-// same Engine without closing and re-opening from the underlying in-memory FS.
-// This option is deprecated. Callers should refactor their usage to not depend
-// on ephemeral, non-persisted state of an Engine.
-var ReuseEnginesDeprecated StickyVFSOption = func(cfg *stickyConfig) {
-	cfg.reuseEngines = true
-}
-
 type stickyConfig struct {
-	// reuseEngines is true if a sticky engine registry should return an existing
-	// engine instead of reopening it from the underlying in-memory FS.
-	reuseEngines bool
+	// Preserved for anticipated future options, such as the ability to use
+	// Pebble's "strict" MemFS.
 }
 
 // StickyVFSRegistry manages the lifecycle of sticky in-memory filesystems. It
@@ -49,31 +39,18 @@ type StickyVFSRegistry interface {
 	// neither Open nor Get has been called with the given spec's StickyVFSID
 	// before, Open will create a new in-memory filesystem. Otherwise, the new
 	// Engine will re-open the existing store held within the in-memory
-	// filesystem.
-	//
-	// If the registry was created with the ReuseEnginesDeprecated option, Open
-	// will return an Engine whose Close method does not close the underlying
-	// engine. Subsequent Open calls will return the same Engine. With this
-	// option, at most one engine with a given id can be active at any given
-	// time. Note that with the ReuseEnginesDeprecated option, an Open call that
-	// returns an existing sticky engine will not respect modified configuration
-	// or attributes. When using the ReuseEnginesDeprecated option, the caller
-	// must call CloseAllEngines when they're finished to close the underlying
-	// engines.
+	// filesystem. The caller should ensure any previous engine accessing the
+	// same VFS has already been closed.
 	Open(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error)
-	// Get returns the named in-memory FS.
-	Get(spec base.StoreSpec) (vfs.FS, error)
-	// CloseAllEngines closes all open sticky in-memory engines that were
-	// created by this registry. Calling this method is required when using the
-	// ReuseEnginesDeprecated option.
-	CloseAllEngines()
+	// Get returns the named in-memory FS, constructing a new one if this is the
+	// first time a FS with the provided ID has been requested.
+	Get(stickyVFSID string) vfs.FS
 }
 
 // stickyVFSRegistryImpl is the bookkeeper for all active sticky filesystems,
 // keyed by their id. It implements the StickyVFSRegistry interface.
 type stickyVFSRegistryImpl struct {
 	entries map[string]*vfs.MemFS
-	engines map[string]*stickyInMemEngine
 	mu      syncutil.Mutex
 	cfg     stickyConfig
 }
@@ -82,7 +59,6 @@ type stickyVFSRegistryImpl struct {
 func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry {
 	registry := &stickyVFSRegistryImpl{
 		entries: map[string]*vfs.MemFS{},
-		engines: map[string]*stickyInMemEngine{},
 	}
 	for _, opt := range opts {
 		opt(&registry.cfg)
@@ -94,42 +70,14 @@ func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry {
 func (registry *stickyVFSRegistryImpl) Open(
 	ctx context.Context, cfg *Config, spec base.StoreSpec,
 ) (storage.Engine, error) {
-	registry.mu.Lock()
-	defer registry.mu.Unlock()
-
-	// Look up the VFS.
-	fs, ok := registry.entries[spec.StickyVFSID]
-
-	// If the registry is configured to reuse whole Engines (eg, skipping
-	// shutdown and recovery of the storage engine), then check if we already
-	// have an open Engine.
-	if ok && registry.cfg.reuseEngines {
-		if engine, engineOk := registry.engines[spec.StickyVFSID]; engineOk {
-			if !engine.closed {
-				return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyVFSID)
-			}
-			log.Infof(ctx, "re-using existing sticky in-mem engine %s", spec.StickyVFSID)
-			engine.closed = false
-			return engine, nil
-		}
-	}
-
-	// If the VFS doesn't yet exist, construct a new one and save the
-	// association.
-	if !ok {
-		fs = vfs.NewMem()
-		registry.entries[spec.StickyVFSID] = fs
-	}
-
+	fs := registry.Get(spec.StickyVFSID)
 	// Create a new engine.
 	options := []storage.ConfigOption{
 		storage.Attributes(spec.Attributes),
 		storage.CacheSize(cfg.CacheSize),
 		storage.MaxSize(spec.Size.InBytes),
 		storage.EncryptionAtRest(spec.EncryptionOptions),
-		storage.ForStickyEngineTesting,
 	}
-
 	if s := cfg.TestingKnobs.Store; s != nil {
 		stk := s.(*kvserver.StoreTestingKnobs)
 		if stk.SmallEngineBlocks {
@@ -141,86 +89,18 @@ func (registry *stickyVFSRegistryImpl) Open(
 	}
 
 	log.Infof(ctx, "creating engine with sticky in-memory VFS %s", spec.StickyVFSID)
-	engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...)
-
-	// If this registry is configured to keep Engines open rather recovering
-	// from filesystem state, wrap the Engine within a *stickyInMemEngine
-	// wrapper so that calls to Close do not Close the underlying Engine.
-	if registry.cfg.reuseEngines {
-		wrappedEngine := &stickyInMemEngine{
-			id:     spec.StickyVFSID,
-			closed: false,
-			Engine: engine,
-			fs:     fs,
-		}
-		registry.engines[spec.StickyVFSID] = wrappedEngine
-		return wrappedEngine, nil
-	}
-	return engine, nil
+	return storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...), nil
 }
 
-func (registry *stickyVFSRegistryImpl) Get(spec base.StoreSpec) (vfs.FS, error) {
+// Get implements the StickyVFSRegistry interface.
+func (registry *stickyVFSRegistryImpl) Get(stickyVFSID string) vfs.FS {
 	registry.mu.Lock()
 	defer registry.mu.Unlock()
 
-	if fs, ok := registry.entries[spec.StickyVFSID]; ok {
-		return fs, nil
-	} else {
-		fs = vfs.NewMem()
-		registry.entries[spec.StickyVFSID] = fs
-		return fs, nil
+	if fs, ok := registry.entries[stickyVFSID]; ok {
+		return fs
 	}
-}
-
-// CloseAllEngines closes all open sticky in-memory engines that were
-// created by this registry. Calling this method is required when using the
-// ReuseEnginesDeprecated option.
-func (registry *stickyVFSRegistryImpl) CloseAllEngines() {
-	registry.mu.Lock()
-	defer registry.mu.Unlock()
-
-	for id, engine := range registry.engines {
-		engine.closed = true
-		engine.Engine.Close()
-		delete(registry.entries, id)
-		delete(registry.engines, id)
-	}
-}
-
-// stickyInMemEngine extends a normal engine, but holds on to the Engine between
-// Opens to allow subsequent Opens to reuse a previous Engine instance. This
-// type is used only when a the StickyVFSRegistry is created with the
-// ReuseEnginesDeprecated option.
-//
-// Engine.Close does not close the underlying Engine. The engine is kept open
-// until CloseAllEngines is called, hence being "sticky".
-type stickyInMemEngine struct {
-	// id is the unique identifier for this sticky engine.
-	id string
-	// closed indicates whether the current engine has been closed.
-	closed bool
-	// Engine extends the Engine interface.
-	storage.Engine
-	// Underlying in-mem filesystem backing the engine.
-	fs vfs.FS
-}
-
-// stickyInMemEngine implements Engine.
-var _ storage.Engine = &stickyInMemEngine{}
-
-// Close overwrites the default Engine interface to not close the underlying
-// engine if called. We mark the state as closed to reflect a correct result
-// in Closed().
-func (e *stickyInMemEngine) Close() {
-	e.closed = true
-}
-
-// Closed overwrites the default Engine interface.
-func (e *stickyInMemEngine) Closed() bool {
-	return e.closed
-}
-
-// SetStoreID implements the StoreIDSetter interface.
-func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error {
-	return e.Engine.SetStoreID(ctx, storeID)
+	fs := vfs.NewMem()
+	registry.entries[stickyVFSID] = fs
+	return fs
 }
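The net effect of the rewrite above is that the registry now deals only in filesystems, never in engines. A sketch of the lifecycle this leaves callers with, using only the APIs visible in this patch (the function itself is illustrative):

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// reopen demonstrates the simplified contract: Close really closes the
// engine, and a second Open with the same StickyVFSID runs ordinary storage
// recovery against the surviving in-memory VFS instead of handing back the
// previous Engine instance, as ReuseEnginesDeprecated used to.
func reopen(ctx context.Context) error {
	registry := server.NewStickyVFSRegistry()
	cfg := server.MakeConfig(ctx, cluster.MakeTestingClusterSettings())
	spec := base.StoreSpec{InMemory: true, StickyVFSID: "n1"}

	eng, err := registry.Open(ctx, &cfg, spec)
	if err != nil {
		return err
	}
	eng.Close() // no wrapper intercepts this anymore; the engine is fully closed

	// State written before Close survives in the sticky MemFS and is
	// recovered here, per the Open contract documented above.
	eng2, err := registry.Open(ctx, &cfg, spec)
	if err != nil {
		return err
	}
	defer eng2.Close()
	return nil
}
```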
diff --git a/pkg/server/sticky_vfs_test.go b/pkg/server/sticky_vfs_test.go
index 7c4263d70a74..4f06772d1ed1 100644
--- a/pkg/server/sticky_vfs_test.go
+++ b/pkg/server/sticky_vfs_test.go
@@ -23,62 +23,6 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-func TestStickyEngines(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	ctx := context.Background()
-	attrs := roachpb.Attributes{}
-	cacheSize := int64(1 << 20)   /* 1 MiB */
-	storeSize := int64(512 << 20) /* 512 MiB */
-
-	registry := NewStickyVFSRegistry(ReuseEnginesDeprecated)
-
-	cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings())
-	cfg1.CacheSize = cacheSize
-	spec1 := base.StoreSpec{
-		StickyVFSID: "engine1",
-		Attributes:  attrs,
-		Size:        base.SizeSpec{InBytes: storeSize},
-	}
-	engine1, err := registry.Open(ctx, &cfg1, spec1)
-	require.NoError(t, err)
-	require.False(t, engine1.Closed())
-
-	cfg2 := MakeConfig(ctx, cluster.MakeTestingClusterSettings())
-	cfg2.CacheSize = cacheSize
-	spec2 := base.StoreSpec{
-		StickyVFSID: "engine2",
-		Attributes:  attrs,
-		Size:        base.SizeSpec{InBytes: storeSize},
-	}
-	engine2, err := registry.Open(ctx, &cfg2, spec2)
-	require.NoError(t, err)
-	require.False(t, engine2.Closed())
-
-	// Regetting the engine whilst it is not closed will fail.
-	_, err = registry.Open(ctx, &cfg1, spec1)
-	require.EqualError(t, err, "sticky engine engine1 has not been closed")
-
-	// Close the engine, which allows it to be refetched.
-	engine1.Close()
-	require.True(t, engine1.Closed())
-	require.False(t, engine1.(*stickyInMemEngine).Engine.Closed())
-
-	// Refetching the engine should give back the same engine.
-	engine1Refetched, err := registry.Open(ctx, &cfg1, spec1)
-	require.NoError(t, err)
-	require.Equal(t, engine1, engine1Refetched)
-	require.False(t, engine1.Closed())
-
-	// Cleaning up everything asserts everything is closed.
-	registry.CloseAllEngines()
-	for _, engine := range []storage.Engine{engine1, engine2} {
-		require.True(t, engine.Closed())
-		require.True(t, engine.(*stickyInMemEngine).Engine.Closed())
-	}
-}
-
 func TestStickyVFS(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -99,8 +43,7 @@ func TestStickyVFS(t *testing.T) {
 	}
 	engine1, err := registry.Open(ctx, &cfg1, spec1)
 	require.NoError(t, err)
-	fs1, err := registry.Get(spec1)
-	require.NoError(t, err)
+	fs1 := registry.Get(spec1.StickyVFSID)
 	require.False(t, engine1.Closed())
 	engine1.Close()
 
@@ -108,8 +51,7 @@ func TestStickyVFS(t *testing.T) {
 	// Refetching the engine should give back a different engine with the same
 	// underlying fs.
 	engine2, err := registry.Open(ctx, &cfg1, spec1)
 	require.NoError(t, err)
-	fs2, err := registry.Get(spec1)
-	require.NoError(t, err)
+	fs2 := registry.Get(spec1.StickyVFSID)
 	require.NotEqual(t, engine1, engine2)
 	require.Equal(t, fs1, fs2)
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index ccb3fbd9a935..bcff210bef71 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -105,7 +105,7 @@ type TestingKnobs struct {
 	// StickyVFSRegistry manages the lifecycle of sticky in memory engines,
 	// which can be enabled via base.StoreSpec.StickyVFSID.
 	//
-	// When supplied to a TestCluster, StickyEngineIDs will be associated auto-
+	// When supplied to a TestCluster, StickyVFSIDs will be associated auto-
 	// matically to the StoreSpecs used.
 	StickyVFSRegistry StickyVFSRegistry
 	// WallClock is used to inject a custom clock for testing the server. It is
diff --git a/pkg/storage/open.go b/pkg/storage/open.go
index e6083b69cf55..11271d394505 100644
--- a/pkg/storage/open.go
+++ b/pkg/storage/open.go
@@ -81,16 +81,6 @@ var ForTesting ConfigOption = func(cfg *engineConfig) error {
 	return nil
 }
 
-// ForStickyEngineTesting is similar to ForTesting but leaves separated
-// intents as enabled since we cannot ensure consistency in the test setup
-// between what the KV layer thinks and what the engine does in terms of
-// writing separated intents. Since our optimizations are for the case where
-// we know there are only separated intents, this sidesteps any test issues
-// due to inconsistencies.
-var ForStickyEngineTesting ConfigOption = func(cfg *engineConfig) error {
-	return nil
-}
-
 // Attributes configures the engine's attributes.
 func Attributes(attrs roachpb.Attributes) ConfigOption {
 	return func(cfg *engineConfig) error {
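Since `ForStickyEngineTesting` had become a no-op (its body simply returned nil), sites that open an engine over a sticky FS now pass the ordinary options. A hedged distillation of the consistency-queue test's checkpoint inspection above, with the helper name invented for illustration:

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// openCheckpoint opens a read-only engine on a directory that must already
// exist inside the sticky FS. storage.ForTesting stands in for the removed
// no-op ForStickyEngineTesting option.
func openCheckpoint(
	ctx context.Context, reg server.StickyVFSRegistry, id, dir string,
) storage.Engine {
	return storage.InMemFromFS(ctx, reg.Get(id), dir, cluster.MakeClusterSettings(),
		storage.ForTesting, storage.MustExist, storage.ReadOnly, storage.CacheSize(1<<20))
}
```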