Skip to content

Commit

Permalink
server: remove support for sticky engines
Browse files Browse the repository at this point in the history
Remove support for reusing engines from the StickyVFSRegistry. Tests should not
depend on ephemeral, in-memory engine state between server restarts, or read
closed Engine state.

Close cockroachdb#108119.
Epic: none
Release note: none
  • Loading branch information
jbowens committed Aug 4, 2023
1 parent 2f79490 commit 179bfc9
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 223 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
/pkg/server/status*go @cockroachdb/obs-inf-prs @cockroachdb/server-prs
/pkg/server/status*go @cockroachdb/obs-inf-prs @cockroachdb/server-prs
/pkg/server/status/ @cockroachdb/obs-inf-prs @cockroachdb/server-prs
/pkg/server/sticky_engine* @cockroachdb/storage
/pkg/server/sticky_vfs* @cockroachdb/storage
/pkg/server/structlogging/ @cockroachdb/cluster-observability @cockroachdb/obs-inf-prs
/pkg/server/systemconfigwatcher/ @cockroachdb/kv-prs @cockroachdb/multi-tenant
/pkg/server/telemetry/ @cockroachdb/obs-inf-prs @cockroachdb/server-prs
Expand Down
4 changes: 1 addition & 3 deletions pkg/cli/debug_recover_loss_of_quorum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,7 @@ func TestStageVersionCheck(t *testing.T) {
})
require.NoError(t, err, "force local must fix incorrect version")
// Check that stored plan has version matching cluster version.
fs, err := storeReg.Get(base.StoreSpec{InMemory: true, StickyVFSID: "1"})
require.NoError(t, err, "failed to get shared store fs")
ps := loqrecovery.NewPlanStore("", fs)
ps := loqrecovery.NewPlanStore("", storeReg.Get("1"))
p, ok, err := ps.LoadPlan()
require.NoError(t, err, "failed to read node 0 plan")
require.True(t, ok, "plan was not staged")
Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/consistency_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {
}

onDiskCheckpointPaths := func(nodeIdx int) []string {
fs, err := stickyVFSRegistry.Get(
base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(nodeIdx), 10)})
require.NoError(t, err)
fs := stickyVFSRegistry.Get(strconv.FormatInt(int64(nodeIdx), 10))
store := tc.GetFirstStoreFromServer(t, nodeIdx)
checkpointPath := filepath.Join(store.TODOEngine().GetAuxiliaryDir(), "checkpoints")
checkpoints, _ := fs.List(checkpointPath)
Expand Down Expand Up @@ -387,8 +385,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) {

// Create a new store on top of checkpoint location inside existing in-mem
// VFS to verify its contents.
fs, err := stickyVFSRegistry.Get(base.StoreSpec{StickyVFSID: strconv.FormatInt(int64(i), 10)})
require.NoError(t, err)
fs := stickyVFSRegistry.Get(strconv.FormatInt(int64(i), 10))
cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], cluster.MakeClusterSettings(),
storage.ForTesting, storage.MustExist, storage.ReadOnly, storage.CacheSize(1<<20))
defer cpEng.Close()
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/loqrecovery/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,7 @@ func prepInMemPlanStores(
pss := make(map[int]loqrecovery.PlanStore)
for id, args := range serverArgs {
reg := args.Knobs.Server.(*server.TestingKnobs).StickyVFSRegistry
store, err := reg.Get(args.StoreSpecs[0])
require.NoError(t, err, "can't create loq recovery plan store")
pss[id] = loqrecovery.NewPlanStore(".", store)
pss[id] = loqrecovery.NewPlanStore(".", reg.Get(args.StoreSpecs[0].StickyVFSID))
}
return pss
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/server/loss_of_quorum.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ func newPlanStore(cfg Config) (loqrecovery.PlanStore, error) {
"engine no registry available. Please use " +
"Knobs.Server.StickyVFSRegistry to provide one.")
}
var err error
fs, err = knobs.StickyVFSRegistry.Get(spec)
if err != nil {
return loqrecovery.PlanStore{}, err
}
fs = knobs.StickyVFSRegistry.Get(spec.StickyVFSID)
} else {
fs = vfs.NewMem()
}
Expand Down
152 changes: 16 additions & 136 deletions pkg/server/sticky_vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/vfs"
)

// StickyVFSOption is a config option for a sticky engine
// registry that can be passed to NewStickyVFSRegistry.
type StickyVFSOption func(cfg *stickyConfig)

// ReuseEnginesDeprecated configures a sticky VFS registry to return exactly the
// same Engine without closing and re-opening from the underlying in-memory FS.
// This option is deprecated. Callers should refactor their usage to not depend
// on ephemeral, non-persisted state of an Engine.
var ReuseEnginesDeprecated StickyVFSOption = func(cfg *stickyConfig) {
cfg.reuseEngines = true
}

type stickyConfig struct {
// reuseEngines is true if a sticky engine registry should return an existing
// engine instead of reopening it from the underlying in-memory FS.
reuseEngines bool
// Preserved for anticipated future options, such as the ability to use
// Pebble's "strict" MemFS.
}

// StickyVFSRegistry manages the lifecycle of sticky in-memory filesystems. It
Expand All @@ -49,31 +39,18 @@ type StickyVFSRegistry interface {
// neither Open nor Get has been called with the given spec's StickyVFSID
// before, Open will create a new in-memory filesystem. Otherwise, the new
// Engine will re-open the existing store held within the in-memory
// filesystem.
//
// If the registry was created with the ReuseEnginesDeprecated option, Open
// will return an Engine whose Close method does not close the underlying
// engine. Subsequent Open calls will return the same Engine. With this
// option, at most one engine with a given id can be active at any given
// time. Note that with the ReuseEnginesDeprecated option, an Open call that
// returns an existing sticky engine will not respect modified configuration
// or attributes. When using the ReuseEnginesDeprecated option, the caller
// must call CloseAllEngines when they're finished to close the underlying
// engines.
// filesystem. The caller should ensure any previous engine accessing the
// same VFS has already been closed.
Open(ctx context.Context, cfg *Config, spec base.StoreSpec) (storage.Engine, error)
// Get returns the named in-memory FS.
Get(spec base.StoreSpec) (vfs.FS, error)
// CloseAllEngines closes all open sticky in-memory engines that were
// created by this registry. Calling this method is required when using the
// ReuseEnginesDeprecated option.
CloseAllEngines()
// Get returns the named in-memory FS, constructing a new one if this is the
// first time a FS with the provided ID has been requested.
Get(stickyVFSID string) vfs.FS
}

// stickyVFSRegistryImpl is the bookkeeper for all active sticky filesystems,
// keyed by their id. It implements the StickyVFSRegistry interface.
type stickyVFSRegistryImpl struct {
entries map[string]*vfs.MemFS
engines map[string]*stickyInMemEngine
mu syncutil.Mutex
cfg stickyConfig
}
Expand All @@ -82,7 +59,6 @@ type stickyVFSRegistryImpl struct {
func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry {
registry := &stickyVFSRegistryImpl{
entries: map[string]*vfs.MemFS{},
engines: map[string]*stickyInMemEngine{},
}
for _, opt := range opts {
opt(&registry.cfg)
Expand All @@ -94,42 +70,14 @@ func NewStickyVFSRegistry(opts ...StickyVFSOption) StickyVFSRegistry {
func (registry *stickyVFSRegistryImpl) Open(
ctx context.Context, cfg *Config, spec base.StoreSpec,
) (storage.Engine, error) {
registry.mu.Lock()
defer registry.mu.Unlock()

// Look up the VFS.
fs, ok := registry.entries[spec.StickyVFSID]

// If the registry is configured to reuse whole Engines (eg, skipping
// shutdown and recovery of the storage engine), then check if we already
// have an open Engine.
if ok && registry.cfg.reuseEngines {
if engine, engineOk := registry.engines[spec.StickyVFSID]; engineOk {
if !engine.closed {
return nil, errors.Errorf("sticky engine %s has not been closed", spec.StickyVFSID)
}
log.Infof(ctx, "re-using existing sticky in-mem engine %s", spec.StickyVFSID)
engine.closed = false
return engine, nil
}
}

// If the VFS doesn't yet exist, construct a new one and save the
// association.
if !ok {
fs = vfs.NewMem()
registry.entries[spec.StickyVFSID] = fs
}

fs := registry.Get(spec.StickyVFSID)
// Create a new engine.
options := []storage.ConfigOption{
storage.Attributes(spec.Attributes),
storage.CacheSize(cfg.CacheSize),
storage.MaxSize(spec.Size.InBytes),
storage.EncryptionAtRest(spec.EncryptionOptions),
storage.ForStickyEngineTesting,
}

if s := cfg.TestingKnobs.Store; s != nil {
stk := s.(*kvserver.StoreTestingKnobs)
if stk.SmallEngineBlocks {
Expand All @@ -141,86 +89,18 @@ func (registry *stickyVFSRegistryImpl) Open(
}

log.Infof(ctx, "creating engine with sticky in-memory VFS %s", spec.StickyVFSID)
engine := storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...)

// If this registry is configured to keep Engines open rather recovering
// from filesystem state, wrap the Engine within a *stickyInMemEngine
// wrapper so that calls to Close do not Close the underlying Engine.
if registry.cfg.reuseEngines {
wrappedEngine := &stickyInMemEngine{
id: spec.StickyVFSID,
closed: false,
Engine: engine,
fs: fs,
}
registry.engines[spec.StickyVFSID] = wrappedEngine
return wrappedEngine, nil
}
return engine, nil
return storage.InMemFromFS(ctx, fs, "", cluster.MakeClusterSettings(), options...), nil
}

func (registry *stickyVFSRegistryImpl) Get(spec base.StoreSpec) (vfs.FS, error) {
// Get implements the StickyVFSRegistry interface.
func (registry *stickyVFSRegistryImpl) Get(stickyVFSID string) vfs.FS {
registry.mu.Lock()
defer registry.mu.Unlock()

if fs, ok := registry.entries[spec.StickyVFSID]; ok {
return fs, nil
} else {
fs = vfs.NewMem()
registry.entries[spec.StickyVFSID] = fs
return fs, nil
if fs, ok := registry.entries[stickyVFSID]; ok {
return fs
}
}

// CloseAllEngines closes all open sticky in-memory engines that were
// created by this registry. Calling this method is required when using the
// ReuseEnginesDeprecated option.
func (registry *stickyVFSRegistryImpl) CloseAllEngines() {
registry.mu.Lock()
defer registry.mu.Unlock()

for id, engine := range registry.engines {
engine.closed = true
engine.Engine.Close()
delete(registry.entries, id)
delete(registry.engines, id)
}
}

// stickyInMemEngine extends a normal engine, but holds on to the Engine between
// Opens to allow subsequent Opens to reuse a previous Engine instance. This
// type is used only when a the StickyVFSRegistry is created with the
// ReuseEnginesDeprecated option.
//
// Engine.Close does not close the underlying Engine. The engine is kept open
// until CloseAllEngines is called, hence being "sticky".
type stickyInMemEngine struct {
// id is the unique identifier for this sticky engine.
id string
// closed indicates whether the current engine has been closed.
closed bool
// Engine extends the Engine interface.
storage.Engine
// Underlying in-mem filesystem backing the engine.
fs vfs.FS
}

// stickyInMemEngine implements Engine.
var _ storage.Engine = &stickyInMemEngine{}

// Close overwrites the default Engine interface to not close the underlying
// engine if called. We mark the state as closed to reflect a correct result
// in Closed().
func (e *stickyInMemEngine) Close() {
e.closed = true
}

// Closed overwrites the default Engine interface.
func (e *stickyInMemEngine) Closed() bool {
return e.closed
}

// SetStoreID implements the StoreIDSetter interface.
func (e *stickyInMemEngine) SetStoreID(ctx context.Context, storeID int32) error {
return e.Engine.SetStoreID(ctx, storeID)
fs := vfs.NewMem()
registry.entries[stickyVFSID] = fs
return fs
}
61 changes: 2 additions & 59 deletions pkg/server/sticky_vfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,62 +23,6 @@ import (
"github.com/stretchr/testify/require"
)

func TestStickyEngines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
attrs := roachpb.Attributes{}
cacheSize := int64(1 << 20) /* 1 MiB */
storeSize := int64(512 << 20) /* 512 MiB */

registry := NewStickyVFSRegistry(ReuseEnginesDeprecated)

cfg1 := MakeConfig(ctx, cluster.MakeTestingClusterSettings())
cfg1.CacheSize = cacheSize
spec1 := base.StoreSpec{
StickyVFSID: "engine1",
Attributes: attrs,
Size: base.SizeSpec{InBytes: storeSize},
}
engine1, err := registry.Open(ctx, &cfg1, spec1)
require.NoError(t, err)
require.False(t, engine1.Closed())

cfg2 := MakeConfig(ctx, cluster.MakeTestingClusterSettings())
cfg2.CacheSize = cacheSize
spec2 := base.StoreSpec{
StickyVFSID: "engine2",
Attributes: attrs,
Size: base.SizeSpec{InBytes: storeSize},
}
engine2, err := registry.Open(ctx, &cfg2, spec2)
require.NoError(t, err)
require.False(t, engine2.Closed())

// Regetting the engine whilst it is not closed will fail.
_, err = registry.Open(ctx, &cfg1, spec1)
require.EqualError(t, err, "sticky engine engine1 has not been closed")

// Close the engine, which allows it to be refetched.
engine1.Close()
require.True(t, engine1.Closed())
require.False(t, engine1.(*stickyInMemEngine).Engine.Closed())

// Refetching the engine should give back the same engine.
engine1Refetched, err := registry.Open(ctx, &cfg1, spec1)
require.NoError(t, err)
require.Equal(t, engine1, engine1Refetched)
require.False(t, engine1.Closed())

// Cleaning up everything asserts everything is closed.
registry.CloseAllEngines()
for _, engine := range []storage.Engine{engine1, engine2} {
require.True(t, engine.Closed())
require.True(t, engine.(*stickyInMemEngine).Engine.Closed())
}
}

func TestStickyVFS(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -99,16 +43,15 @@ func TestStickyVFS(t *testing.T) {
}
engine1, err := registry.Open(ctx, &cfg1, spec1)
require.NoError(t, err)
fs1, err := registry.Get(spec1)
require.NoError(t, err)
fs1 := registry.Get(spec1.StickyVFSID)
require.False(t, engine1.Closed())
engine1.Close()

// Refetching the engine should give back a different engine with the same
// underlying fs.
engine2, err := registry.Open(ctx, &cfg1, spec1)
require.NoError(t, err)
fs2, err := registry.Get(spec1)
fs2 := registry.Get(spec1.StickyVFSID)
require.NoError(t, err)
require.NotEqual(t, engine1, engine2)
require.Equal(t, fs1, fs2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type TestingKnobs struct {
// StickyVFSRegistry manages the lifecycle of sticky in memory engines,
// which can be enabled via base.StoreSpec.StickyVFSID.
//
// When supplied to a TestCluster, StickyEngineIDs will be associated auto-
// When supplied to a TestCluster, StickyVFSIDs will be associated auto-
// matically to the StoreSpecs used.
StickyVFSRegistry StickyVFSRegistry
// WallClock is used to inject a custom clock for testing the server. It is
Expand Down
10 changes: 0 additions & 10 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ var ForTesting ConfigOption = func(cfg *engineConfig) error {
return nil
}

// ForStickyEngineTesting is similar to ForTesting but leaves separated
// intents as enabled since we cannot ensure consistency in the test setup
// between what the KV layer thinks and what the engine does in terms of
// writing separated intents. Since our optimizations are for the case where
// we know there are only separated intents, this sidesteps any test issues
// due to inconsistencies.
var ForStickyEngineTesting ConfigOption = func(cfg *engineConfig) error {
return nil
}

// Attributes configures the engine's attributes.
func Attributes(attrs roachpb.Attributes) ConfigOption {
return func(cfg *engineConfig) error {
Expand Down

0 comments on commit 179bfc9

Please sign in to comment.