Skip to content

Commit

Permalink
db: minor jobID cleanup
Browse files Browse the repository at this point in the history
Introduce a `JobID` type and a helper to create a new job ID.
  • Loading branch information
RaduBerinde committed Apr 1, 2024
1 parent 295c009 commit cc5b658
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 68 deletions.
14 changes: 7 additions & 7 deletions cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type obsoleteFile struct {
}

type cleanupJob struct {
jobID int
jobID JobID
obsoleteFiles []obsoleteFile
}

Expand Down Expand Up @@ -108,7 +108,7 @@ func (cm *cleanupManager) Close() {
}

// EnqueueJob adds a cleanup job to the manager's queue.
func (cm *cleanupManager) EnqueueJob(jobID int, obsoleteFiles []obsoleteFile) {
func (cm *cleanupManager) EnqueueJob(jobID JobID, obsoleteFiles []obsoleteFile) {
job := &cleanupJob{
jobID: jobID,
obsoleteFiles: obsoleteFiles,
Expand Down Expand Up @@ -226,7 +226,7 @@ func (cm *cleanupManager) maybePace(

// deleteObsoleteFile deletes a (non-object) file that is no longer needed.
func (cm *cleanupManager) deleteObsoleteFile(
fs vfs.FS, fileType fileType, jobID int, path string, fileNum base.DiskFileNum, fileSize uint64,
fs vfs.FS, fileType fileType, jobID JobID, path string, fileNum base.DiskFileNum, fileSize uint64,
) {
// TODO(peter): need to handle this error, probably by re-adding the
// file that couldn't be deleted to one of the obsolete slices map.
Expand All @@ -238,14 +238,14 @@ func (cm *cleanupManager) deleteObsoleteFile(
switch fileType {
case fileTypeLog:
cm.opts.EventListener.WALDeleted(WALDeleteInfo{
JobID: jobID,
JobID: int(jobID),
Path: path,
FileNum: fileNum,
Err: err,
})
case fileTypeManifest:
cm.opts.EventListener.ManifestDeleted(ManifestDeleteInfo{
JobID: jobID,
JobID: int(jobID),
Path: path,
FileNum: fileNum,
Err: err,
Expand All @@ -256,7 +256,7 @@ func (cm *cleanupManager) deleteObsoleteFile(
}

func (cm *cleanupManager) deleteObsoleteObject(
fileType fileType, jobID int, fileNum base.DiskFileNum,
fileType fileType, jobID JobID, fileNum base.DiskFileNum,
) {
if fileType != fileTypeTable {
panic("not an object")
Expand All @@ -277,7 +277,7 @@ func (cm *cleanupManager) deleteObsoleteObject(
switch fileType {
case fileTypeTable:
cm.opts.EventListener.TableDeleted(TableDeleteInfo{
JobID: jobID,
JobID: int(jobID),
Path: path,
FileNum: fileNum,
Err: err,
Expand Down
30 changes: 12 additions & 18 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,9 +353,9 @@ type compaction struct {
pickerMetrics compactionPickerMetrics
}

func (c *compaction) makeInfo(jobID int) CompactionInfo {
func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
info := CompactionInfo{
JobID: jobID,
JobID: int(jobID),
Reason: c.kind.String(),
Input: make([]LevelInfo, 0, len(c.inputs)),
Annotations: []string{},
Expand Down Expand Up @@ -1648,10 +1648,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}
d.addInProgressCompaction(c)

jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
d.opts.EventListener.FlushBegin(FlushInfo{
JobID: jobID,
JobID: int(jobID),
Input: inputs,
InputBytes: inputBytes,
Ingest: ingest,
Expand Down Expand Up @@ -1680,7 +1679,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}

info := FlushInfo{
JobID: jobID,
JobID: int(jobID),
Input: inputs,
InputBytes: inputBytes,
Duration: d.timeNow().Sub(startTime),
Expand Down Expand Up @@ -2434,8 +2433,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
}()
}

jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
info := c.makeInfo(jobID)
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()
Expand Down Expand Up @@ -2537,7 +2535,7 @@ type compactStats struct {
// d.mu must be held when calling this method. The mutex will be released when
// doing IO.
func (d *DB) runCopyCompaction(
jobID int,
jobID JobID,
c *compaction,
inputMeta *fileMetadata,
objMeta objstorage.ObjectMetadata,
Expand Down Expand Up @@ -2696,7 +2694,7 @@ type compactionOutput struct {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
jobID JobID, c *compaction,
) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
Expand Down Expand Up @@ -2972,7 +2970,7 @@ func (d *DB) runCompaction(
reason = "compacting"
}
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
JobID: int(jobID),
Reason: reason,
Path: d.objProvider.Path(objMeta),
FileNum: diskFileNum,
Expand Down Expand Up @@ -3581,9 +3579,7 @@ func (d *DB) enableFileDeletions() {
if d.mu.disableFileDeletions > 0 {
return
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(d.newJobIDLocked())
}

type fileInfo = base.FileInfo
Expand All @@ -3594,7 +3590,7 @@ type fileInfo = base.FileInfo
//
// Does nothing if file deletions are disabled (see disableFileDeletions). A
// cleanup job will be scheduled when file deletions are re-enabled.
func (d *DB) deleteObsoleteFiles(jobID int) {
func (d *DB) deleteObsoleteFiles(jobID JobID) {
if d.mu.disableFileDeletions > 0 {
return
}
Expand Down Expand Up @@ -3701,9 +3697,7 @@ func (d *DB) maybeScheduleObsoleteTableDeletion() {

func (d *DB) maybeScheduleObsoleteTableDeletionLocked() {
if len(d.mu.versions.obsoleteTables) > 0 {
jobID := d.mu.nextJobID
d.mu.nextJobID++
d.deleteObsoleteFiles(jobID)
d.deleteObsoleteFiles(d.newJobIDLocked())
}
}

Expand Down
3 changes: 1 addition & 2 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,8 +1054,7 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
}

if len(ve.NewFiles) > 0 {
jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
d.mu.versions.logLock()
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false, func() []compactionInfo {
return nil
Expand Down
9 changes: 4 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ type DB struct {
// notifications and act as a mechanism for tying together the events and
// log messages for a single job such as a flush, compaction, or file
// ingestion. Job IDs are not serialized to disk or used for correctness.
nextJobID int
nextJobID JobID

// The collection of immutable versions and state about the log and visible
// sequence numbers. Use the pointer here to ensure the atomic fields in
Expand Down Expand Up @@ -1679,7 +1679,7 @@ func (d *DB) Close() error {
// Since we called d.readState.val.unrefLocked() above, we are expected to
// manually schedule deletion of obsolete files.
if len(d.mu.versions.obsoleteTables) > 0 {
d.deleteObsoleteFiles(d.mu.nextJobID)
d.deleteObsoleteFiles(d.newJobIDLocked())
}

d.mu.Unlock()
Expand Down Expand Up @@ -2745,8 +2745,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
if d.opts.DisableWAL {
panic("pebble: invalid function call")
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
newLogNum = d.mu.versions.getNextDiskFileNum()

d.mu.Unlock()
Expand All @@ -2770,7 +2769,7 @@ func (d *DB) recycleWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) {
}

d.mu.Unlock()
writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), jobID)
writer, err := d.mu.log.manager.Create(wal.NumWAL(newLogNum), int(jobID))
if err != nil {
panic(err)
}
Expand Down
18 changes: 18 additions & 0 deletions db_internals.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2012 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package pebble

// JobID identifies a job (like a compaction). Job IDs are passed to event
// listener notifications and act as a mechanism for tying together the events
// and log messages for a single job such as a flush, compaction, or file
// ingestion. Job IDs are not serialized to disk or used for correctness.
type JobID int

// newJobIDLocked returns a new JobID; DB.mu must be held.
func (d *DB) newJobIDLocked() JobID {
res := d.mu.nextJobID
d.mu.nextJobID++
return res
}
5 changes: 2 additions & 3 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
for i := range paths {
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}
jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
d.mu.Unlock()

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs, d.objProvider, jobID)
lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs)
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions format_major_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ var _ = (*DB)(nil).markFilesLocked
// markFilesLocked durably marks the files that match the given findFilesFunc for
// compaction.
func (d *DB) markFilesLocked(findFn findFilesFunc) error {
jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()

// Acquire a read state to have a view of the LSM and a guarantee that none
// of the referenced files will be deleted until we've unreferenced the read
Expand Down
24 changes: 10 additions & 14 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,6 @@ func ingestLoad(
external []ExternalFile,
cacheID uint64,
pending []base.FileNum,
objProvider objstorage.Provider,
jobID int,
) (ingestLoadResult, error) {
localFileNums := pending[:len(paths)]
sharedFileNums := pending[len(paths) : len(paths)+len(shared)]
Expand Down Expand Up @@ -590,7 +588,7 @@ func ingestCleanup(objProvider objstorage.Provider, meta []ingestLocalMeta) erro
// ingestLinkLocal creates new objects which are backed by either hardlinks to or
// copies of the ingested files.
func ingestLinkLocal(
jobID int, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
jobID JobID, opts *Options, objProvider objstorage.Provider, localMetas []ingestLocalMeta,
) error {
for i := range localMetas {
objMeta, err := objProvider.LinkOrCopyFromLocal(
Expand All @@ -605,7 +603,7 @@ func ingestLinkLocal(
}
if opts.EventListener.TableCreated != nil {
opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
JobID: int(jobID),
Reason: "ingesting",
Path: objProvider.Path(objMeta),
FileNum: base.PhysicalTableDiskFileNum(localMetas[i].FileNum),
Expand All @@ -622,7 +620,7 @@ func ingestLinkLocal(
//
// ingestUnprotectExternalBackings() must be called after this function (even in
// error cases).
func (d *DB) ingestAttachRemote(jobID int, lr ingestLoadResult) error {
func (d *DB) ingestAttachRemote(jobID JobID, lr ingestLoadResult) error {
remoteObjs := make([]objstorage.RemoteObjectToAttach, 0, len(lr.shared)+len(lr.external))
for i := range lr.shared {
backing, err := lr.shared[i].shared.Backing.Get()
Expand Down Expand Up @@ -705,7 +703,7 @@ func (d *DB) ingestAttachRemote(jobID int, lr ingestLoadResult) error {
if d.opts.EventListener.TableCreated != nil {
for i := range remoteObjMetas {
d.opts.EventListener.TableCreated(TableCreateInfo{
JobID: jobID,
JobID: int(jobID),
Reason: "ingesting",
Path: d.objProvider.Path(remoteObjMetas[i]),
FileNum: remoteObjMetas[i].DiskFileNum,
Expand Down Expand Up @@ -1459,13 +1457,12 @@ func (d *DB) ingest(
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}

jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
d.mu.Unlock()

// Load the metadata for all the files being ingested. This step detects
// and elides empty sstables.
loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs, d.objProvider, jobID)
loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, external, d.cacheID, pendingOutputs)
if err != nil {
return IngestOperationStats{}, err
}
Expand Down Expand Up @@ -1755,7 +1752,7 @@ func (d *DB) ingest(
}

info := TableIngestInfo{
JobID: jobID,
JobID: int(jobID),
Err: err,
flushable: asFlushable,
}
Expand Down Expand Up @@ -2248,7 +2245,7 @@ func (d *DB) ingestSplit(
}

func (d *DB) ingestApply(
jobID int,
jobID JobID,
lr ingestLoadResult,
findTargetLevel ingestTargetLevelFunc,
mut *memTable,
Expand Down Expand Up @@ -2589,8 +2586,7 @@ func (d *DB) validateSSTables() {
pending := d.mu.tableValidation.pending
d.mu.tableValidation.pending = nil
d.mu.tableValidation.validating = true
jobID := d.mu.nextJobID
d.mu.nextJobID++
jobID := d.newJobIDLocked()
rs := d.loadReadState()

// Drop DB.mu before performing IO.
Expand Down Expand Up @@ -2661,7 +2657,7 @@ func (d *DB) validateSSTables() {
}

d.opts.EventListener.TableValidated(TableValidatedInfo{
JobID: jobID,
JobID: int(jobID),
Meta: f.Meta,
})
}
Expand Down
6 changes: 3 additions & 3 deletions ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestIngestLoad(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}).WithFSDefaults()
lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1}, nil, 0)
lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, nil, 0, []base.FileNum{1})
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestIngestLoadRand(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}).WithFSDefaults()
lr, err := ingestLoad(opts, version, paths, nil, nil, 0, pending, nil, 0)
lr, err := ingestLoad(opts, version, paths, nil, nil, 0, pending)
require.NoError(t, err)

for _, m := range lr.local {
Expand All @@ -238,7 +238,7 @@ func TestIngestLoadInvalid(t *testing.T) {
Comparer: DefaultComparer,
FS: mem,
}).WithFSDefaults()
if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}, nil, 0); err == nil {
if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, nil, 0, []base.FileNum{1}); err == nil {
t.Fatalf("expected error, but found success")
}
}
Expand Down
Loading

0 comments on commit cc5b658

Please sign in to comment.