Skip to content

Commit

Permalink
db: ensure Open closes opened directories on error
Browse files Browse the repository at this point in the history
Previously, there were handful of places where an open directory was leaked if
Open returned an error. These were discovered by CockroachDB's goroutine leak
detector while bumping Pebble to include d79f961. The goroutine created by
OpenDir was never being closed.
  • Loading branch information
jbowens committed May 16, 2022
1 parent b8c9a56 commit 3e06ce3
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 48 deletions.
27 changes: 27 additions & 0 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,24 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
}
}

// Ensure we close resources if we error out early. If the database is
// successfully opened, the named return value `db` will be set to `d`.
defer func() {
if db != nil {
// The database was successfully opened.
return
}
if d.dataDir != nil {
d.dataDir.Close()
}
if d.walDirname != d.dirname && d.walDir != nil {
d.walDir.Close()
}
if d.mu.formatVers.marker != nil {
d.mu.formatVers.marker.Close()
}
}()

// Open the database and WAL directories first in order to check for their
// existence.
var err error
Expand Down Expand Up @@ -220,6 +238,15 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
// Find the currently active manifest, if there is one.
manifestMarker, manifestFileNum, exists, err := findCurrentManifest(d.mu.formatVers.vers, opts.FS, dirname)
setCurrent := setCurrentFunc(d.mu.formatVers.vers, manifestMarker, opts.FS, dirname, d.dataDir)
defer func() {
// Ensure we close the manifest marker if we error out for any reason.
// If the database is successfully opened, the *versionSet will take
// ownership over the manifest marker, ensuring it's closed when the DB
// is closed.
if db == nil {
manifestMarker.Close()
}
}()
if err != nil {
return nil, errors.Wrapf(err, "pebble: database %q", dirname)
} else if !exists && !d.opts.ReadOnly && !d.opts.ErrorIfNotExists {
Expand Down
176 changes: 128 additions & 48 deletions open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime/debug"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -65,60 +66,71 @@ func TestOpenSharedTableCache(t *testing.T) {

func TestErrorIfExists(t *testing.T) {
for _, b := range [...]bool{false, true} {
mem := vfs.NewMem()
d0, err := Open("", testingRandomized(&Options{
FS: mem,
}))
if err != nil {
t.Errorf("b=%v: d0 Open: %v", b, err)
continue
}
if err := d0.Close(); err != nil {
t.Errorf("b=%v: d0 Close: %v", b, err)
continue
}
t.Run(fmt.Sprintf("%t", b), func(t *testing.T) {
mem := vfs.NewMem()
d0, err := Open("", testingRandomized(&Options{
FS: mem,
}))
if err != nil {
t.Errorf("b=%v: d0 Open: %v", b, err)
return
}
if err := d0.Close(); err != nil {
t.Errorf("b=%v: d0 Close: %v", b, err)
return
}

d1, err := Open("", testingRandomized(&Options{
FS: mem,
ErrorIfExists: b,
}))
if d1 != nil {
defer d1.Close()
}
if got := err != nil; got != b {
t.Errorf("b=%v: d1 Open: err is %v, got (err != nil) is %v, want %v", b, err, got, b)
continue
}
opts := testingRandomized(&Options{
FS: mem,
ErrorIfExists: b,
})
defer ensureFilesClosed(t, opts)()
d1, err := Open("", opts)
if d1 != nil {
defer d1.Close()
}
if got := err != nil; got != b {
t.Errorf("b=%v: d1 Open: err is %v, got (err != nil) is %v, want %v", b, err, got, b)
return
}
})
}
}

func TestErrorIfNotExists(t *testing.T) {
mem := vfs.NewMem()
_, err := Open("", testingRandomized(&Options{
FS: mem,
ErrorIfNotExists: true,
}))
if err == nil {
t.Fatalf("expected error, but found success")
} else if !strings.HasSuffix(err.Error(), oserror.ErrNotExist.Error()) {
t.Fatalf("expected not exists, but found %q", err)
}
t.Run("does-not-exist", func(t *testing.T) {
opts := testingRandomized(&Options{
FS: vfs.NewMem(),
ErrorIfNotExists: true,
})
defer ensureFilesClosed(t, opts)()

// Create the DB and try again.
d, err := Open("", testingRandomized(&Options{
FS: mem,
ErrorIfNotExists: false,
}))
require.NoError(t, err)
require.NoError(t, d.Close())
_, err := Open("", opts)
if err == nil {
t.Fatalf("expected error, but found success")
} else if !strings.HasSuffix(err.Error(), oserror.ErrNotExist.Error()) {
t.Fatalf("expected not exists, but found %q", err)
}
})

// The DB exists, so the setting of ErrorIfNotExists is a no-op.
d, err = Open("", testingRandomized(&Options{
FS: mem,
ErrorIfNotExists: true,
}))
require.NoError(t, err)
require.NoError(t, d.Close())
t.Run("does-exist", func(t *testing.T) {
opts := testingRandomized(&Options{
FS: vfs.NewMem(),
ErrorIfNotExists: false,
})
defer ensureFilesClosed(t, opts)()

// Create the DB and try again.
d, err := Open("", opts)
require.NoError(t, err)
require.NoError(t, d.Close())

opts.ErrorIfNotExists = true
// The DB exists, so the setting of ErrorIfNotExists is a no-op.
d, err = Open("", opts)
require.NoError(t, err)
require.NoError(t, d.Close())
})
}

func TestNewDBFilenames(t *testing.T) {
Expand Down Expand Up @@ -386,7 +398,7 @@ func TestOpenReadOnly(t *testing.T) {
if err == nil {
t.Fatalf("expected error, but found success")
}
const expected = "open-dir: \nopen-dir: non-existent-waldir"
const expected = "open-dir: \nopen-dir: non-existent-waldir\nclose:"
if trimmed := strings.TrimSpace(buf.String()); expected != trimmed {
t.Fatalf("expected %q, but found %q", expected, trimmed)
}
Expand Down Expand Up @@ -1044,3 +1056,71 @@ func TestOpen_ErrorIfUnknownFormatVersion(t *testing.T) {
require.Error(t, err)
require.EqualError(t, err, `pebble: database "" written in format major version 999999`)
}

// ensureFilesClosed updates the provided Options to wrap the filesystem. It
// returns a closure that when invoked fails the test if any files opened by the
// filesystem are not closed.
//
// This function is intended to be used in tests with defer.
//
// opts := &Options{FS: vfs.NewMem()}
// defer ensureFilesClosed(t, opts)()
// /* test code */
func ensureFilesClosed(t *testing.T, o *Options) func() {
fs := &closeTrackingFS{
FS: o.FS,
files: map[*closeTrackingFile]struct{}{},
}
o.FS = fs
return func() {
// fs.files should be empty if all the files were closed.
for f := range fs.files {
t.Errorf("An open file was never closed. Opened at:\n%s", f.stack)
}
}
}

type closeTrackingFS struct {
vfs.FS
files map[*closeTrackingFile]struct{}
}

func (fs *closeTrackingFS) wrap(file vfs.File, err error) (vfs.File, error) {
if err != nil {
return nil, err
}
f := &closeTrackingFile{
File: file,
fs: fs,
stack: debug.Stack(),
}
fs.files[f] = struct{}{}
return f, err
}

func (fs *closeTrackingFS) Create(name string) (vfs.File, error) {
return fs.wrap(fs.FS.Create(name))
}

func (fs *closeTrackingFS) Open(name string, opts ...vfs.OpenOption) (vfs.File, error) {
return fs.wrap(fs.FS.Open(name))
}

func (fs *closeTrackingFS) OpenDir(name string) (vfs.File, error) {
return fs.wrap(fs.FS.OpenDir(name))
}

func (fs *closeTrackingFS) ReuseForWrite(oldname, newname string) (vfs.File, error) {
return fs.wrap(fs.FS.ReuseForWrite(oldname, newname))
}

type closeTrackingFile struct {
vfs.File
fs *closeTrackingFS
stack []byte
}

func (f *closeTrackingFile) Close() error {
delete(f.fs.files, f)
return f.File.Close()
}

0 comments on commit 3e06ce3

Please sign in to comment.