From be00bccdcc4f2652631fb5077f07ea1fac813e81 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Thu, 22 Jun 2023 13:57:00 -0700 Subject: [PATCH] db: refactor obsolete file deletion code The code around obsolete file deletion is very convoluted and there is one known race (described in #2630). This change reworks the deletion code as follows: - we add a cleanup manager which has a background goroutine. - all file and object deletions happen in the manager's background goroutine. - callers can enqueue cleanup jobs with the manager - tests can wait for outstanding cleanup jobs to finish We also add a missing call to `deleteObsoleteObjects` from the ingest path (which now can cause files to be removed through excising). Finally, we fix an incorrect `Remove` call in the error path of `Checkpoint()` (which was missing a `PathJoin`). Fixes #2630. --- checkpoint.go | 6 +- checkpoint_test.go | 3 +- cleaner.go | 240 ++++++++++++++++++++++- cleaner_test.go | 2 + compaction.go | 238 +++++----------------- compaction_test.go | 49 ----- db.go | 53 ++--- db_test.go | 1 + event_listener_test.go | 2 + flush_test.go | 12 -- ingest.go | 3 + ingest_test.go | 3 + metrics_test.go | 9 +- open.go | 12 +- open_test.go | 1 + pacer.go | 35 +--- pacer_test.go | 142 +++++--------- read_state.go | 10 +- testdata/compaction_pacer_maybe_throttle | 98 --------- testdata/event_listener | 4 +- testdata/manual_flush | 20 -- version_set_test.go | 1 + 22 files changed, 388 insertions(+), 556 deletions(-) delete mode 100644 testdata/compaction_pacer_maybe_throttle diff --git a/checkpoint.go b/checkpoint.go index f395d0351c..94c9e6f3f5 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -215,11 +215,7 @@ func (d *DB) Checkpoint( } if ckErr != nil { // Attempt to cleanup on error. - paths, _ := fs.List(destDir) - for _, path := range paths { - _ = fs.Remove(path) - } - _ = fs.Remove(destDir) + _ = fs.RemoveAll(destDir) } }() dir, ckErr = mkdirAllAndSyncParents(fs, destDir) diff --git a/checkpoint_test.go b/checkpoint_test.go index f28fbeadf7..191737744e 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -87,6 +87,7 @@ func TestCheckpoint(t *testing.T) { if err := d.Compact(nil, []byte("\xff"), false); err != nil { return err.Error() } + d.TestOnlyWaitForCleaning() return memLog.String() case "flush": @@ -189,7 +190,7 @@ func TestCheckpointCompaction(t *testing.T) { defer close(check) defer wg.Done() for i := 0; ctx.Err() == nil && i < 200; i++ { - dir := fmt.Sprintf("checkpoint%6d", i) + dir := fmt.Sprintf("checkpoint%06d", i) if err := d.Checkpoint(dir); err != nil { t.Error(err) return diff --git a/cleaner.go b/cleaner.go index 384bff512b..03557ce3ee 100644 --- a/cleaner.go +++ b/cleaner.go @@ -4,7 +4,18 @@ package pebble -import "github.com/cockroachdb/pebble/internal/base" +import ( + "context" + "runtime/pprof" + "sync" + "time" + + "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/tokenbucket" +) // Cleaner exports the base.Cleaner type. type Cleaner = base.Cleaner @@ -14,3 +25,230 @@ type DeleteCleaner = base.DeleteCleaner // ArchiveCleaner exports the base.ArchiveCleaner type. type ArchiveCleaner = base.ArchiveCleaner + +type cleanupManager struct { + opts *Options + objProvider objstorage.Provider + onTableDeleteFn func(fileSize uint64) + deletePacer *deletionPacer + + // jobsCh is used as the cleanup job queue. + jobsCh chan *cleanupJob + // waitGroup is used to wait for the background goroutine to exit. + waitGroup sync.WaitGroup + + mu struct { + sync.Mutex + queuedJobs int + completedJobs int + completedJobsCond sync.Cond + } +} + +// In practice, we should rarely have more than a couple of jobs (in most cases +// we Wait() after queueing a job). +const jobsChLen = 1000 + +// obsoleteFile holds information about a file that needs to be deleted soon. +type obsoleteFile struct { + dir string + fileNum base.DiskFileNum + fileType fileType + fileSize uint64 +} + +type cleanupJob struct { + jobID int + obsoleteFiles []obsoleteFile +} + +// openCleanupManager creates a cleanupManager and starts its background goroutine. +// The cleanupManager must be Close()d. +func openCleanupManager( + opts *Options, + objProvider objstorage.Provider, + onTableDeleteFn func(fileSize uint64), + getDeletePacerInfo func() deletionPacerInfo, +) *cleanupManager { + cm := &cleanupManager{ + opts: opts, + objProvider: objProvider, + onTableDeleteFn: onTableDeleteFn, + deletePacer: newDeletionPacer(getDeletePacerInfo), + jobsCh: make(chan *cleanupJob, jobsChLen), + } + cm.mu.completedJobsCond.L = &cm.mu.Mutex + cm.waitGroup.Add(1) + + go func() { + pprof.Do(context.Background(), gcLabels, func(context.Context) { + cm.mainLoop() + }) + }() + + return cm +} + +// Close stops the background goroutine, waiting until all queued jobs are completed. +// Delete pacing is disabled for the remaining jobs. +func (cm *cleanupManager) Close() { + close(cm.jobsCh) + cm.waitGroup.Wait() +} + +// EnqueueJob adds a cleanup job to the manager's queue. +func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) { + job := &cleanupJob{ + jobID: jobID, + obsoleteFiles: obsoleteFiles, + } + cm.mu.Lock() + cm.mu.queuedJobs++ + cm.mu.Unlock() + + if invariants.Enabled && len(cm.jobsCh) >= cap(cm.jobsCh)-2 { + panic("cleanup jobs queue full") + } + cm.jobsCh <- job +} + +// Wait until the completion of all jobs that were already queued. +// +// Does not wait for jobs that are enqueued during the call. +// +// Note that DB.mu should not be held while calling this method; the background +// goroutine needs to acquire DB.mu to update deleted table metrics. +func (cm *cleanupManager) Wait() { + cm.mu.Lock() + defer cm.mu.Unlock() + n := cm.mu.queuedJobs + for cm.mu.completedJobs < n { + cm.mu.completedJobsCond.Wait() + } +} + +// mainLoop runs the manager's background goroutine. +func (cm *cleanupManager) mainLoop() { + defer cm.waitGroup.Done() + useLimiter := false + var limiter tokenbucket.TokenBucket + + if r := cm.opts.TargetByteDeletionRate; r != 0 { + useLimiter = true + limiter.Init(tokenbucket.TokensPerSecond(r), tokenbucket.Tokens(r)) + } + + for job := range cm.jobsCh { + for _, of := range job.obsoleteFiles { + if of.fileType != fileTypeTable { + path := base.MakeFilepath(cm.opts.FS, of.dir, of.fileType, of.fileNum) + cm.deleteObsoleteFile(of.fileType, job.jobID, path, of.fileNum, of.fileSize) + } else { + if useLimiter { + cm.maybePace(&limiter, of.fileType, of.fileNum, of.fileSize) + } + cm.onTableDeleteFn(of.fileSize) + cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.fileNum) + } + } + cm.mu.Lock() + cm.mu.completedJobs++ + cm.mu.completedJobsCond.Broadcast() + cm.mu.Unlock() + } +} + +// maybePace sleeps before deleting an object if appropriate. It is always +// called from the background goroutine. +func (cm *cleanupManager) maybePace( + limiter *tokenbucket.TokenBucket, + fileType base.FileType, + fileNum base.DiskFileNum, + fileSize uint64, +) { + meta, err := cm.objProvider.Lookup(fileType, fileNum) + if err != nil { + // The object was already removed from the provider; we won't actually + // delete anything, so we don't need to pace. + return + } + if meta.IsShared() { + // Don't throttle deletion of shared objects. + return + } + if !cm.deletePacer.shouldPace() { + // The deletion pacer decided that we shouldn't throttle; account + // for the operation but don't wait for tokens. + limiter.Adjust(-tokenbucket.Tokens(fileSize)) + return + } + // Wait for tokens. + for { + ok, d := limiter.TryToFulfill(tokenbucket.Tokens(fileSize)) + if ok { + break + } + time.Sleep(d) + } +} + +// deleteObsoleteFile deletes a (non-object) file that is no longer needed. +func (cm *cleanupManager) deleteObsoleteFile( + fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64, +) { + // TODO(peter): need to handle this error, probably by re-adding the + // file that couldn't be deleted to one of the obsolete slices map. + err := cm.opts.Cleaner.Clean(cm.opts.FS, fileType, path) + if oserror.IsNotExist(err) { + return + } + + switch fileType { + case fileTypeLog: + cm.opts.EventListener.WALDeleted(WALDeleteInfo{ + JobID: jobID, + Path: path, + FileNum: fileNum.FileNum(), + Err: err, + }) + case fileTypeManifest: + cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{ + JobID: jobID, + Path: path, + FileNum: fileNum.FileNum(), + Err: err, + }) + case fileTypeTable: + panic("invalid deletion of object file") + } +} + +func (cm *cleanupManager) deleteObsoleteObject( + fileType fileType, jobID int, fileNum base.DiskFileNum, +) { + if fileType != fileTypeTable { + panic("not an object") + } + + var path string + meta, err := cm.objProvider.Lookup(fileType, fileNum) + if err != nil { + path = "" + } else { + path = cm.objProvider.Path(meta) + err = cm.objProvider.Remove(fileType, fileNum) + } + if cm.objProvider.IsNotExistError(err) { + return + } + + switch fileType { + case fileTypeTable: + cm.opts.EventListener.TableDeleted(TableDeleteInfo{ + JobID: jobID, + Path: path, + FileNum: fileNum.FileNum(), + Err: err, + }) + } +} diff --git a/cleaner_test.go b/cleaner_test.go index 762745156a..db7017a448 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -113,6 +113,8 @@ func TestCleaner(t *testing.T) { if err != nil { return err.Error() } + d.TestOnlyWaitForCleaning() + d.testingAlwaysWaitForCleanup = true dbs[dir] = d return memLog.String() diff --git a/compaction.go b/compaction.go index 8c4f36d742..897aaae418 100644 --- a/compaction.go +++ b/compaction.go @@ -16,7 +16,6 @@ import ( "time" "github.com/cockroachdb/errors" - "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" @@ -1712,6 +1711,14 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo { return pacerInfo } +// onObsoleteTableDelete is called to update metrics when an sstable is deleted. +func (d *DB) onObsoleteTableDelete(fileSize uint64) { + d.mu.Lock() + d.mu.versions.metrics.Table.ObsoleteCount-- + d.mu.versions.metrics.Table.ObsoleteSize -= fileSize + d.mu.Unlock() +} + // maybeScheduleFlush schedules a flush if necessary. // // d.mu must be held when calling this. @@ -2108,14 +2115,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { flushed[i].readerUnrefLocked(true) } - d.deleteObsoleteFiles(jobID, false /* waitForOngoing */) + d.deleteObsoleteFiles(jobID) - // Mark all the memtables we flushed as flushed. Note that we do this last so - // that a synchronous call to DB.Flush() will not return until the deletion - // of obsolete files from this job have completed. This makes testing easier - // and provides similar behavior to manual compactions where the compaction - // is not marked as completed until the deletion of obsolete files job has - // completed. + // Mark all the memtables we flushed as flushed. for i := range flushed { close(flushed[i].flushed) } @@ -2633,7 +2635,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { d.updateReadStateLocked(d.opts.DebugCheck) d.updateTableStatsLocked(ve.NewFiles) } - d.deleteObsoleteFiles(jobID, true /* waitForOngoing */) + d.deleteObsoleteFiles(jobID) return err } @@ -3479,75 +3481,27 @@ func (d *DB) scanObsoleteFiles(list []string) { // // d.mu must be held when calling this method. func (d *DB) disableFileDeletions() { - d.mu.cleaner.disabled++ - for d.mu.cleaner.cleaning { - d.mu.cleaner.cond.Wait() - } - d.mu.cleaner.cond.Broadcast() + d.mu.disableFileDeletions++ + d.mu.Unlock() + defer d.mu.Lock() + d.cleanupManager.Wait() } -// enableFileDeletions enables previously disabled file deletions. Note that if -// file deletions have been re-enabled, the current goroutine will be used to -// perform the queued up deletions. +// enableFileDeletions enables previously disabled file deletions. A cleanup job +// is queued if necessary. // // d.mu must be held when calling this method. func (d *DB) enableFileDeletions() { - if d.mu.cleaner.disabled <= 0 || d.mu.cleaner.cleaning { + if d.mu.disableFileDeletions <= 0 { panic("pebble: file deletion disablement invariant violated") } - d.mu.cleaner.disabled-- - if d.mu.cleaner.disabled > 0 { + d.mu.disableFileDeletions-- + if d.mu.disableFileDeletions > 0 { return } jobID := d.mu.nextJobID d.mu.nextJobID++ - d.deleteObsoleteFiles(jobID, true /* waitForOngoing */) -} - -// d.mu must be held when calling this. -func (d *DB) acquireCleaningTurn(waitForOngoing bool) bool { - // Only allow a single delete obsolete files job to run at a time. - for d.mu.cleaner.cleaning && d.mu.cleaner.disabled == 0 && waitForOngoing { - d.mu.cleaner.cond.Wait() - } - if d.mu.cleaner.cleaning { - return false - } - if d.mu.cleaner.disabled > 0 { - // File deletions are currently disabled. When they are re-enabled a new - // job will be created to catch up on file deletions. - return false - } - d.mu.cleaner.cleaning = true - return true -} - -// d.mu must be held when calling this. -func (d *DB) releaseCleaningTurn() { - d.mu.cleaner.cleaning = false - d.mu.cleaner.cond.Broadcast() -} - -// deleteObsoleteFiles deletes those files and objects that are no longer -// needed. If waitForOngoing is true, it waits for any ongoing cleaning turns to -// complete, and if false, it returns rightaway if a cleaning turn is ongoing. -// -// d.mu must be held when calling this, but the mutex may be dropped and -// re-acquired during the course of this method. -func (d *DB) deleteObsoleteFiles(jobID int, waitForOngoing bool) { - if !d.acquireCleaningTurn(waitForOngoing) { - return - } - d.doDeleteObsoleteFiles(jobID) - d.releaseCleaningTurn() -} - -// obsoleteFile holds information about a file that needs to be deleted soon. -type obsoleteFile struct { - dir string - fileNum base.DiskFileNum - fileType fileType - fileSize uint64 + d.deleteObsoleteFiles(jobID) } type fileInfo struct { @@ -3555,16 +3509,16 @@ type fileInfo struct { fileSize uint64 } -// d.mu must be held when calling this, but the mutex may be dropped and -// re-acquired during the course of this method. -func (d *DB) doDeleteObsoleteFiles(jobID int) { - var obsoleteTables []fileInfo - - defer func() { - for _, tbl := range obsoleteTables { - delete(d.mu.versions.zombieTables, tbl.fileNum) - } - }() +// deleteObsoleteFiles enqueues a cleanup job to the cleanup manager, if necessary. +// +// d.mu must be held when calling this. The function will release and re-aquire the mutex. +// +// Does nothing if file deletions are disabled (see disableFileDeletions). A +// cleanup job will be scheduled when file deletions are re-enabled. +func (d *DB) deleteObsoleteFiles(jobID int) { + if d.mu.disableFileDeletions > 0 { + return + } var obsoleteLogs []fileInfo for i := range d.mu.log.queue { @@ -3580,9 +3534,13 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { } } - obsoleteTables = append(obsoleteTables, d.mu.versions.obsoleteTables...) + obsoleteTables := append([]fileInfo(nil), d.mu.versions.obsoleteTables...) d.mu.versions.obsoleteTables = nil + for _, tbl := range obsoleteTables { + delete(d.mu.versions.zombieTables, tbl.fileNum) + } + // Sort the manifests cause we want to delete some contiguous prefix // of the older manifests. sort.Slice(d.mu.versions.obsoleteManifests, func(i, j int) bool { @@ -3603,7 +3561,7 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { obsoleteOptions := d.mu.versions.obsoleteOptions d.mu.versions.obsoleteOptions = nil - // Release d.mu while doing I/O + // Release d.mu while preparing the cleanup job and possibly waiting. // Note the unusual order: Unlock and then Lock. d.mu.Unlock() defer d.mu.Lock() @@ -3618,7 +3576,7 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { {fileTypeOptions, obsoleteOptions}, } _, noRecycle := d.opts.Cleaner.(base.NeedsFileContents) - filesToDelete := make([]obsoleteFile, 0, len(files)) + filesToDelete := make([]obsoleteFile, 0, len(obsoleteLogs)+len(obsoleteTables)+len(obsoleteManifests)+len(obsoleteOptions)) for _, f := range files { // We sort to make the order of deletions deterministic, which is nice for // tests. @@ -3646,126 +3604,24 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { } } if len(filesToDelete) > 0 { - d.deleters.Add(1) - // Delete asynchronously if that could get held up in the pacer. - if d.opts.TargetByteDeletionRate > 0 { - go d.paceAndDeleteObsoleteFiles(jobID, filesToDelete) - } else { - d.paceAndDeleteObsoleteFiles(jobID, filesToDelete) - } + d.cleanupManager.EnqueueJob(jobID, filesToDelete) } -} - -// Paces and eventually deletes the list of obsolete files passed in. db.mu -// must NOT be held when calling this method. -func (d *DB) paceAndDeleteObsoleteFiles(jobID int, files []obsoleteFile) { - defer d.deleters.Done() - var pacer *deletionPacer - if d.deletionLimiter != nil { - pacer = newDeletionPacer(d.deletionLimiter, d.getDeletionPacerInfo) - } - - for _, of := range files { - path := base.MakeFilepath(d.opts.FS, of.dir, of.fileType, of.fileNum) - if of.fileType == fileTypeTable { - // Don't throttle deletion of shared objects. - meta, err := d.objProvider.Lookup(of.fileType, of.fileNum) - // If we get an error here, deleteObsoleteObject won't actually delete - // anything, so we don't need to throttle. - if pacer != nil && err == nil && !meta.IsShared() { - pacer.maybeThrottle(of.fileSize) - } - d.mu.Lock() - d.mu.versions.metrics.Table.ObsoleteCount-- - d.mu.versions.metrics.Table.ObsoleteSize -= of.fileSize - d.mu.Unlock() - d.deleteObsoleteObject(fileTypeTable, jobID, of.fileNum) - } else { - d.deleteObsoleteFile(of.fileType, jobID, path, of.fileNum) - } + if d.testingAlwaysWaitForCleanup { + d.cleanupManager.Wait() } } func (d *DB) maybeScheduleObsoleteTableDeletion() { d.mu.Lock() defer d.mu.Unlock() - - if len(d.mu.versions.obsoleteTables) == 0 { - return - } - if !d.acquireCleaningTurn(false) { - return - } - - go func() { - pprof.Do(context.Background(), gcLabels, func(context.Context) { - d.mu.Lock() - defer d.mu.Unlock() - - jobID := d.mu.nextJobID - d.mu.nextJobID++ - d.doDeleteObsoleteFiles(jobID) - d.releaseCleaningTurn() - }) - }() + d.maybeScheduleObsoleteTableDeletionLocked() } -func (d *DB) deleteObsoleteObject(fileType fileType, jobID int, fileNum base.DiskFileNum) { - if fileType != fileTypeTable { - panic("not an object") - } - - var path string - meta, err := d.objProvider.Lookup(fileType, fileNum) - if err != nil { - path = "" - } else { - path = d.objProvider.Path(meta) - err = d.objProvider.Remove(fileType, fileNum) - } - if d.objProvider.IsNotExistError(err) { - return - } - - switch fileType { - case fileTypeTable: - d.opts.EventListener.TableDeleted(TableDeleteInfo{ - JobID: jobID, - Path: path, - FileNum: fileNum.FileNum(), - Err: err, - }) - } -} - -// deleteObsoleteFile deletes a (non-object) file that is no longer needed. -func (d *DB) deleteObsoleteFile( - fileType fileType, jobID int, path string, fileNum base.DiskFileNum, -) { - // TODO(peter): need to handle this error, probably by re-adding the - // file that couldn't be deleted to one of the obsolete slices map. - err := d.opts.Cleaner.Clean(d.opts.FS, fileType, path) - if oserror.IsNotExist(err) { - return - } - - switch fileType { - case fileTypeLog: - d.opts.EventListener.WALDeleted(WALDeleteInfo{ - JobID: jobID, - Path: path, - FileNum: fileNum.FileNum(), - Err: err, - }) - case fileTypeManifest: - d.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{ - JobID: jobID, - Path: path, - FileNum: fileNum.FileNum(), - Err: err, - }) - case fileTypeTable: - panic("invalid deletion of object file") +func (d *DB) maybeScheduleObsoleteTableDeletionLocked() { + if len(d.mu.versions.obsoleteTables) > 0 { + jobID := d.mu.nextJobID + d.mu.nextJobID++ + d.deleteObsoleteFiles(jobID) } } diff --git a/compaction_test.go b/compaction_test.go index 60e2c72257..99301127c6 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -18,7 +18,6 @@ import ( "sort" "strconv" "strings" - "sync" "sync/atomic" "testing" "time" @@ -3341,54 +3340,6 @@ func TestCompactFlushQueuedLargeBatch(t *testing.T) { require.NoError(t, d.Close()) } -// Regression test for #747. Test a problematic series of "cleaner" operations -// that could previously lead to DB.disableFileDeletions blocking forever even -// though no cleaning was in progress. -func TestCleanerCond(t *testing.T) { - d, err := Open("", testingRandomized(t, &Options{ - FS: vfs.NewMem(), - }).WithFSDefaults()) - require.NoError(t, err) - - for i := 0; i < 10; i++ { - d.mu.Lock() - require.True(t, d.acquireCleaningTurn(true)) - d.mu.Unlock() - - var wg sync.WaitGroup - wg.Add(2) - - go func() { - defer wg.Done() - d.mu.Lock() - if d.acquireCleaningTurn(true) { - d.releaseCleaningTurn() - } - d.mu.Unlock() - }() - - runtime.Gosched() - - go func() { - defer wg.Done() - d.mu.Lock() - d.disableFileDeletions() - d.enableFileDeletions() - d.mu.Unlock() - }() - - runtime.Gosched() - - d.mu.Lock() - d.releaseCleaningTurn() - d.mu.Unlock() - - wg.Wait() - } - - require.NoError(t, d.Close()) -} - func TestFlushError(t *testing.T) { // Error the first five times we try to write a sstable. var errorOps atomic.Int32 diff --git a/db.go b/db.go index 20f8b27873..d8b6c7c6fb 100644 --- a/db.go +++ b/db.go @@ -21,7 +21,6 @@ import ( "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/manual" - "github.com/cockroachdb/pebble/internal/rate" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/record" @@ -307,15 +306,10 @@ type DB struct { closed *atomic.Value closedCh chan struct{} - // deletionLimiter is set when TargetByteDeletionRate is set. - deletionLimiter *rate.Limiter - - // Async deletion jobs spawned by cleaners increment this WaitGroup, and - // call Done when completed. Once `d.mu.cleaning` is false, the db.Close() - // goroutine needs to call Wait on this WaitGroup to ensure all cleaning - // and deleting goroutines have finished running. As deletion goroutines - // could grab db.mu, it must *not* be held while deleters.Wait() is called. - deleters sync.WaitGroup + cleanupManager *cleanupManager + // testingAlwaysWaitForCleanup is set by some tests to force waiting for + // obsolete file deletion (to make events deterministic). + testingAlwaysWaitForCleanup bool // During an iterator close, we may asynchronously schedule read compactions. // We want to wait for those goroutines to finish, before closing the DB. @@ -448,20 +442,10 @@ type DB struct { noOngoingFlushStartTime time.Time } - cleaner struct { - // Condition variable used to signal the completion of a file cleaning - // operation or an increment to the value of disabled. File cleaning operations are - // serialized, and a caller trying to do a file cleaning operation may wait - // until the ongoing one is complete. - cond sync.Cond - // True when a file cleaning operation is in progress. False does not necessarily - // mean all cleaning jobs have completed; see the comment on d.deleters. - cleaning bool - // Non-zero when file cleaning is disabled. The disabled count acts as a - // reference count to prohibit file cleaning. See - // DB.{disable,Enable}FileDeletions(). - disabled int - } + // Non-zero when file cleaning is disabled. The disabled count acts as a + // reference count to prohibit file cleaning. See + // DB.{disable,Enable}FileDeletions(). + disableFileDeletions int snapshots struct { // The list of active snapshots. @@ -516,11 +500,7 @@ var _ Writer = (*DB)(nil) // TestOnlyWaitForCleaning MUST only be used in tests. func (d *DB) TestOnlyWaitForCleaning() { - d.mu.Lock() - defer d.mu.Unlock() - for d.mu.cleaner.cleaning { - d.mu.cleaner.cond.Wait() - } + d.cleanupManager.Wait() } // Get gets the value for the given key. It returns ErrNotFound if the DB does @@ -1560,21 +1540,12 @@ func (d *DB) Close() error { err = firstError(err, errors.Errorf("leaked memtable reservation: %d", errors.Safe(reserved))) } - // No more cleaning can start. Wait for any async cleaning to complete. - for d.mu.cleaner.cleaning { - d.mu.cleaner.cond.Wait() - } - // There may still be obsolete tables if an existing async cleaning job - // prevented a new cleaning job when a readState was unrefed. If needed, - // synchronously delete obsolete files. - if len(d.mu.versions.obsoleteTables) > 0 { - d.deleteObsoleteFiles(d.mu.nextJobID, true /* waitForOngoing */) - } - // Wait for all the deletion goroutines spawned by cleaning jobs to finish. d.mu.Unlock() - d.deleters.Wait() d.compactionSchedulers.Wait() + // Wait for all cleaning jobs to finish. + d.cleanupManager.Close() + // Sanity check metrics. if invariants.Enabled { m := d.Metrics() diff --git a/db_test.go b/db_test.go index 7cb851d294..7621770108 100644 --- a/db_test.go +++ b/db_test.go @@ -987,6 +987,7 @@ func TestRollManifest(t *testing.T) { require.NoError(t, d.Set([]byte("a"), nil, nil)) require.NoError(t, d.Flush()) } + d.TestOnlyWaitForCleaning() num := manifestFileNumber() if lastManifestNum == num { t.Fatalf("manifest failed to roll %d: %d == %d", i, lastManifestNum, num) diff --git a/event_listener_test.go b/event_listener_test.go index ad31c2ff81..fc324a08a8 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -58,6 +58,7 @@ func TestEventListener(t *testing.T) { t = t.Add(time.Second) return t } + d.testingAlwaysWaitForCleanup = true return memLog.String() case "close": @@ -113,6 +114,7 @@ func TestEventListener(t *testing.T) { defer d.mu.Unlock() d.enableFileDeletions() }() + d.TestOnlyWaitForCleaning() return memLog.String() case "ingest": diff --git a/flush_test.go b/flush_test.go index edbf1a45c8..0031420e9b 100644 --- a/flush_test.go +++ b/flush_test.go @@ -76,18 +76,6 @@ func TestManualFlush(t *testing.T) { d.mu.Unlock() return s - case "acquire-cleaning-turn": - d.mu.Lock() - d.acquireCleaningTurn(false) - d.mu.Unlock() - return "" - - case "release-cleaning-turn": - d.mu.Lock() - d.releaseCleaningTurn() - d.mu.Unlock() - return "" - case "reset": if err := d.Close(); err != nil { return err.Error() diff --git a/ingest.go b/ingest.go index ae36234343..19c7d31ffb 100644 --- a/ingest.go +++ b/ingest.go @@ -1707,6 +1707,9 @@ func (d *DB) ingestApply( d.mu.versions.metrics.Ingest.Count++ d.updateReadStateLocked(d.opts.DebugCheck) + // updateReadStateLocked could have generated obsolete tables, schedule a + // cleanup job if necessary. + d.deleteObsoleteFiles(jobID) d.updateTableStatsLocked(ve.NewFiles) // The ingestion may have pushed a level over the threshold for compaction, // so check to see if one is necessary and schedule it. diff --git a/ingest_test.go b/ingest_test.go index d9c3f60718..e7d611887b 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -458,6 +458,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { var err error d, err = Open(dir, opts) require.NoError(t, err) + d.TestOnlyWaitForCleaning() } waitForFlush := func() { if d == nil { @@ -545,6 +546,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { closed = false require.NoError(t, err) waitForFlush() + d.TestOnlyWaitForCleaning() return "" case "blockFlush": @@ -564,6 +566,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { case "flush": d.maybeScheduleFlush() waitForFlush() + d.TestOnlyWaitForCleaning() return "" case "get": diff --git a/metrics_test.go b/metrics_test.go index 9a84656f26..9bc32be54e 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -211,13 +211,8 @@ func TestMetrics(t *testing.T) { } // The deletion of obsolete files happens asynchronously when an iterator - // is closed. Wait for the obsolete tables to be deleted. Note that - // waiting on cleaner.cond isn't precisely correct. - d.mu.Lock() - for d.mu.cleaner.cleaning || len(d.mu.versions.obsoleteTables) > 0 { - d.mu.cleaner.cond.Wait() - } - d.mu.Unlock() + // is closed. Wait for the obsolete tables to be deleted. + d.cleanupManager.Wait() return "" case "iter-new": diff --git a/open.go b/open.go index de5f3fb4d1..50390a7ea2 100644 --- a/open.go +++ b/open.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/manual" - "github.com/cockroachdb/pebble/internal/rate" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/record" @@ -194,6 +193,9 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { t.arenaBuf = nil } } + if d.cleanupManager != nil { + d.cleanupManager.Close() + } if d.objProvider != nil { d.objProvider.Close() } @@ -209,15 +211,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { apply: d.commitApply, write: d.commitWrite, }) - if r := d.opts.TargetByteDeletionRate; r != 0 { - d.deletionLimiter = rate.NewLimiter(float64(r), float64(r)) - } d.mu.nextJobID = 1 d.mu.mem.nextSize = opts.MemTableSize if d.mu.mem.nextSize > initialMemTableSize { d.mu.mem.nextSize = initialMemTableSize } - d.mu.cleaner.cond.L = &d.mu.Mutex d.mu.compact.cond.L = &d.mu.Mutex d.mu.compact.inProgress = make(map[*compaction]struct{}) d.mu.compact.noOngoingFlushStartTime = time.Now() @@ -306,6 +304,8 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { return nil, err } + d.cleanupManager = openCleanupManager(opts, d.objProvider, d.onObsoleteTableDelete, d.getDeletionPacerInfo) + if manifestExists { curVersion := d.mu.versions.currentVersion() if err := checkConsistency(curVersion, dirname, d.objProvider); err != nil { @@ -507,7 +507,7 @@ func Open(dirname string, opts *Options) (db *DB, _ error) { if !d.opts.ReadOnly { d.scanObsoleteFiles(ls) - d.deleteObsoleteFiles(jobID, true /* waitForOngoing */) + d.deleteObsoleteFiles(jobID) } else { // All the log files are obsolete. d.mu.versions.metrics.WAL.Files = int64(len(logFiles)) diff --git a/open_test.go b/open_test.go index ee6ede0ff5..abd6c01142 100644 --- a/open_test.go +++ b/open_test.go @@ -1007,6 +1007,7 @@ func TestOpenWALReplayReadOnlySeqNums(t *testing.T) { } d.mu.Unlock() + d.TestOnlyWaitForCleaning() // While the MANIFEST is still in this state, copy all the files in the // database to a new directory. replayDir := mem.PathJoin(root, "replay") diff --git a/pacer.go b/pacer.go index 46c2986e4c..80c9f1229f 100644 --- a/pacer.go +++ b/pacer.go @@ -4,12 +4,6 @@ package pebble -import ( - "time" - - "github.com/cockroachdb/pebble/internal/rate" -) - // deletionPacerInfo contains any info from the db necessary to make deletion // pacing decisions (to limit background IO usage so that it does not contend // with foreground traffic). @@ -25,21 +19,17 @@ type deletionPacerInfo struct { // negatively impacted if too many blocks are deleted very quickly, so this // mechanism helps mitigate that. type deletionPacer struct { - limiter *rate.Limiter freeSpaceThreshold uint64 obsoleteBytesMaxRatio float64 getInfo func() deletionPacerInfo - - testingSleepFn func(delay time.Duration) } // newDeletionPacer instantiates a new deletionPacer for use when deleting // obsolete files. The limiter passed in must be a singleton shared across this // pebble instance. -func newDeletionPacer(limiter *rate.Limiter, getInfo func() deletionPacerInfo) *deletionPacer { +func newDeletionPacer(getInfo func() deletionPacerInfo) *deletionPacer { return &deletionPacer{ - limiter: limiter, // If there are less than freeSpaceThreshold bytes of free space on // disk, do not pace deletions at all. freeSpaceThreshold: 16 << 30, // 16 GB @@ -51,25 +41,14 @@ func newDeletionPacer(limiter *rate.Limiter, getInfo func() deletionPacerInfo) * } } -// limit applies rate limiting if the current free disk space is more than -// freeSpaceThreshold, and the ratio of obsolete to live bytes is less than -// obsoleteBytesMaxRatio. -func (p *deletionPacer) limit(amount uint64, info deletionPacerInfo) { +// shouldPace returns true if we should apply rate limiting; this is the +// case when the current free disk space is more than freeSpaceThreshold, and +// the ratio of obsolete to live bytes is less than obsoleteBytesMaxRatio. +func (p *deletionPacer) shouldPace() bool { + info := p.getInfo() obsoleteBytesRatio := float64(1.0) if info.liveBytes > 0 { obsoleteBytesRatio = float64(info.obsoleteBytes) / float64(info.liveBytes) } - paceDeletions := info.freeBytes > p.freeSpaceThreshold && - obsoleteBytesRatio < p.obsoleteBytesMaxRatio - if paceDeletions { - p.limiter.Wait(float64(amount)) - } else { - p.limiter.Remove(float64(amount)) - } -} - -// maybeThrottle slows down a deletion of this file if it's faster than -// opts.TargetByteDeletionRate. -func (p *deletionPacer) maybeThrottle(bytesToDelete uint64) { - p.limit(bytesToDelete, p.getInfo()) + return info.freeBytes > p.freeSpaceThreshold && obsoleteBytesRatio < p.obsoleteBytesMaxRatio } diff --git a/pacer_test.go b/pacer_test.go index 54de5bb430..7c94d4db47 100644 --- a/pacer_test.go +++ b/pacer_test.go @@ -5,103 +5,65 @@ package pebble import ( - "bytes" "fmt" - "strconv" - "strings" "testing" - "time" - "github.com/cockroachdb/datadriven" - "github.com/cockroachdb/pebble/internal/rate" + "github.com/stretchr/testify/require" ) -func TestCompactionPacerMaybeThrottle(t *testing.T) { - now := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - var buf bytes.Buffer - nowFn := func() time.Time { - return now +func TestDeletionPacerMaybeThrottle(t *testing.T) { + testCases := []struct { + freeSpaceThreshold uint64 + freeBytes uint64 + obsoleteBytes uint64 + liveBytes uint64 + shouldPace bool + }{ + { + freeSpaceThreshold: 10, + freeBytes: 100, + obsoleteBytes: 1, + liveBytes: 100, + shouldPace: true, + }, + // As freeBytes < freeSpaceThreshold, there should be no throttling. + { + freeSpaceThreshold: 10, + freeBytes: 5, + obsoleteBytes: 1, + liveBytes: 100, + shouldPace: false, + }, + // As obsoleteBytesRatio > 0.20, there should be no throttling. + { + freeSpaceThreshold: 10, + freeBytes: 500, + obsoleteBytes: 50, + liveBytes: 100, + shouldPace: false, + }, + // When obsolete ratio unknown, there should be no throttling. + { + freeSpaceThreshold: 10, + freeBytes: 500, + obsoleteBytes: 0, + liveBytes: 0, + shouldPace: false, + }, } - sleepFn := func(d time.Duration) { - fmt.Fprintf(&buf, "wait: %s", d) - now = now.Add(d) - } - - var pacer *deletionPacer - - datadriven.RunTest(t, "testdata/compaction_pacer_maybe_throttle", - func(t *testing.T, d *datadriven.TestData) string { - buf.Reset() - switch d.Cmd { - case "init": - burst := uint64(1) - var slowdownThreshold uint64 - var freeBytes, liveBytes, obsoleteBytes uint64 - for _, data := range strings.Split(d.Input, "\n") { - parts := strings.Split(data, ":") - if len(parts) != 2 { - return fmt.Sprintf("malformed test:\n%s", d.Input) - } - varKey := parts[0] - varValue, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) - if err != nil { - return err.Error() - } - - switch varKey { - case "burst": - burst = varValue - case "slowdownThreshold": - slowdownThreshold = varValue - case "freeBytes": - freeBytes = varValue - case "liveBytes": - liveBytes = varValue - case "obsoleteBytes": - obsoleteBytes = varValue - default: - return fmt.Sprintf("unknown argument: %s", varKey) - } - } - - getInfo := func() deletionPacerInfo { - return deletionPacerInfo{ - freeBytes: freeBytes, - liveBytes: liveBytes, - obsoleteBytes: obsoleteBytes, - } + for tcIdx, tc := range testCases { + t.Run(fmt.Sprintf("%d", tcIdx), func(t *testing.T) { + getInfo := func() deletionPacerInfo { + return deletionPacerInfo{ + freeBytes: tc.freeBytes, + liveBytes: tc.liveBytes, + obsoleteBytes: tc.obsoleteBytes, } - mockLimiter := rate.NewLimiterWithCustomTime(float64(burst), float64(burst), nowFn, sleepFn) - pacer = newDeletionPacer(mockLimiter, getInfo) - pacer.testingSleepFn = sleepFn - pacer.freeSpaceThreshold = slowdownThreshold - return "" - - case "delete": - var bytesToDelete uint64 - for _, data := range strings.Split(d.Input, "\n") { - parts := strings.Split(data, ":") - if len(parts) != 2 { - return fmt.Sprintf("malformed test:\n%s", d.Input) - } - varKey := parts[0] - varValue, err := strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) - if err != nil { - return err.Error() - } - - switch varKey { - case "bytesToDelete": - bytesToDelete = varValue - default: - return fmt.Sprintf("unknown command: %s", varKey) - } - } - pacer.maybeThrottle(bytesToDelete) - return buf.String() - - default: - return fmt.Sprintf("unknown command: %s", d.Cmd) } + pacer := newDeletionPacer(getInfo) + pacer.freeSpaceThreshold = tc.freeSpaceThreshold + result := pacer.shouldPace() + require.Equal(t, tc.shouldPace, result) }) + } } diff --git a/read_state.go b/read_state.go index 35e7e77458..d3a78baa9e 100644 --- a/read_state.go +++ b/read_state.go @@ -50,8 +50,9 @@ func (s *readState) unref() { // unrefLocked removes a reference to the readState. If this was the last // reference, the reference the readState holds on the version is -// released. Requires DB.mu is held as version.unrefLocked() requires it. See -// unref() if DB.mu is NOT held by the caller. +// released. +// +// DB.mu must be held. See unref() if DB.mu is NOT held by the caller. func (s *readState) unrefLocked() { if s.refcnt.Add(-1) != 0 { return @@ -61,9 +62,8 @@ func (s *readState) unrefLocked() { mem.readerUnrefLocked(true) } - // NB: Unlike readState.unref(), we don't attempt to cleanup newly obsolete - // tables as unrefLocked() is only called during DB shutdown to release the - // current readState. + // In this code path, the caller is responsible for scheduling obsolete table + // deletion as necessary. } // loadReadState returns the current readState. The returned readState must be diff --git a/testdata/compaction_pacer_maybe_throttle b/testdata/compaction_pacer_maybe_throttle deleted file mode 100644 index d749197e3f..0000000000 --- a/testdata/compaction_pacer_maybe_throttle +++ /dev/null @@ -1,98 +0,0 @@ -init -burst: 10 -slowdownThreshold: 10 -freeBytes: 100 -obsoleteBytes: 1 -liveBytes: 100 ----- - -delete -bytesToDelete: 1 ----- - -delete -bytesToDelete: 1 ----- - -# As freeBytes > slowdownThreshold and obsoleteBytesRatio < 0.20, -# the deletions should be throttled. - -init deletion -burst: 10 -slowdownThreshold: 10 -freeBytes: 100 -obsoleteBytes: 1 -liveBytes: 100 ----- - -delete -bytesToDelete: 10 ----- - -delete -bytesToDelete: 5 ----- -wait: 500ms - -delete -bytesToDelete: 50 ----- -wait: 1s - -delete -bytesToDelete: 10 ----- -wait: 5s - -# As freeBytes < slowdownThreshold, there should be no throttling. - -init deletion -burst: 10 -slowdownThreshold: 10 -freeBytes: 5 -obsoleteBytes: 1 -liveBytes: 100 ----- - -delete -bytesToDelete: 50 ----- - -delete -bytesToDelete: 50 ----- - - -# As obsoleteBytesRatio > 0.20, there should be no throttling. - -init deletion -burst: 10 -slowdownThreshold: 10 -freeBytes: 500 -obsoleteBytes: 50 -liveBytes: 100 ----- - -delete -bytesToDelete: 50 ----- - -delete -bytesToDelete: 50 ----- - -# When obsolete ratio unknown, there should be no throttling. - -init deletion -burst: 10 -slowdownThreshold: 10 -freeBytes: 500 ----- - -delete -bytesToDelete: 50 ----- - -delete -bytesToDelete: 50 ----- diff --git a/testdata/event_listener b/testdata/event_listener index 67732111bb..c1da34e65b 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -261,6 +261,8 @@ close: db/marker.manifest.000006.MANIFEST-000016 remove: db/marker.manifest.000005.MANIFEST-000014 sync: db [JOB 12] MANIFEST created 000016 +remove: db/MANIFEST-000011 +[JOB 12] MANIFEST deleted 000011 remove: ext/0 [JOB 12] ingested L0:000015 (717 B) @@ -342,8 +344,6 @@ close: db/000022.sst sync: db sync: db/MANIFEST-000016 [JOB 17] flushed 1 memtable to L0 [000022] (662 B), in 1.0s (2.0s total), output rate 662 B/s -remove: db/MANIFEST-000011 -[JOB 17] MANIFEST deleted 000011 [JOB 18] flushing 2 ingested tables create: db/MANIFEST-000023 close: db/MANIFEST-000016 diff --git a/testdata/manual_flush b/testdata/manual_flush index 44a07f4b5a..1cb0d98bed 100644 --- a/testdata/manual_flush +++ b/testdata/manual_flush @@ -75,23 +75,3 @@ async-flush ---- 0.0: 000005:[a#10,SET-b#11,SET] - -# Test that synchronous flushes can happen even when a cleaning turn is held. -reset ----- - -acquire-cleaning-turn ----- - -batch -set a 1 -set b 2 ----- - -flush ----- -0.0: - 000005:[a#10,SET-b#11,SET] - -release-cleaning-turn ----- diff --git a/version_set_test.go b/version_set_test.go index 8915bdbb1e..3bf2b929df 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -451,6 +451,7 @@ func TestVersionSetSeqNums(t *testing.T) { d, err = Open("", opts) require.NoError(t, err) defer d.Close() + d.TestOnlyWaitForCleaning() // Check that the manifest has the correct LastSeqNum, equalling the highest // observed SeqNum.