From cc5b658df07ce7d8ece5418c9703f0983f1822b0 Mon Sep 17 00:00:00 2001
From: Radu Berinde
Date: Sat, 30 Mar 2024 12:45:48 -0700
Subject: [PATCH] db: minor jobID cleanup

Introduce a `JobID` type and a helper to create a new job ID.
---
 cleaner.go              | 14 +++++++-------
 compaction.go           | 30 ++++++++++++------------------
 data_test.go            |  3 +--
 db.go                   |  9 ++++-----
 db_internals.go         | 18 ++++++++++++++++++
 flushable_test.go       |  5 ++---
 format_major_version.go |  3 +--
 ingest.go               | 24 ++++++++++--------------
 ingest_test.go          |  6 +++---
 open.go                 |  7 +++----
 table_cache_test.go     |  3 +--
 table_stats.go          |  5 ++---
 version_set.go          | 10 +++++-----
 13 files changed, 69 insertions(+), 68 deletions(-)
 create mode 100644 db_internals.go

diff --git a/cleaner.go b/cleaner.go
index d0dc44c884..81c19c3681 100644
--- a/cleaner.go
+++ b/cleaner.go
@@ -69,7 +69,7 @@ type obsoleteFile struct {
 }
 
 type cleanupJob struct {
-	jobID         int
+	jobID         JobID
 	obsoleteFiles []obsoleteFile
 }
 
@@ -108,7 +108,7 @@ func (cm *cleanupManager) Close() {
 }
 
 // EnqueueJob adds a cleanup job to the manager's queue.
-func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
+func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
 	job := &cleanupJob{
 		jobID:         jobID,
 		obsoleteFiles: obsoleteFiles,
@@ -226,7 +226,7 @@ func (cm *cleanupManager) maybePace(
 
 // deleteObsoleteFile deletes a (non-object) file that is no longer needed.
 func (cm *cleanupManager) deleteObsoleteFile(
-	fs vfs.FS, fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
+	fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
 ) {
 	// TODO(peter): need to handle this error, probably by re-adding the
 	// file that couldn't be deleted to one of the obsolete slices map.
@@ -238,14 +238,14 @@ func (cm *cleanupManager) deleteObsoleteFile(
 	switch fileType {
 	case fileTypeLog:
 		cm.opts.EventListener.WALDeleted(WALDeleteInfo{
-			JobID:   jobID,
+			JobID:   int(jobID),
 			Path:    path,
 			FileNum: fileNum,
 			Err:     err,
 		})
 	case fileTypeManifest:
 		cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
-			JobID:   jobID,
+			JobID:   int(jobID),
 			Path:    path,
 			FileNum: fileNum,
 			Err:     err,
@@ -256,7 +256,7 @@
 }
 
 func (cm *cleanupManager) deleteObsoleteObject(
-	fileType fileType, jobID int, fileNum base.DiskFileNum,
+	fileType fileType, jobID JobID, fileNum base.DiskFileNum,
 ) {
 	if fileType != fileTypeTable {
 		panic("not an object")
@@ -277,7 +277,7 @@ func (cm *cleanupManager) deleteObsoleteObject(
 	switch fileType {
 	case fileTypeTable:
 		cm.opts.EventListener.TableDeleted(TableDeleteInfo{
-			JobID:   jobID,
+			JobID:   int(jobID),
 			Path:    path,
 			FileNum: fileNum,
 			Err:     err,
diff --git a/compaction.go b/compaction.go
index 1af2d79a53..da227ebba7 100644
--- a/compaction.go
+++ b/compaction.go
@@ -353,9 +353,9 @@ type compaction struct {
 	pickerMetrics compactionPickerMetrics
 }
 
-func (c *compaction) makeInfo(jobID int) CompactionInfo {
+func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
 	info := CompactionInfo{
-		JobID:       jobID,
+		JobID:       int(jobID),
 		Reason:      c.kind.String(),
 		Input:       make([]LevelInfo, 0, len(c.inputs)),
 		Annotations: []string{},
@@ -1648,10 +1648,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	}
 
 	d.addInProgressCompaction(c)
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	d.opts.EventListener.FlushBegin(FlushInfo{
-		JobID:      jobID,
+		JobID:      int(jobID),
 		Input:      inputs,
 		InputBytes: inputBytes,
 		Ingest:     ingest,
@@ -1680,7 +1679,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 	}
 
 	info := FlushInfo{
-		JobID:      jobID,
+		JobID:      int(jobID),
 		Input:      inputs,
 		InputBytes: inputBytes,
 		Duration:   d.timeNow().Sub(startTime),
@@ -2434,8 +2433,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
 		}()
 	}
 
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	info := c.makeInfo(jobID)
 	d.opts.EventListener.CompactionBegin(info)
 	startTime := d.timeNow()
@@ -2537,7 +2535,7 @@ type compactStats struct {
 // d.mu must be held when calling this method. The mutex will be released when
 // doing IO.
 func (d *DB) runCopyCompaction(
-	jobID int,
+	jobID JobID,
 	c *compaction,
 	inputMeta *fileMetadata,
 	objMeta objstorage.ObjectMetadata,
@@ -2696,7 +2694,7 @@ type compactionOutput struct {
 // d.mu must be held when calling this, but the mutex may be dropped and
 // re-acquired during the course of this method.
 func (d *DB) runCompaction(
-	jobID int, c *compaction,
+	jobID JobID, c *compaction,
 ) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
 	// As a sanity check, confirm that the smallest / largest keys for new and
 	// deleted files in the new versionEdit pass a validation function before
@@ -2972,7 +2970,7 @@ func (d *DB) runCompaction(
 			reason = "compacting"
 		}
 		d.opts.EventListener.TableCreated(TableCreateInfo{
-			JobID:   jobID,
+			JobID:   int(jobID),
 			Reason:  reason,
 			Path:    d.objProvider.Path(objMeta),
 			FileNum: diskFileNum,
@@ -3581,9 +3579,7 @@ func (d *DB) enableFileDeletions() {
 	if d.mu.disableFileDeletions > 0 {
 		return
 	}
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
-	d.deleteObsoleteFiles(jobID)
+	d.deleteObsoleteFiles(d.newJobIDLocked())
 }
 
 type fileInfo = base.FileInfo
@@ -3594,7 +3590,7 @@ type fileInfo = base.FileInfo
 //
 // Does nothing if file deletions are disabled (see disableFileDeletions). A
 // cleanup job will be scheduled when file deletions are re-enabled.
-func (d *DB) deleteObsoleteFiles(jobID int) {
+func (d *DB) deleteObsoleteFiles(jobID JobID) {
 	if d.mu.disableFileDeletions > 0 {
 		return
 	}
@@ -3701,9 +3697,7 @@ func (d *DB) maybeScheduleObsoleteTableDeletion() {
 
 func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
 	if len(d.mu.versions.obsoleteTables) > 0 {
-		jobID := d.mu.nextJobID
-		d.mu.nextJobID++
-		d.deleteObsoleteFiles(jobID)
+		d.deleteObsoleteFiles(d.newJobIDLocked())
 	}
 }
 
diff --git a/data_test.go b/data_test.go
index 9ce029d67a..5fc92fee35 100644
--- a/data_test.go
+++ b/data_test.go
@@ -1054,8 +1054,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
 	}
 
 	if len(ve.NewFiles) > 0 {
-		jobID := d.mu.nextJobID
-		d.mu.nextJobID++
+		jobID := d.newJobIDLocked()
 		d.mu.versions.logLock()
 		if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false, func() []compactionInfo {
 			return nil
diff --git a/db.go b/db.go
index 94c7b8cc32..8d569e45d4 100644
--- a/db.go
+++ b/db.go
@@ -360,7 +360,7 @@ type DB struct {
 	// notifications and act as a mechanism for tying together the events and
 	// log messages for a single job such as a flush, compaction, or file
 	// ingestion. Job IDs are not serialized to disk or used for correctness.
-	nextJobID int
+	nextJobID JobID
 
 	// The collection of immutable versions and state about the log and visible
 	// sequence numbers. Use the pointer here to ensure the atomic fields in
@@ -1679,7 +1679,7 @@ func (d *DB) Close() error {
 
 	// Since we called d.readState.val.unrefLocked() above, we are expected to
 	// manually schedule deletion of obsolete files.
 	if len(d.mu.versions.obsoleteTables) > 0 {
-		d.deleteObsoleteFiles(d.mu.nextJobID)
+		d.deleteObsoleteFiles(d.newJobIDLocked())
 	}
 	d.mu.Unlock()
@@ -2745,8 +2745,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
 	if d.opts.DisableWAL {
 		panic("pebble: invalid function call")
 	}
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	newLogNum = d.mu.versions.getNextDiskFileNum()
 
 	d.mu.Unlock()
@@ -2770,7 +2769,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
 	}
 	d.mu.Unlock()
 
-	writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), jobID)
+	writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
 	if err != nil {
 		panic(err)
 	}
diff --git a/db_internals.go b/db_internals.go
new file mode 100644
index 0000000000..d5c1b09702
--- /dev/null
+++ b/db_internals.go
@@ -0,0 +1,18 @@
+// Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
+// of this source code is governed by a BSD-style license that can be found in
+// the LICENSE file.
+
+package pebble
+
+// JobID identifies a job (like a compaction). Job IDs are passed to event
+// listener notifications and act as a mechanism for tying together the events
+// and log messages for a single job such as a flush, compaction, or file
+// ingestion. Job IDs are not serialized to disk or used for correctness.
+type JobID int
+
+// newJobIDLocked returns a new JobID; DB.mu must be held.
+func (d *DB) newJobIDLocked() JobID {
+	res := d.mu.nextJobID
+	d.mu.nextJobID++
+	return res
+}
diff --git a/flushable_test.go b/flushable_test.go
index 06b3d8f48a..34b83eed37 100644
--- a/flushable_test.go
+++ b/flushable_test.go
@@ -52,13 +52,12 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
 	for i := range paths {
 		pendingOutputs[i] = d.mu.versions.getNextFileNum()
 	}
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	d.mu.Unlock()
 
 	// We can reuse the ingestLoad function for this test even if we're
 	// not actually ingesting a file.
-	lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs, d.objProvider, jobID)
+	lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
 	if err != nil {
 		panic(err)
 	}
diff --git a/format_major_version.go b/format_major_version.go
index b795fa7996..b7c501f79d 100644
--- a/format_major_version.go
+++ b/format_major_version.go
@@ -451,8 +451,7 @@ var _ = (*DB)(nil).markFilesLocked
 // markFilesLocked durably marks the files that match the given findFilesFunc for
 // compaction.
 func (d *DB) markFilesLocked(findFn findFilesFunc) error {
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 
 	// Acquire a read state to have a view of the LSM and a guarantee that none
 	// of the referenced files will be deleted until we've unreferenced the read
diff --git a/ingest.go b/ingest.go
index 19791497cc..6d5c6408fb 100644
--- a/ingest.go
+++ b/ingest.go
@@ -430,8 +430,6 @@ func ingestLoad(
 	external []ExternalFile,
 	cacheID uint64,
 	pending []base.FileNum,
-	objProvider objstorage.Provider,
-	jobID int,
 ) (ingestLoadResult, error) {
 	localFileNums := pending[:len(paths)]
 	sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
@@ -590,7 +588,7 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
 // ingestLinkLocal creates new objects which are backed by either hardlinks to or
 // copies of the ingested files.
 func ingestLinkLocal(
-	jobID int, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
+	jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
 ) error {
 	for i := range localMetas {
 		objMeta, err := objProvider.LinkOrCopyFromLocal(
@@ -605,7 +603,7 @@ func ingestLinkLocal(
 		}
 		if opts.EventListener.TableCreated != nil {
 			opts.EventListener.TableCreated(TableCreateInfo{
-				JobID:   jobID,
+				JobID:   int(jobID),
 				Reason:  "ingesting",
 				Path:    objProvider.Path(objMeta),
 				FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
@@ -622,7 +620,7 @@
 //
 // ingestUnprotectExternalBackings() must be called after this function (even in
 // error cases).
-func (d *DB) ingestAttachRemote(jobID int, lr ingestLoadResult) error {
+func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
 	remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
 	for i := range lr.shared {
 		backing, err := lr.shared[i].shared.Backing.Get()
@@ -705,7 +703,7 @@ func (d *DB) ingestAttachRemote(jobID int, lr ingestLoadResult) error {
 	if d.opts.EventListener.TableCreated != nil {
 		for i := range remoteObjMetas {
 			d.opts.EventListener.TableCreated(TableCreateInfo{
-				JobID:   jobID,
+				JobID:   int(jobID),
 				Reason:  "ingesting",
 				Path:    d.objProvider.Path(remoteObjMetas[i]),
 				FileNum: remoteObjMetas[i].DiskFileNum,
@@ -1459,13 +1457,12 @@ func (d *DB) ingest(
 		pendingOutputs[i] = d.mu.versions.getNextFileNum()
 	}
 
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	d.mu.Unlock()
 
 	// Load the metadata for all the files being ingested. This step detects
 	// and elides empty sstables.
-	loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs, d.objProvider, jobID)
+	loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
 	if err != nil {
 		return IngestOperationStats{}, err
 	}
@@ -1755,7 +1752,7 @@ func (d *DB) ingest(
 	}
 
 	info := TableIngestInfo{
-		JobID:     jobID,
+		JobID:     int(jobID),
 		Err:       err,
 		flushable: asFlushable,
 	}
@@ -2248,7 +2245,7 @@ func (d *DB) ingestSplit(
 }
 
 func (d *DB) ingestApply(
-	jobID int,
+	jobID JobID,
 	lr ingestLoadResult,
 	findTargetLevel ingestTargetLevelFunc,
 	mut *memTable,
@@ -2589,8 +2586,7 @@ func (d *DB) validateSSTables() {
 	pending := d.mu.tableValidation.pending
 	d.mu.tableValidation.pending = nil
 	d.mu.tableValidation.validating = true
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	rs := d.loadReadState()
 
 	// Drop DB.mu before performing IO.
@@ -2661,7 +2657,7 @@ func (d *DB) validateSSTables() {
 		}
 
 		d.opts.EventListener.TableValidated(TableValidatedInfo{
-			JobID: jobID,
+			JobID: int(jobID),
 			Meta:  f.Meta,
 		})
 	}
diff --git a/ingest_test.go b/ingest_test.go
index 10e9ddfd29..98b2d16daa 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -128,7 +128,7 @@ func TestIngestLoad(t *testing.T) {
 				Comparer: DefaultComparer,
 				FS:       mem,
 			}).WithFSDefaults()
-			lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1}, nil, 0)
+			lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1})
 			if err != nil {
 				return err.Error()
 			}
@@ -218,7 +218,7 @@ func TestIngestLoadRand(t *testing.T) {
 		Comparer: DefaultComparer,
 		FS:       mem,
 	}).WithFSDefaults()
-	lr, err := ingestLoad(opts, version, paths, nil, nil, 0, pending, nil, 0)
+	lr, err := ingestLoad(opts, version, paths, nil, nil, 0, pending)
 	require.NoError(t, err)
 
 	for _, m := range lr.local {
@@ -238,7 +238,7 @@ func TestIngestLoadInvalid(t *testing.T) {
 		Comparer: DefaultComparer,
 		FS:       mem,
 	}).WithFSDefaults()
-	if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}, nil, 0); err == nil {
+	if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}); err == nil {
 		t.Fatalf("expected error, but found success")
 	}
 }
diff --git a/open.go b/open.go
index 26ea720a87..bdc6f1eda5 100644
--- a/open.go
+++ b/open.go
@@ -286,8 +286,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 
 	providerSettings := objstorageprovider.Settings{
 		Logger: opts.Logger,
@@ -511,7 +510,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
 		entry.readerUnrefLocked(true)
 	}
 
-	d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), jobID)
+	d.mu.log.writer, err = d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
 	if err != nil {
 		return nil, err
 	}
@@ -754,7 +753,7 @@ func GetVersion(dir string, fs vfs.FS) (string, error) {
 // d.mu must be held when calling this, but the mutex may be dropped and
 // re-acquired during the course of this method.
 func (d *DB) replayWAL(
-	jobID int, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
+	jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool,
 ) (toFlush flushableList, maxSeqNum uint64, err error) {
 	rr := ll.OpenForRead()
 	defer rr.Close()
diff --git a/table_cache_test.go b/table_cache_test.go
index 81197d639b..c7895cee7e 100644
--- a/table_cache_test.go
+++ b/table_cache_test.go
@@ -356,8 +356,7 @@ func TestVirtualReadsWiring(t *testing.T) {
 
 	applyVE := func(ve *versionEdit) error {
 		d.mu.versions.logLock()
-		jobID := d.mu.nextJobID
-		d.mu.nextJobID++
+		jobID := d.newJobIDLocked()
 
 		err := d.mu.versions.logAndApply(jobID, ve, fileMetrics(ve), false, func() []compactionInfo {
 			return d.getInProgressCompactionInfoLocked(nil)
diff --git a/table_stats.go b/table_stats.go
index 44f1f9ac55..ed589c55fa 100644
--- a/table_stats.go
+++ b/table_stats.go
@@ -91,8 +91,7 @@ func (d *DB) collectTableStats() bool {
 	pending := d.mu.tableStats.pending
 	d.mu.tableStats.pending = nil
 	d.mu.tableStats.loading = true
-	jobID := d.mu.nextJobID
-	d.mu.nextJobID++
+	jobID := d.newJobIDLocked()
 	loadedInitial := d.mu.tableStats.loadedInitial
 	// Drop DB.mu before performing IO.
 	d.mu.Unlock()
@@ -122,7 +121,7 @@ func (d *DB) collectTableStats() bool {
 
 	if loadedInitial && !d.mu.tableStats.loadedInitial {
 		d.mu.tableStats.loadedInitial = loadedInitial
 		d.opts.EventListener.TableStatsLoaded(TableStatsInfo{
-			JobID: jobID,
+			JobID: int(jobID),
 		})
 	}
diff --git a/version_set.go b/version_set.go
index 9724c0717e..4b4cfc231c 100644
--- a/version_set.go
+++ b/version_set.go
@@ -161,7 +161,7 @@ func (vs *versionSet) init(
 
 // create creates a version set for a fresh DB.
 func (vs *versionSet) create(
-	jobID int,
+	jobID JobID,
 	dirname string,
 	provider objstorage.Provider,
 	opts *Options,
@@ -197,7 +197,7 @@ func (vs *versionSet) create(
 	}
 
 	vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
-		JobID:   jobID,
+		JobID:   int(jobID),
 		Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, vs.manifestFileNum),
 		FileNum: vs.manifestFileNum,
 		Err:     err,
@@ -425,7 +425,7 @@ func (vs *versionSet) logUnlock() {
 // inProgressCompactions is called while DB.mu is held, to get the list of
 // in-progress compactions.
 func (vs *versionSet) logAndApply(
-	jobID int,
+	jobID JobID,
 	ve *versionEdit,
 	metrics map[int]*LevelMetrics,
 	forceRotation bool,
@@ -567,7 +567,7 @@ func (vs *versionSet) logAndApply(
 	if newManifestFileNum != 0 {
 		if err := vs.createManifest(vs.dirname, newManifestFileNum, minUnflushedLogNum, nextFileNum, newManifestVirtualBackings); err != nil {
 			vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
-				JobID:   jobID,
+				JobID:   int(jobID),
 				Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
 				FileNum: newManifestFileNum,
 				Err:     err,
@@ -601,7 +601,7 @@ func (vs *versionSet) logAndApply(
 			return errors.Wrap(err, "MANIFEST set current failed")
 		}
 		vs.opts.EventListener.ManifestCreated(ManifestCreateInfo{
-			JobID:   jobID,
+			JobID:   int(jobID),
 			Path:    base.MakeFilepath(vs.fs, vs.dirname, fileTypeManifest, newManifestFileNum),
 			FileNum: newManifestFileNum,
 		})
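
Not part of the patch: a minimal, standalone sketch of the pattern the diff converges on — a distinct `JobID` type handed out by a `*Locked` helper while the DB mutex is held, converted with `int(jobID)` only at the boundary where the exported event-listener `Info` structs still carry an `int` field. The `db` and `flushInfo` types below are illustrative stand-ins, not Pebble's actual internals.

```go
// Standalone sketch of the JobID pattern; names mirror the patch but the
// types here are simplified stand-ins.
package main

import (
	"fmt"
	"sync"
)

// JobID identifies a job. It is never persisted, so a plain counter suffices.
type JobID int

type db struct {
	mu struct {
		sync.Mutex
		nextJobID JobID
	}
}

// newJobIDLocked returns the next JobID. The caller must hold d.mu,
// matching the *Locked naming convention used in the patch.
func (d *db) newJobIDLocked() JobID {
	res := d.mu.nextJobID
	d.mu.nextJobID++
	return res
}

// flushInfo stands in for the exported event-listener Info structs, which
// keep an int JobID field for API compatibility.
type flushInfo struct {
	JobID int
}

func main() {
	d := &db{}

	d.mu.Lock()
	jobID := d.newJobIDLocked() // allocate under the mutex
	d.mu.Unlock()

	// Convert only at the listener boundary, as the patch does with int(jobID).
	info := flushInfo{JobID: int(jobID)}
	fmt.Printf("flush begin, job %d\n", info.JobID)
}
```

The typed `JobID` keeps raw ints from being passed where a job ID is expected inside the package, while leaving the public `Info` structs unchanged.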