Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: don't modify the given cfg.Opts #96770

Merged
merged 1 commit into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1532,17 +1532,14 @@ func pebbleCryptoInitializer() error {
}
}

cfg := storage.PebbleConfig{
StorageConfig: storageConfig,
Opts: storage.DefaultPebbleOptions(),
}

// This has the side effect of storing the encrypted FS into cfg.Opts.FS.
_, _, err := storage.ResolveEncryptedEnvOptions(&cfg)
_, encryptedEnv, err := storage.ResolveEncryptedEnvOptions(&storageConfig, vfs.Default, false /* readOnly */)
if err != nil {
return err
}

pebbleToolFS.set(cfg.Opts.FS)
if encryptedEnv != nil {
pebbleToolFS.set(encryptedEnv.FS)
} else {
pebbleToolFS.set(vfs.Default)
}
return nil
}
117 changes: 59 additions & 58 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,10 +619,9 @@ func shortAttributeExtractorForValues(
}

// wrapFilesystemMiddleware wraps the Option's vfs.FS with disk-health checking
// and ENOSPC detection. It mutates the provided options to set the FS and
// returns a Closer that should be invoked when the filesystem will no longer be
// used.
func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
// and ENOSPC detection. Returns the new FS and a Closer that should be invoked
// when the filesystem will no longer be used.
func wrapFilesystemMiddleware(opts *pebble.Options) (vfs.FS, io.Closer) {
// Set disk-health check interval to min(5s, maxSyncDurationDefault). This
// is mostly to ease testing; the default of 5s is too infrequent to test
// conveniently. See the disk-stalled roachtest for an example of how this
Expand All @@ -634,8 +633,7 @@ func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
// Instantiate a file system with disk health checking enabled. This FS
// wraps the filesystem with a layer that times all write-oriented
// operations.
var closer io.Closer
opts.FS, closer = vfs.WithDiskHealthChecks(opts.FS, diskHealthCheckInterval,
fs, closer := vfs.WithDiskHealthChecks(opts.FS, diskHealthCheckInterval,
func(name string, opType vfs.OpType, duration time.Duration) {
opts.EventListener.DiskSlow(pebble.DiskSlowInfo{
Path: name,
Expand All @@ -644,10 +642,10 @@ func wrapFilesystemMiddleware(opts *pebble.Options) io.Closer {
})
})
// If we encounter ENOSPC, exit with an informative exit code.
opts.FS = vfs.OnDiskFull(opts.FS, func() {
fs = vfs.OnDiskFull(fs, func() {
exit.WithCode(exit.DiskFull())
})
return closer
return fs, closer
}

type pebbleLogger struct {
Expand Down Expand Up @@ -812,21 +810,23 @@ func (p *Pebble) SetStoreID(ctx context.Context, storeID int32) {
p.storeIDPebbleLog.Set(ctx, storeID)
}

// ResolveEncryptedEnvOptions fills in cfg.Opts.FS with an encrypted vfs if this
// store has encryption-at-rest enabled. Also returns the associated file
// registry and EncryptionStatsHandler.
func ResolveEncryptedEnvOptions(cfg *PebbleConfig) (*PebbleFileRegistry, *EncryptionEnv, error) {
fileRegistry := &PebbleFileRegistry{FS: cfg.Opts.FS, DBDir: cfg.Dir, ReadOnly: cfg.Opts.ReadOnly}
// ResolveEncryptedEnvOptions creates the EncryptionEnv and associated file
// registry if this store has encryption-at-rest enabled; otherwise returns a
// nil EncryptionEnv.
func ResolveEncryptedEnvOptions(
cfg *base.StorageConfig, fs vfs.FS, readOnly bool,
) (*PebbleFileRegistry, *EncryptionEnv, error) {
var fileRegistry *PebbleFileRegistry
if cfg.UseFileRegistry {
fileRegistry = &PebbleFileRegistry{FS: fs, DBDir: cfg.Dir, ReadOnly: readOnly}
if err := fileRegistry.Load(); err != nil {
return nil, nil, err
}
} else {
if err := fileRegistry.CheckNoRegistryFile(); err != nil {
if err := CheckNoRegistryFile(fs, cfg.Dir); err != nil {
return nil, nil, fmt.Errorf("encryption was used on this store before, but no encryption flags " +
"specified. You need a CCL build and must fully specify the --enterprise-encryption flag")
}
fileRegistry = nil
}

var env *EncryptionEnv
Expand All @@ -840,37 +840,40 @@ func ResolveEncryptedEnvOptions(cfg *PebbleConfig) (*PebbleFileRegistry, *Encryp
}
var err error
env, err = NewEncryptedEnvFunc(
cfg.Opts.FS,
fs,
fileRegistry,
cfg.Dir,
cfg.Opts.ReadOnly,
readOnly,
cfg.EncryptionOptions,
)
if err != nil {
return nil, nil, err
}
// TODO(jackson): Should this just return an EncryptionEnv,
// rather than mutating cfg.Opts?
cfg.Opts.FS = env.FS
}
return fileRegistry, env, nil
}

// NewPebble creates a new Pebble instance, at the specified path.
func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults beforehand so we have a matching cfg here for when we save
// cfg.FS and cfg.ReadOnly later on.
if cfg.Opts == nil {
cfg.Opts = DefaultPebbleOptions()
}
if cfg.Settings == nil {
return nil, errors.AssertionFailedf("NewPebble requires cfg.Settings to be set")
}

// Initialize the FS, wrapping it with disk health-checking and
// ENOSPC-detection.
filesystemCloser := wrapFilesystemMiddleware(cfg.Opts)
var opts *pebble.Options
if cfg.Opts == nil {
opts = DefaultPebbleOptions()
} else {
// Clone the given options so that we are free to modify them.
opts = cfg.Opts.Clone()
}

// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults here to make sure we have a working FS.
opts.EnsureDefaults()

// Wrap the FS with disk health-checking and ENOSPC-detection.
var filesystemCloser io.Closer
opts.FS, filesystemCloser = wrapFilesystemMiddleware(opts)
defer func() {
if err != nil {
filesystemCloser.Close()
Expand All @@ -886,42 +889,40 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
logCtx = logtags.AddTag(logCtx, "s", storeIDContainer)
logCtx = logtags.AddTag(logCtx, "pebble", nil)

cfg.Opts.EnsureDefaults()
cfg.Opts.ErrorIfNotExists = cfg.MustExist
cfg.Opts.WALMinSyncInterval = func() time.Duration {
opts.ErrorIfNotExists = cfg.MustExist
opts.WALMinSyncInterval = func() time.Duration {
return minWALSyncInterval.Get(&cfg.Settings.SV)
}
cfg.Opts.Experimental.EnableValueBlocks = func() bool {
opts.Experimental.EnableValueBlocks = func() bool {
version := cfg.Settings.Version.ActiveVersionOrEmpty(logCtx)
return !version.Less(clusterversion.ByKey(
clusterversion.V23_1EnablePebbleFormatSSTableValueBlocks)) &&
valueBlocksEnabled.Get(&cfg.Settings.SV)
}

auxDir := cfg.Opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
if err := cfg.Opts.FS.MkdirAll(auxDir, 0755); err != nil {
auxDir := opts.FS.PathJoin(cfg.Dir, base.AuxiliaryDir)
if err := opts.FS.MkdirAll(auxDir, 0755); err != nil {
return nil, err
}
ballastPath := base.EmergencyBallastFile(cfg.Opts.FS.PathJoin, cfg.Dir)
ballastPath := base.EmergencyBallastFile(opts.FS.PathJoin, cfg.Dir)

// For some purposes, we want to always use an unencrypted
// filesystem. The call below to ResolveEncryptedEnvOptions will
// replace cfg.Opts.FS with a VFS wrapped with encryption-at-rest if
// necessary. Before we do that, save a handle on the unencrypted
// FS for those that need it. Some call sites need the unencrypted
// FS for the purpose of atomic renames.
unencryptedFS := cfg.Opts.FS
fileRegistry, env, err := ResolveEncryptedEnvOptions(&cfg)
// filesystem.
unencryptedFS := opts.FS
fileRegistry, encryptionEnv, err := ResolveEncryptedEnvOptions(&cfg.StorageConfig, opts.FS, opts.ReadOnly)
if err != nil {
return nil, err
}
if encryptionEnv != nil {
opts.FS = encryptionEnv.FS
}

// If no logger was passed, the previous call to `EnsureDefaults` on
// `cfg.Opts` will set the logger to pebble's `DefaultLogger`. In
// `opts` will set the logger to pebble's `DefaultLogger`. In
// crdb, we want pebble-related logs to go to the storage channel,
// so we update the logger here accordingly.
if cfg.Opts.Logger == nil || cfg.Opts.Logger == pebble.DefaultLogger {
cfg.Opts.Logger = pebbleLogger{
if opts.Logger == nil || opts.Logger == pebble.DefaultLogger {
opts.Logger = pebbleLogger{
ctx: logCtx,
depth: 1,
}
Expand All @@ -930,7 +931,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// Establish the emergency ballast if we can. If there's not sufficient
// disk space, the ballast will be reestablished from Capacity when the
// store's capacity is queried periodically.
if !cfg.Opts.ReadOnly {
if !opts.ReadOnly {
du, err := unencryptedFS.GetDiskUsage(cfg.Dir)
// If the FS is an in-memory FS, GetDiskUsage returns
// vfs.ErrUnsupported and we skip ballast creation.
Expand All @@ -942,16 +943,16 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
return nil, errors.Wrap(err, "resizing ballast")
}
if resized {
cfg.Opts.Logger.Infof("resized ballast %s to size %s",
opts.Logger.Infof("resized ballast %s to size %s",
ballastPath, humanizeutil.IBytes(cfg.BallastSize))
}
}
}

storeProps := computeStoreProperties(ctx, cfg.Dir, cfg.Opts.ReadOnly, env != nil /* encryptionEnabled */)
storeProps := computeStoreProperties(ctx, cfg.Dir, opts.ReadOnly, encryptionEnv != nil /* encryptionEnabled */)

p = &Pebble{
readOnly: cfg.Opts.ReadOnly,
readOnly: opts.ReadOnly,
path: cfg.Dir,
auxDir: auxDir,
ballastPath: ballastPath,
Expand All @@ -960,11 +961,11 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
attrs: cfg.Attrs,
properties: storeProps,
settings: cfg.Settings,
encryption: env,
encryption: encryptionEnv,
fileRegistry: fileRegistry,
fs: cfg.Opts.FS,
fs: opts.FS,
unencryptedFS: unencryptedFS,
logger: cfg.Opts.Logger,
logger: opts.Logger,
logCtx: logCtx,
storeIDPebbleLog: storeIDContainer,
closer: filesystemCloser,
Expand All @@ -978,8 +979,8 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
// the compactions concurrency which has already been set and allow us
// to update the compactionConcurrency on the fly by changing the
// Pebble.atomic.compactionConcurrency variable.
p.atomic.compactionConcurrency = uint64(cfg.Opts.MaxConcurrentCompactions())
cfg.Opts.MaxConcurrentCompactions = func() int {
p.atomic.compactionConcurrency = uint64(opts.MaxConcurrentCompactions())
opts.MaxConcurrentCompactions = func() int {
return int(atomic.LoadUint64(&p.atomic.compactionConcurrency))
}

Expand Down Expand Up @@ -1015,7 +1016,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
)

p.eventListener = &el
cfg.Opts.EventListener = &el
opts.EventListener = &el
p.wrappedIntentWriter = wrapIntentWriter(p)

// Read the current store cluster version.
Expand Down Expand Up @@ -1051,10 +1052,10 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
}

if WorkloadCollectorEnabled {
p.replayer.Attach(cfg.Opts)
p.replayer.Attach(opts)
}

db, err := pebble.Open(cfg.StorageConfig.Dir, cfg.Opts)
db, err := pebble.Open(cfg.StorageConfig.Dir, opts)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/pebble_file_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ const (

// CheckNoRegistryFile checks that no registry file currently exists.
// CheckNoRegistryFile should be called if the file registry will not be used.
func (r *PebbleFileRegistry) CheckNoRegistryFile() error {
filename, err := atomicfs.ReadMarker(r.FS, r.DBDir, registryMarkerName)
func CheckNoRegistryFile(fs vfs.FS, dbDir string) error {
filename, err := atomicfs.ReadMarker(fs, dbDir, registryMarkerName)
if oserror.IsNotExist(err) {
// ReadMarker may return oserror.IsNotExist if the data
// directory does not exist.
Expand Down
12 changes: 4 additions & 8 deletions pkg/storage/pebble_file_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,11 @@ func TestFileRegistryCheckNoFile(t *testing.T) {
mem := vfs.NewMem()
fileEntry :=
&enginepb.FileEntry{EnvType: enginepb.EnvType_Data, EncryptionSettings: []byte("foo")}
require.NoError(t, CheckNoRegistryFile(mem, "" /* dbDir */))
registry := &PebbleFileRegistry{FS: mem}
require.NoError(t, registry.CheckNoRegistryFile())
require.NoError(t, registry.Load())
require.NoError(t, registry.SetFileEntry("/foo", fileEntry))
registry = &PebbleFileRegistry{FS: mem}
require.Error(t, registry.CheckNoRegistryFile())
require.Error(t, CheckNoRegistryFile(mem, "" /* dbDir */))
}

func TestFileRegistryElideUnencrypted(t *testing.T) {
Expand Down Expand Up @@ -293,8 +292,8 @@ func TestFileRegistryRecordsReadAndWrite(t *testing.T) {
}

// Create a file registry and add entries for a few files.
require.NoError(t, CheckNoRegistryFile(mem, "" /* dbDir */))
registry1 := &PebbleFileRegistry{FS: mem}
require.NoError(t, registry1.CheckNoRegistryFile())
require.NoError(t, registry1.Load())
for filename, entry := range files {
require.NoError(t, registry1.SetFileEntry(filename, entry))
Expand Down Expand Up @@ -332,10 +331,7 @@ func TestFileRegistry(t *testing.T) {
switch d.Cmd {
case "check-no-registry-file":
require.Nil(t, registry)
registry = &PebbleFileRegistry{FS: fs}
err := registry.CheckNoRegistryFile()
registry = nil
if err == nil {
if err := CheckNoRegistryFile(fs, "" /* dbDir */); err == nil {
fmt.Fprintf(&buf, "OK\n")
} else {
fmt.Fprintf(&buf, "Error: %s\n", err)
Expand Down