Skip to content

Commit

Permalink
db: add local table size metrics
Browse files Browse the repository at this point in the history
The Metrics object lacks a total size of local sstables, which causes
DiskSpaceUsage to overestimate the local disk space used. The new metrics
track live, zombie and obsolete local file sizes.

Fixes #3391
  • Loading branch information
sumeerbhola committed Mar 18, 2024
1 parent d4f5456 commit 8a54a28
Show file tree
Hide file tree
Showing 15 changed files with 645 additions and 160 deletions.
7 changes: 4 additions & 3 deletions cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ArchiveCleaner = base.ArchiveCleaner
type cleanupManager struct {
opts *Options
objProvider objstorage.Provider
onTableDeleteFn func(fileSize uint64)
onTableDeleteFn func(fileSize uint64, isLocal bool)
deletePacer *deletionPacer

// jobsCh is used as the cleanup job queue.
Expand All @@ -56,6 +56,7 @@ type deletableFile struct {
dir string
fileNum base.DiskFileNum
fileSize uint64
isLocal bool
}

// obsoleteFile holds information about a file that needs to be deleted soon.
Expand All @@ -77,7 +78,7 @@ type cleanupJob struct {
func openCleanupManager(
opts *Options,
objProvider objstorage.Provider,
onTableDeleteFn func(fileSize uint64),
onTableDeleteFn func(fileSize uint64, isLocal bool),
getDeletePacerInfo func() deletionPacerInfo,
) *cleanupManager {
cm := &cleanupManager{
Expand Down Expand Up @@ -162,7 +163,7 @@ func (cm *cleanupManager) mainLoop() {
switch of.fileType {
case fileTypeTable:
cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
cm.onTableDeleteFn(of.nonLogFile.fileSize)
cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
case fileTypeLog:
cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
Expand Down
113 changes: 80 additions & 33 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,11 +439,7 @@ func newCompaction(
isRemote := false
// We should always be passed a provider, except in some unit tests.
if provider != nil {
objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
if err != nil {
panic(errors.Wrapf(err, "cannot lookup table %s in provider", meta.FileBacking.DiskFileNum))
}
isRemote = objMeta.IsRemote()
isRemote = !objstorage.MustIsLocalTable(provider, meta.FileBacking.DiskFileNum)
}
// Avoid a trivial move or copy if all of these are true, as rewriting a
// new file is better:
Expand Down Expand Up @@ -1292,10 +1288,13 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
}

// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
func (d *DB) onObsoleteTableDelete(fileSize uint64) {
// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	tm := &d.mu.versions.metrics.Table
	tm.ObsoleteCount--
	tm.ObsoleteSize -= fileSize
	if isLocal {
		tm.Local.ObsoleteSize -= fileSize
	}
}

Expand Down Expand Up @@ -1609,7 +1608,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
startTime := d.timeNow()

var ve *manifest.VersionEdit
var pendingOutputs []*fileMetadata
var pendingOutputs []compactionOutput
var stats compactStats
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
Expand Down Expand Up @@ -1701,9 +1700,12 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.FileNum),
FileSize: f.Size,
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
fileInfo: fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
FileSize: f.meta.Size,
},
isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
Expand Down Expand Up @@ -2358,9 +2360,12 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.FileNum),
FileSize: f.Size,
d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
fileInfo: fileInfo{
FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
FileSize: f.meta.Size,
},
isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
Expand Down Expand Up @@ -2422,7 +2427,7 @@ func (d *DB) runCopyCompaction(
inputMeta *fileMetadata,
objMeta objstorage.ObjectMetadata,
versionEdit *versionEdit,
) (ve *versionEdit, pendingOutputs []*fileMetadata, retErr error) {
) (ve *versionEdit, pendingOutputs []compactionOutput, retErr error) {
ctx := context.TODO()

ve = versionEdit
Expand Down Expand Up @@ -2498,7 +2503,7 @@ func (d *DB) runCopyCompaction(
}
defer src.Close()

w, _, err := d.objProvider.Create(
w, outObjMeta, err := d.objProvider.Create(
ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum),
objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
Expand All @@ -2507,7 +2512,10 @@ func (d *DB) runCopyCompaction(
if err != nil {
return ve, pendingOutputs, err
}
pendingOutputs = append(pendingOutputs, newMeta)
pendingOutputs = append(pendingOutputs, compactionOutput{
meta: newMeta,
isLocal: !outObjMeta.IsRemote(),
})

if err := objstorage.Copy(ctx, src, w, 0, uint64(src.Size())); err != nil {
w.Abort()
Expand All @@ -2517,8 +2525,10 @@ func (d *DB) runCopyCompaction(
return ve, pendingOutputs, err
}
} else {
pendingOutputs = append(pendingOutputs, newMeta.PhysicalMeta().FileMetadata)

pendingOutputs = append(pendingOutputs, compactionOutput{
meta: newMeta.PhysicalMeta().FileMetadata,
isLocal: true,
})
_, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true})
Expand All @@ -2538,14 +2548,19 @@ func (d *DB) runCopyCompaction(
return ve, pendingOutputs, nil
}

// compactionOutput pairs an output table's metadata with whether the object
// backing it is on local storage, so callers can maintain the local-size
// metrics for obsolete tables.
type compactionOutput struct {
	// meta is the file metadata for the output sstable.
	meta *fileMetadata
	// isLocal is true when the backing object is not remote (as reported by
	// the objstorage provider at creation/link time).
	isLocal bool
}

// runCompactions runs a compaction that produces new on-disk tables from
// memtables or old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
) (ve *versionEdit, pendingOutputs []*fileMetadata, stats compactStats, retErr error) {
) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
// returning the edit.
Expand Down Expand Up @@ -2783,11 +2798,8 @@ func (d *DB) runCompaction(
if c.cancel.Load() {
return ErrCancelledCompaction
}
fileMeta := &fileMetadata{}
d.mu.Lock()
fileNum := d.mu.versions.getNextFileNum()
fileMeta.FileNum = fileNum
pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta().FileMetadata)
d.mu.Unlock()

ctx := context.TODO()
Expand All @@ -2811,6 +2823,12 @@ func (d *DB) runCompaction(
if err != nil {
return err
}
fileMeta := &fileMetadata{}
fileMeta.FileNum = fileNum
pendingOutputs = append(pendingOutputs, compactionOutput{
meta: fileMeta.PhysicalMeta().FileMetadata,
isLocal: !objMeta.IsRemote(),
})

reason := "flushing"
if c.flushing == nil {
Expand Down Expand Up @@ -3335,7 +3353,7 @@ func (d *DB) scanObsoleteFiles(list []string) {

manifestFileNum := d.mu.versions.manifestFileNum

var obsoleteTables []fileInfo
var obsoleteTables []tableInfo
var obsoleteManifests []fileInfo
var obsoleteOptions []fileInfo

Expand Down Expand Up @@ -3383,14 +3401,17 @@ func (d *DB) scanObsoleteFiles(list []string) {
if size, err := d.objProvider.Size(obj); err == nil {
fileInfo.FileSize = uint64(size)
}
obsoleteTables = append(obsoleteTables, fileInfo)
obsoleteTables = append(obsoleteTables, tableInfo{
fileInfo: fileInfo,
isLocal: !obj.IsRemote(),
})

default:
// Ignore object types we don't know about.
}
}

d.mu.versions.obsoleteTables = merge(d.mu.versions.obsoleteTables, obsoleteTables)
d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
d.mu.versions.updateObsoleteTableMetricsLocked()
d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
Expand Down Expand Up @@ -3449,7 +3470,7 @@ func (d *DB) deleteObsoleteFiles(jobID int) {
panic(err)
}

obsoleteTables := append([]fileInfo(nil), d.mu.versions.obsoleteTables...)
obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
d.mu.versions.obsoleteTables = nil

for _, tbl := range obsoleteTables {
Expand Down Expand Up @@ -3484,11 +3505,27 @@ func (d *DB) deleteObsoleteFiles(jobID int) {
for _, f := range obsoleteLogs {
filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
}
files := [3]struct {
// We sort to make the order of deletions deterministic, which is nice for
// tests.
slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
return cmp.Compare(a.FileNum, b.FileNum)
})
for _, f := range obsoleteTables {
d.tableCache.evict(f.FileNum)
filesToDelete = append(filesToDelete, obsoleteFile{
fileType: fileTypeTable,
nonLogFile: deletableFile{
dir: d.dirname,
fileNum: f.FileNum,
fileSize: f.FileSize,
isLocal: f.isLocal,
},
})
}
files := [2]struct {
fileType fileType
obsolete []fileInfo
}{
{fileTypeTable, obsoleteTables},
{fileTypeManifest, obsoleteManifests},
{fileTypeOptions, obsoleteOptions},
}
Expand All @@ -3500,17 +3537,13 @@ func (d *DB) deleteObsoleteFiles(jobID int) {
})
for _, fi := range f.obsolete {
dir := d.dirname
switch f.fileType {
case fileTypeTable:
d.tableCache.evict(fi.FileNum)
}

filesToDelete = append(filesToDelete, obsoleteFile{
fileType: f.fileType,
nonLogFile: deletableFile{
dir: dir,
fileNum: fi.FileNum,
fileSize: fi.FileSize,
isLocal: true,
},
})
}
Expand Down Expand Up @@ -3550,3 +3583,17 @@ func merge(a, b []fileInfo) []fileInfo {
return a.FileNum == b.FileNum
})
}

// mergeTableInfos returns the union of the two tableInfo lists, sorted by
// FileNum and with at most one entry per file number. If b is empty, a is
// returned unchanged.
func mergeTableInfos(a, b []tableInfo) []tableInfo {
	if len(b) == 0 {
		return a
	}

	byFileNum := func(x, y tableInfo) int {
		return cmp.Compare(x.FileNum, y.FileNum)
	}
	sameFileNum := func(x, y tableInfo) bool {
		return x.FileNum == y.FileNum
	}

	merged := append(a, b...)
	slices.SortFunc(merged, byFileNum)
	return slices.CompactFunc(merged, sameFileNum)
}
7 changes: 5 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2128,8 +2128,11 @@ func (d *DB) Metrics() *Metrics {
}
}
metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
for _, size := range d.mu.versions.zombieTables {
metrics.Table.ZombieSize += size
for _, info := range d.mu.versions.zombieTables {
metrics.Table.ZombieSize += info.FileSize
if info.isLocal {
metrics.Table.Local.ZombieSize += info.FileSize
}
}
metrics.private.optionsFileSize = d.optionsFileSize

Expand Down
29 changes: 21 additions & 8 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,16 @@ type Metrics struct {
BackingTableCount uint64
// The sum of the sizes of the BackingTableCount sstables that are backing virtual tables.
BackingTableSize uint64

// Local file sizes.
Local struct {
// LiveSize is the number of bytes in live tables.
LiveSize uint64
// ObsoleteSize is the number of bytes in obsolete tables.
ObsoleteSize uint64
// ZombieSize is the number of bytes in zombie tables.
ZombieSize uint64
}
}

TableCache CacheMetrics
Expand Down Expand Up @@ -329,18 +339,19 @@ var (
)

// DiskSpaceUsage returns the total disk space used by the database in bytes,
// including live and obsolete files.
// including live and obsolete files. This only includes local files, i.e.,
// remote files (as known to objstorage.Provider) are not included.
func (m *Metrics) DiskSpaceUsage() uint64 {
var usageBytes uint64
usageBytes += m.WAL.PhysicalSize
usageBytes += m.WAL.ObsoletePhysicalSize
for _, lm := range m.Levels {
usageBytes += uint64(lm.Size)
}
usageBytes += m.Table.ObsoleteSize
usageBytes += m.Table.ZombieSize
usageBytes += m.Table.Local.LiveSize
usageBytes += m.Table.Local.ObsoleteSize
usageBytes += m.Table.Local.ZombieSize
usageBytes += m.private.optionsFileSize
usageBytes += m.private.manifestFileSize
// TODO(sumeer): InProgressBytes does not distinguish between local and
// remote files. This causes a small error. Fix.
usageBytes += uint64(m.Compact.InProgressBytes)
return usageBytes
}
Expand Down Expand Up @@ -571,16 +582,18 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
redact.Safe(m.MemTable.ZombieCount),
humanize.Bytes.Uint64(m.MemTable.ZombieSize))

w.Printf("Zombie tables: %d (%s)\n",
w.Printf("Zombie tables: %d (%s, local: %s)\n",
redact.Safe(m.Table.ZombieCount),
humanize.Bytes.Uint64(m.Table.ZombieSize))
humanize.Bytes.Uint64(m.Table.ZombieSize),
humanize.Bytes.Uint64(m.Table.Local.ZombieSize))

w.Printf("Backing tables: %d (%s)\n",
redact.Safe(m.Table.BackingTableCount),
humanize.Bytes.Uint64(m.Table.BackingTableSize))
w.Printf("Virtual tables: %d (%s)\n",
redact.Safe(m.NumVirtual()),
humanize.Bytes.Uint64(m.VirtualSize()))
w.Printf("Local tables size: %s\n", humanize.Bytes.Uint64(m.Table.Local.LiveSize))

formatCacheMetrics := func(m *CacheMetrics, name redact.SafeString) {
w.Printf("%s: %s entries (%s) hit rate: %.1f%%\n",
Expand Down
Loading

0 comments on commit 8a54a28

Please sign in to comment.