diff --git a/db.go b/db.go index e7df476e91..0a9bf31b69 100644 --- a/db.go +++ b/db.go @@ -244,7 +244,7 @@ type DB struct { // The on-disk size of the current OPTIONS file. optionsFileSize uint64 - fileLock io.Closer + fileLock *Lock dataDir vfs.File walDir vfs.File @@ -1668,14 +1668,14 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) { // EstimateDiskUsage returns the estimated filesystem space used in bytes for // storing the range `[start, end]`. The estimation is computed as follows: // -// - For sstables fully contained in the range the whole file size is included. -// - For sstables partially contained in the range the overlapping data block sizes -// are included. Even if a data block partially overlaps, or we cannot determine -// overlap due to abbreviated index keys, the full data block size is included in -// the estimation. Note that unlike fully contained sstables, none of the -// meta-block space is counted for partially overlapped files. -// - There may also exist WAL entries for unflushed keys in this range. This -// estimation currently excludes space used for the range in the WAL. +// - For sstables fully contained in the range the whole file size is included. +// - For sstables partially contained in the range the overlapping data block sizes +// are included. Even if a data block partially overlaps, or we cannot determine +// overlap due to abbreviated index keys, the full data block size is included in +// the estimation. Note that unlike fully contained sstables, none of the +// meta-block space is counted for partially overlapped files. +// - There may also exist WAL entries for unflushed keys in this range. This +// estimation currently excludes space used for the range in the WAL. func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) { if err := d.closed.Load(); err != nil { panic(err) diff --git a/open.go b/open.go index dbb7c51d67..5db79d74aa 100644 --- a/open.go +++ b/open.go @@ -198,16 +198,25 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { } // Lock the database directory. - fileLock, err := opts.FS.Lock(base.MakeFilepath(opts.FS, dirname, fileTypeLock, 0)) - if err != nil { - d.dataDir.Close() - if d.dataDir != d.walDir { - d.walDir.Close() + var fileLock *Lock + if opts.Lock != nil { + // The caller already acquired the database lock. Ensure that the + // directory matches. + if dirname != opts.Lock.dirname { + return nil, errors.Newf("pebble: opts.Lock acquired in %q not %q", opts.Lock.dirname, dirname) + } + if err := opts.Lock.refForOpen(); err != nil { + return nil, err + } + fileLock = opts.Lock + } else { + fileLock, err = LockDirectory(dirname, opts.FS) + if err != nil { + return nil, err } - return nil, err } defer func() { - if fileLock != nil { + if db == nil { fileLock.Close() } }() @@ -785,3 +794,67 @@ func Peek(dirname string, fs vfs.FS) (*DBDesc, error) { } return desc, nil } + +// LockDirectory acquires the database directory lock in the named directory, +// preventing another process from opening the database. LockDirectory returns a +// handle to the held lock that may be passed to Open through Options.Lock to +// subsequently open the database, skipping lock acquistion during Open. +// +// LockDirectory may be used to expand the critical section protected by the +// database lock to include setup before the call to Open. +func LockDirectory(dirname string, fs vfs.FS) (*Lock, error) { + fileLock, err := fs.Lock(base.MakeFilepath(fs, dirname, fileTypeLock, 0)) + if err != nil { + return nil, err + } + l := &Lock{dirname: dirname, fileLock: fileLock} + atomic.StoreInt32(&l.atomic.refs, 1) + invariants.SetFinalizer(l, func(obj interface{}) { + if refs := atomic.LoadInt32(&l.atomic.refs); refs > 0 { + panic(errors.AssertionFailedf("lock for %q finalized with %d refs", dirname, refs)) + } + }) + return l, nil +} + +// Lock represents a file lock on a directory. It may be passed to Open through +// Options.Lock to elide lock aquisition during Open. +type Lock struct { + dirname string + fileLock io.Closer + atomic struct { + // refs is a count of the number of handles on the lock. refs must be 0, 1 + // or 2. It must be accessed atomically. + // + // When acquired by the client and passed to Open, refs = 1 and the Open + // call increments it to 2. When the database is closed, it's decremented to + // 1. Finally when the original caller, calls Close on the Lock, it's + // drecemented to zero and the underlying file lock is released. + // + // When Open acquires the file lock, refs remains at 1 until the database is + // closed. + refs int32 + } +} + +func (l *Lock) refForOpen() error { + // During Open, when a user passed in a lock, the reference count must be + // exactly 1. If it's zero, the lock is no longer held and is invalid. If + // it's 2, the lock is already in use by another database within the + // process. + if !atomic.CompareAndSwapInt32(&l.atomic.refs, 1, 2) { + return errors.Errorf("pebble: unexpected Lock reference count; is the lock already in use?") + } + return nil +} + +// Close releases the lock, permitting another process to lock and open the +// database. Close must not be called until after a database using the Lock has +// been closed. +func (l *Lock) Close() error { + if atomic.AddInt32(&l.atomic.refs, -1) > 0 { + return nil + } + defer func() { l.fileLock = nil }() + return l.fileLock.Close() +} diff --git a/open_test.go b/open_test.go index bf518ff178..70cea575c6 100644 --- a/open_test.go +++ b/open_test.go @@ -133,6 +133,42 @@ func TestErrorIfNotExists(t *testing.T) { }) } +func TestOpenAlreadyLocked(t *testing.T) { + runTest := func(t *testing.T, dirname string, fs vfs.FS) { + opts := testingRandomized(&Options{FS: fs}) + var err error + opts.Lock, err = LockDirectory(dirname, fs) + require.NoError(t, err) + + d, err := Open(dirname, opts) + require.NoError(t, err) + require.NoError(t, d.Set([]byte("foo"), []byte("bar"), Sync)) + + // Try to open the same database reusing the Options containing the same + // Lock. It should error when it observes that it's already referenced. + _, err = Open(dirname, opts) + require.Error(t, err) + + // Close the database. + require.NoError(t, d.Close()) + + // Now Opening should succeed again. + d, err = Open(dirname, opts) + require.NoError(t, err) + require.NoError(t, d.Close()) + + require.NoError(t, opts.Lock.Close()) + // There should be no more remaining references. + require.Equal(t, int32(0), atomic.LoadInt32(&opts.Lock.atomic.refs)) + } + t.Run("memfs", func(t *testing.T) { + runTest(t, "", vfs.NewMem()) + }) + t.Run("disk", func(t *testing.T) { + runTest(t, t.TempDir(), vfs.Default) + }) +} + func TestNewDBFilenames(t *testing.T) { versions := map[FormatMajorVersion][]string{ FormatMostCompatible: { @@ -909,10 +945,11 @@ func TestCrashOpenCrashAfterWALCreation(t *testing.T) { } // TestOpenWALReplayReadOnlySeqNums tests opening a database: -// * in read-only mode -// * with multiple unflushed log files that must replayed -// * a MANIFEST that sets the last sequence number to a number greater than -// the unflushed log files +// - in read-only mode +// - with multiple unflushed log files that must replayed +// - a MANIFEST that sets the last sequence number to a number greater than +// the unflushed log files +// // See cockroachdb/cockroach#48660. func TestOpenWALReplayReadOnlySeqNums(t *testing.T) { const root = "" @@ -1121,9 +1158,9 @@ func TestOpen_ErrorIfUnknownFormatVersion(t *testing.T) { // // This function is intended to be used in tests with defer. // -// opts := &Options{FS: vfs.NewMem()} -// defer ensureFilesClosed(t, opts)() -// /* test code */ +// opts := &Options{FS: vfs.NewMem()} +// defer ensureFilesClosed(t, opts)() +// /* test code */ func ensureFilesClosed(t *testing.T, o *Options) func() { fs := &closeTrackingFS{ FS: o.FS, diff --git a/options.go b/options.go index a06a234160..158d4045d8 100644 --- a/options.go +++ b/options.go @@ -635,6 +635,17 @@ type Options struct { // The default value uses the underlying operating system's file system. FS vfs.FS + // Lock, if set, must be a database lock acquired through LockDirectory for + // the same directory passed to Open. If provided, Open will skip locking + // the directory. Closing the database will not release the lock, and it's + // the responsibility of the caller to release the lock after closing the + // database. + // + // Open will enforce that the Lock passed locks the same directory passed to + // Open. Concurrent calls to Open using the same Lock are detected and + // prohibited. + Lock *Lock + // The count of L0 files necessary to trigger an L0 compaction. L0CompactionFileThreshold int