From 280ab898edeeab830d0610118ecc7271efae9e85 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 2 Feb 2023 10:26:30 -0500 Subject: [PATCH] db: add Options.WithFSDefaults MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a facility for easily layering in the default VFS middleware—currently the disk-health checking FS. Options.EnsureDefaults by default uses the disk-health checking FS, but most of our tests explicitly set a VFS, and in particular a *vfs.MemFS. These tests have always run without the disk-health checking filesystem layer. Use the new WithFSDefaults method across many Pebble unit tests and the Pebble metamorphic tests. This is sufficient to surface the concurrent `Sync` operations observed in cockroachdb/cockroach#96422 and cockroachdb/cockroach#96414. See #2282 for context on where this panic is originating. --- cleaner_test.go | 4 ++-- format_major_version_test.go | 28 ++++++++++++++-------------- internal/metamorphic/meta_test.go | 2 ++ options.go | 27 ++++++++++++++++++--------- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/cleaner_test.go b/cleaner_test.go index c953fc0f8f..29db44115f 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -25,11 +25,11 @@ func TestArchiveCleaner(t *testing.T) { var buf syncedBuffer mem := vfs.NewMem() - opts := &Options{ + opts := (&Options{ Cleaner: ArchiveCleaner{}, FS: loggingFS{mem, &buf}, WALDir: "wal", - } + }).WithFSDefaults() datadriven.RunTest(t, "testdata/cleaner", func(td *datadriven.TestData) string { switch td.Cmd { diff --git a/format_major_version_test.go b/format_major_version_test.go index 634ac59151..8dbc527622 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -32,7 +32,7 @@ func TestFormatMajorVersion_MigrationDefined(t *testing.T) { func TestRatchetFormat(t *testing.T) { fs := vfs.NewMem() - d, err := Open("", &Options{FS: fs}) + d, err := Open("", (&Options{FS: fs}).WithFSDefaults()) require.NoError(t, err) require.NoError(t, d.Set([]byte("foo"), []byte("bar"), Sync)) require.Equal(t, FormatMostCompatible, d.FormatMajorVersion()) @@ -60,7 +60,7 @@ func TestRatchetFormat(t *testing.T) { // If we Open the database again, leaving the default format, the // database should Open using the persisted FormatNewest. - d, err = Open("", &Options{FS: fs}) + d, err = Open("", (&Options{FS: fs}).WithFSDefaults()) require.NoError(t, err) require.Equal(t, FormatNewest, d.FormatMajorVersion()) require.NoError(t, d.Close()) @@ -71,10 +71,10 @@ func TestRatchetFormat(t *testing.T) { require.NoError(t, m.Move("999999")) require.NoError(t, m.Close()) - _, err = Open("", &Options{ + _, err = Open("", (&Options{ FS: fs, FormatMajorVersion: FormatVersioned, - }) + }).WithFSDefaults()) require.Error(t, err) require.EqualError(t, err, `pebble: database "" written in format major version 999999`) } @@ -105,10 +105,10 @@ func TestFormatMajorVersions(t *testing.T) { for vers := FormatMostCompatible; vers <= FormatNewest; vers++ { t.Run(fmt.Sprintf("vers=%03d", vers), func(t *testing.T) { fs := vfs.NewStrictMem() - opts := &Options{ + opts := (&Options{ FS: fs, FormatMajorVersion: vers, - } + }).WithFSDefaults() // Create a database at this format major version and perform // some very basic operations. @@ -251,7 +251,7 @@ func TestSplitUserKeyMigration(t *testing.T) { } buf.Reset() } - opts = &Options{ + opts = (&Options{ FormatMajorVersion: FormatBlockPropertyCollector, EventListener: EventListener{ CompactionEnd: func(info CompactionInfo) { @@ -263,7 +263,7 @@ func TestSplitUserKeyMigration(t *testing.T) { }, }, DisableAutomaticCompactions: true, - } + }).WithFSDefaults() var err error if d, err = runDBDefineCmd(td, opts); err != nil { return err.Error() @@ -359,10 +359,10 @@ func TestPebblev1Migration(t *testing.T) { return fmt.Sprintf("unknown argument: %s", cmd) } } - opts := &Options{ + opts := (&Options{ FS: vfs.NewMem(), FormatMajorVersion: FormatMajorVersion(version), - } + }).WithFSDefaults() d, err = Open("", opts) if err != nil { return err.Error() @@ -480,13 +480,13 @@ func TestPebblev1MigrationRace(t *testing.T) { defer cache.Unref() tableCache := NewTableCache(cache, 1, 5) defer tableCache.Unref() - d, err := Open("", &Options{ + d, err := Open("", (&Options{ Cache: cache, FS: vfs.NewMem(), FormatMajorVersion: FormatMajorVersion(FormatPrePebblev1Marked - 1), TableCache: tableCache, Levels: []LevelOptions{{TargetFileSize: 1}}, - }) + }).WithFSDefaults()) require.NoError(t, err) defer d.Close() @@ -525,7 +525,7 @@ func TestPebblev1MigrationRace(t *testing.T) { // Regression test for #2044, where multiple concurrent compactions can lead // to an indefinite wait on the compaction goroutine in compactMarkedFilesLocked. func TestPebblev1MigrationConcurrencyRace(t *testing.T) { - opts := &Options{ + opts := (&Options{ Comparer: testkeys.Comparer, FS: vfs.NewMem(), FormatMajorVersion: FormatSplitUserKeysMarked, @@ -533,7 +533,7 @@ func TestPebblev1MigrationConcurrencyRace(t *testing.T) { MaxConcurrentCompactions: func() int { return 4 }, - } + }).WithFSDefaults() func() { d, err := Open("", opts) require.NoError(t, err) diff --git a/internal/metamorphic/meta_test.go b/internal/metamorphic/meta_test.go index 18be696c55..a704297d78 100644 --- a/internal/metamorphic/meta_test.go +++ b/internal/metamorphic/meta_test.go @@ -166,6 +166,8 @@ func testMetaRun(t *testing.T, runDir string, seed uint64, historyPath string) { opts.FS = vfs.NewMem() } } + opts.WithFSDefaults() + threads := testOpts.threads if *maxThreads < threads { threads = *maxThreads diff --git a/options.go b/options.go index 4e3d577527..a06a234160 100644 --- a/options.go +++ b/options.go @@ -224,7 +224,7 @@ func (o *IterOptions) getLogger() Logger { // Specifically, when configured with a RangeKeyMasking.Suffix _s_, and there // exists a range key with suffix _r_ covering a point key with suffix _p_, and // -// _s_ ≤ _r_ < _p_ +// _s_ ≤ _r_ < _p_ // // then the point key is elided. // @@ -927,15 +927,8 @@ func (o *Options) EnsureDefaults() *Options { if o.FormatMajorVersion == FormatDefault { o.FormatMajorVersion = FormatMostCompatible } - if o.FS == nil { - o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, - func(name string, duration time.Duration) { - o.EventListener.DiskSlow(DiskSlowInfo{ - Path: name, - Duration: duration, - }) - }) + o.WithFSDefaults() } if o.FlushSplitBytes <= 0 { o.FlushSplitBytes = 2 * o.Levels[0].TargetFileSize @@ -960,6 +953,22 @@ func (o *Options) EnsureDefaults() *Options { return o } +// WithFSDefaults configures the Options to wrap the configured filesystem with +// the default virtual file system middleware, like disk-health checking. +func (o *Options) WithFSDefaults() *Options { + if o.FS == nil { + o.FS = vfs.Default + } + o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second, + func(name string, duration time.Duration) { + o.EventListener.DiskSlow(DiskSlowInfo{ + Path: name, + Duration: duration, + }) + }) + return o +} + func (o *Options) equal() Equal { if o.Comparer.Equal == nil { return bytes.Equal