Skip to content

Commit

Permalink
db: add Options.WithFSDefaults
Browse files Browse the repository at this point in the history
Add a facility for easily layering in the default VFS middleware—currently the
disk-health checking FS. Options.EnsureDefaults by default uses the disk-health
checking FS, but most of our tests explicitly set a VFS, and in particular a
*vfs.MemFS. These tests have always run without the disk-health checking
filesystem layer. Use the new WithFSDefaults method across many Pebble unit
tests and the Pebble metamorphic tests.

This is sufficient to surface the concurrent `Sync` operations observed in
cockroachdb/cockroach#96422 and cockroachdb/cockroach#96414. See #2282 for
context on where this panic is originating.
  • Loading branch information
jbowens committed Feb 6, 2023
1 parent 3ec3d78 commit ed6834a
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 25 deletions.
4 changes: 2 additions & 2 deletions cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func TestArchiveCleaner(t *testing.T) {

var buf syncedBuffer
mem := vfs.NewMem()
opts := &Options{
opts := (&Options{
Cleaner: ArchiveCleaner{},
FS: loggingFS{mem, &buf},
WALDir: "wal",
}
}).WithFSDefaults()

datadriven.RunTest(t, "testdata/cleaner", func(td *datadriven.TestData) string {
switch td.Cmd {
Expand Down
28 changes: 14 additions & 14 deletions format_major_version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestFormatMajorVersion_MigrationDefined(t *testing.T) {

func TestRatchetFormat(t *testing.T) {
fs := vfs.NewMem()
d, err := Open("", &Options{FS: fs})
d, err := Open("", (&Options{FS: fs}).WithFSDefaults())
require.NoError(t, err)
require.NoError(t, d.Set([]byte("foo"), []byte("bar"), Sync))
require.Equal(t, FormatMostCompatible, d.FormatMajorVersion())
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestRatchetFormat(t *testing.T) {

// If we Open the database again, leaving the default format, the
// database should Open using the persisted FormatNewest.
d, err = Open("", &Options{FS: fs})
d, err = Open("", (&Options{FS: fs}).WithFSDefaults())
require.NoError(t, err)
require.Equal(t, FormatNewest, d.FormatMajorVersion())
require.NoError(t, d.Close())
Expand All @@ -71,10 +71,10 @@ func TestRatchetFormat(t *testing.T) {
require.NoError(t, m.Move("999999"))
require.NoError(t, m.Close())

_, err = Open("", &Options{
_, err = Open("", (&Options{
FS: fs,
FormatMajorVersion: FormatVersioned,
})
}).WithFSDefaults())
require.Error(t, err)
require.EqualError(t, err, `pebble: database "" written in format major version 999999`)
}
Expand Down Expand Up @@ -105,10 +105,10 @@ func TestFormatMajorVersions(t *testing.T) {
for vers := FormatMostCompatible; vers <= FormatNewest; vers++ {
t.Run(fmt.Sprintf("vers=%03d", vers), func(t *testing.T) {
fs := vfs.NewStrictMem()
opts := &Options{
opts := (&Options{
FS: fs,
FormatMajorVersion: vers,
}
}).WithFSDefaults()

// Create a database at this format major version and perform
// some very basic operations.
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestSplitUserKeyMigration(t *testing.T) {
}
buf.Reset()
}
opts = &Options{
opts = (&Options{
FormatMajorVersion: FormatBlockPropertyCollector,
EventListener: EventListener{
CompactionEnd: func(info CompactionInfo) {
Expand All @@ -263,7 +263,7 @@ func TestSplitUserKeyMigration(t *testing.T) {
},
},
DisableAutomaticCompactions: true,
}
}).WithFSDefaults()
var err error
if d, err = runDBDefineCmd(td, opts); err != nil {
return err.Error()
Expand Down Expand Up @@ -359,10 +359,10 @@ func TestPebblev1Migration(t *testing.T) {
return fmt.Sprintf("unknown argument: %s", cmd)
}
}
opts := &Options{
opts := (&Options{
FS: vfs.NewMem(),
FormatMajorVersion: FormatMajorVersion(version),
}
}).WithFSDefaults()
d, err = Open("", opts)
if err != nil {
return err.Error()
Expand Down Expand Up @@ -480,13 +480,13 @@ func TestPebblev1MigrationRace(t *testing.T) {
defer cache.Unref()
tableCache := NewTableCache(cache, 1, 5)
defer tableCache.Unref()
d, err := Open("", &Options{
d, err := Open("", (&Options{
Cache: cache,
FS: vfs.NewMem(),
FormatMajorVersion: FormatMajorVersion(FormatPrePebblev1Marked - 1),
TableCache: tableCache,
Levels: []LevelOptions{{TargetFileSize: 1}},
})
}).WithFSDefaults())
require.NoError(t, err)
defer d.Close()

Expand Down Expand Up @@ -525,15 +525,15 @@ func TestPebblev1MigrationRace(t *testing.T) {
// Regression test for #2044, where multiple concurrent compactions can lead
// to an indefinite wait on the compaction goroutine in compactMarkedFilesLocked.
func TestPebblev1MigrationConcurrencyRace(t *testing.T) {
opts := &Options{
opts := (&Options{
Comparer: testkeys.Comparer,
FS: vfs.NewMem(),
FormatMajorVersion: FormatSplitUserKeysMarked,
Levels: []LevelOptions{{FilterPolicy: bloom.FilterPolicy(10)}},
MaxConcurrentCompactions: func() int {
return 4
},
}
}).WithFSDefaults()
func() {
d, err := Open("", opts)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions internal/metamorphic/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ func testMetaRun(t *testing.T, runDir string, seed uint64, historyPath string) {
opts.FS = vfs.NewMem()
}
}
opts.WithFSDefaults()

threads := testOpts.threads
if *maxThreads < threads {
threads = *maxThreads
Expand Down
27 changes: 18 additions & 9 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (o *IterOptions) getLogger() Logger {
// Specifically, when configured with a RangeKeyMasking.Suffix _s_, and there
// exists a range key with suffix _r_ covering a point key with suffix _p_, and
//
// _s_ ≤ _r_ < _p_
// _s_ ≤ _r_ < _p_
//
// then the point key is elided.
//
Expand Down Expand Up @@ -927,15 +927,8 @@ func (o *Options) EnsureDefaults() *Options {
if o.FormatMajorVersion == FormatDefault {
o.FormatMajorVersion = FormatMostCompatible
}

if o.FS == nil {
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second,
func(name string, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
Duration: duration,
})
})
o.WithFSDefaults()
}
if o.FlushSplitBytes <= 0 {
o.FlushSplitBytes = 2 * o.Levels[0].TargetFileSize
Expand All @@ -960,6 +953,22 @@ func (o *Options) EnsureDefaults() *Options {
return o
}

// WithFSDefaults configures the Options to wrap the configured filesystem with
// the default virtual file system middleware, like disk-health checking.
func (o *Options) WithFSDefaults() *Options {
if o.FS == nil {
o.FS = vfs.Default
}
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second,
func(name string, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
Duration: duration,
})
})
return o
}

func (o *Options) equal() Equal {
if o.Comparer.Equal == nil {
return bytes.Equal
Expand Down

0 comments on commit ed6834a

Please sign in to comment.