db: add local table size metrics #3407

Merged · 1 commit · Mar 18, 2024
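This change threads an `isLocal` flag from the object provider through compaction, flush, and obsolete-file deletion, and surfaces the results as a new `Metrics.Table.Local` struct. Below is a minimal sketch of reading the new metrics, assuming a pebble build that includes this change; the in-memory FS and the `"demo"` path are illustrative only:

```go
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	m := db.Metrics()
	// New in this PR: local-only table sizes, excluding tables that live
	// on remote/shared storage known to objstorage.Provider.
	fmt.Println("local live bytes:    ", m.Table.Local.LiveSize)
	fmt.Println("local obsolete bytes:", m.Table.Local.ObsoleteSize)
	fmt.Println("local zombie bytes:  ", m.Table.Local.ZombieSize)
}
```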
7 changes: 4 additions & 3 deletions cleaner.go
@@ -30,7 +30,7 @@ type ArchiveCleaner = base.ArchiveCleaner
type cleanupManager struct {
opts *Options
objProvider objstorage.Provider
- onTableDeleteFn func(fileSize uint64)
+ onTableDeleteFn func(fileSize uint64, isLocal bool)
deletePacer *deletionPacer

// jobsCh is used as the cleanup job queue.
@@ -56,6 +56,7 @@ type deletableFile struct
dir string
fileNum base.DiskFileNum
fileSize uint64
+ isLocal bool
}

// obsoleteFile holds information about a file that needs to be deleted soon.
@@ -77,7 +78,7 @@ type cleanupJob struct
func openCleanupManager(
opts *Options,
objProvider objstorage.Provider,
- onTableDeleteFn func(fileSize uint64),
+ onTableDeleteFn func(fileSize uint64, isLocal bool),
getDeletePacerInfo func() deletionPacerInfo,
) *cleanupManager {
cm := &cleanupManager{
@@ -162,7 +163,7 @@ func (cm *cleanupManager) mainLoop() {
switch of.fileType {
case fileTypeTable:
cm.maybePace(&tb, of.fileType, of.nonLogFile.fileNum, of.nonLogFile.fileSize)
- cm.onTableDeleteFn(of.nonLogFile.fileSize)
+ cm.onTableDeleteFn(of.nonLogFile.fileSize, of.nonLogFile.isLocal)
cm.deleteObsoleteObject(fileTypeTable, job.jobID, of.nonLogFile.fileNum)
case fileTypeLog:
cm.deleteObsoleteFile(of.logFile.FS, fileTypeLog, job.jobID, of.logFile.Path,
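The cleaner changes above only widen the callback signature; the DB side supplies the callback (see `onObsoleteTableDelete` in compaction.go below). A standalone sketch of the pattern, with simplified types standing in for pebble's (the actual wiring lives in open.go and is not part of this diff):

```go
package main

import "fmt"

// deletableFile mirrors the shape used in cleaner.go above.
type deletableFile struct {
	fileNum  uint64
	fileSize uint64
	isLocal  bool
}

type cleanupManager struct {
	onTableDeleteFn func(fileSize uint64, isLocal bool)
	jobs            []deletableFile
}

func (cm *cleanupManager) drain() {
	for _, f := range cm.jobs {
		// The callback now carries locality alongside size, so the DB can
		// maintain Table.Local.ObsoleteSize next to Table.ObsoleteSize.
		cm.onTableDeleteFn(f.fileSize, f.isLocal)
	}
}

func main() {
	var obsoleteSize, localObsoleteSize uint64 = 100, 60
	cm := &cleanupManager{
		onTableDeleteFn: func(fileSize uint64, isLocal bool) {
			obsoleteSize -= fileSize
			if isLocal {
				localObsoleteSize -= fileSize
			}
		},
		jobs: []deletableFile{{fileNum: 12, fileSize: 40, isLocal: true}},
	}
	cm.drain()
	fmt.Println(obsoleteSize, localObsoleteSize) // 60 20
}
```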
113 changes: 80 additions & 33 deletions compaction.go
@@ -439,11 +439,7 @@ func newCompaction(
isRemote := false
// We should always be passed a provider, except in some unit tests.
if provider != nil {
- objMeta, err := provider.Lookup(fileTypeTable, meta.FileBacking.DiskFileNum)
- if err != nil {
- panic(errors.Wrapf(err, "cannot lookup table %s in provider", meta.FileBacking.DiskFileNum))
- }
- isRemote = objMeta.IsRemote()
+ isRemote = !objstorage.MustIsLocalTable(provider, meta.FileBacking.DiskFileNum)
}
// Avoid a trivial move or copy if all of these are true, as rewriting a
// new file is better:
@@ -1292,10 +1288,13 @@ func (d *DB) getDeletionPacerInfo() deletionPacerInfo {
}

// onObsoleteTableDelete is called to update metrics when an sstable is deleted.
- func (d *DB) onObsoleteTableDelete(fileSize uint64) {
+ func (d *DB) onObsoleteTableDelete(fileSize uint64, isLocal bool) {
d.mu.Lock()
d.mu.versions.metrics.Table.ObsoleteCount--
d.mu.versions.metrics.Table.ObsoleteSize -= fileSize
+ if isLocal {
+ d.mu.versions.metrics.Table.Local.ObsoleteSize -= fileSize
+ }
d.mu.Unlock()
}

@@ -1609,7 +1608,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
startTime := d.timeNow()

var ve *manifest.VersionEdit
- var pendingOutputs []*fileMetadata
+ var pendingOutputs []compactionOutput
var stats compactStats
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
@@ -1701,9 +1700,12 @@
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
- d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, fileInfo{
- FileNum: base.PhysicalTableDiskFileNum(f.FileNum),
- FileSize: f.Size,
+ d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
+ fileInfo: fileInfo{
+ FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
+ FileSize: f.meta.Size,
+ },
+ isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
@@ -2358,9 +2360,12 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
// physical files on disk. This property might not hold once
// https://github.com/cockroachdb/pebble/issues/389 is
// implemented if #389 creates virtual sstables as output files.
- d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, fileInfo{
- FileNum: base.PhysicalTableDiskFileNum(f.FileNum),
- FileSize: f.Size,
+ d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, tableInfo{
+ fileInfo: fileInfo{
+ FileNum: base.PhysicalTableDiskFileNum(f.meta.FileNum),
+ FileSize: f.meta.Size,
+ },
+ isLocal: f.isLocal,
})
}
d.mu.versions.updateObsoleteTableMetricsLocked()
@@ -2422,7 +2427,7 @@ func (d *DB) runCopyCompaction(
inputMeta *fileMetadata,
objMeta objstorage.ObjectMetadata,
versionEdit *versionEdit,
- ) (ve *versionEdit, pendingOutputs []*fileMetadata, retErr error) {
+ ) (ve *versionEdit, pendingOutputs []compactionOutput, retErr error) {
ctx := context.TODO()

ve = versionEdit
@@ -2498,7 +2503,7 @@
}
defer src.Close()

- w, _, err := d.objProvider.Create(
+ w, outObjMeta, err := d.objProvider.Create(
ctx, fileTypeTable, base.PhysicalTableDiskFileNum(newMeta.FileNum),
objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
@@ -2507,7 +2512,10 @@
if err != nil {
return ve, pendingOutputs, err
}
- pendingOutputs = append(pendingOutputs, newMeta)
+ pendingOutputs = append(pendingOutputs, compactionOutput{
+ meta: newMeta,
+ isLocal: !outObjMeta.IsRemote(),
+ })

if err := objstorage.Copy(ctx, src, w, 0, uint64(src.Size())); err != nil {
w.Abort()
@@ -2517,8 +2525,10 @@
return ve, pendingOutputs, err
}
} else {
- pendingOutputs = append(pendingOutputs, newMeta.PhysicalMeta().FileMetadata)
+ pendingOutputs = append(pendingOutputs, compactionOutput{
+ meta: newMeta.PhysicalMeta().FileMetadata,
+ isLocal: true,
+ })
_, err := d.objProvider.LinkOrCopyFromLocal(context.TODO(), d.opts.FS,
d.objProvider.Path(objMeta), fileTypeTable, newMeta.FileBacking.DiskFileNum,
objstorage.CreateOptions{PreferSharedStorage: true})
@@ -2538,14 +2548,19 @@
return ve, pendingOutputs, nil
}

+ type compactionOutput struct {
+ meta *fileMetadata
+ isLocal bool
+ }

// runCompaction runs a compaction that produces new on-disk tables from
// memtables or old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
- ) (ve *versionEdit, pendingOutputs []*fileMetadata, stats compactStats, retErr error) {
+ ) (ve *versionEdit, pendingOutputs []compactionOutput, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
// returning the edit.
@@ -2783,11 +2798,8 @@ func (d *DB) runCompaction(
if c.cancel.Load() {
return ErrCancelledCompaction
}
- fileMeta := &fileMetadata{}
d.mu.Lock()
fileNum := d.mu.versions.getNextFileNum()
- fileMeta.FileNum = fileNum
- pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta().FileMetadata)
d.mu.Unlock()

ctx := context.TODO()
@@ -2811,6 +2823,12 @@
if err != nil {
return err
}
+ fileMeta := &fileMetadata{}
+ fileMeta.FileNum = fileNum
+ pendingOutputs = append(pendingOutputs, compactionOutput{
+ meta: fileMeta.PhysicalMeta().FileMetadata,
+ isLocal: !objMeta.IsRemote(),
+ })

reason := "flushing"
if c.flushing == nil {
@@ -3335,7 +3353,7 @@ func (d *DB) scanObsoleteFiles(list []string) {

manifestFileNum := d.mu.versions.manifestFileNum

- var obsoleteTables []fileInfo
+ var obsoleteTables []tableInfo
var obsoleteManifests []fileInfo
var obsoleteOptions []fileInfo

@@ -3383,14 +3401,17 @@
if size, err := d.objProvider.Size(obj); err == nil {
fileInfo.FileSize = uint64(size)
}
- obsoleteTables = append(obsoleteTables, fileInfo)
+ obsoleteTables = append(obsoleteTables, tableInfo{
+ fileInfo: fileInfo,
+ isLocal: !obj.IsRemote(),
+ })

default:
// Ignore object types we don't know about.
}
}

- d.mu.versions.obsoleteTables = merge(d.mu.versions.obsoleteTables, obsoleteTables)
+ d.mu.versions.obsoleteTables = mergeTableInfos(d.mu.versions.obsoleteTables, obsoleteTables)
d.mu.versions.updateObsoleteTableMetricsLocked()
d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
@@ -3449,7 +3470,7 @@ func (d *DB) deleteObsoleteFiles(jobID int) {
panic(err)
}

- obsoleteTables := append([]fileInfo(nil), d.mu.versions.obsoleteTables...)
+ obsoleteTables := append([]tableInfo(nil), d.mu.versions.obsoleteTables...)
d.mu.versions.obsoleteTables = nil

for _, tbl := range obsoleteTables {
@@ -3484,11 +3505,27 @@
for _, f := range obsoleteLogs {
filesToDelete = append(filesToDelete, obsoleteFile{fileType: fileTypeLog, logFile: f})
}
- files := [3]struct {
+ // We sort to make the order of deletions deterministic, which is nice for
+ // tests.
+ slices.SortFunc(obsoleteTables, func(a, b tableInfo) int {
+ return cmp.Compare(a.FileNum, b.FileNum)
+ })
+ for _, f := range obsoleteTables {
+ d.tableCache.evict(f.FileNum)
+ filesToDelete = append(filesToDelete, obsoleteFile{
+ fileType: fileTypeTable,
+ nonLogFile: deletableFile{
+ dir: d.dirname,
+ fileNum: f.FileNum,
+ fileSize: f.FileSize,
+ isLocal: f.isLocal,
+ },
+ })
+ }
+ files := [2]struct {
fileType fileType
obsolete []fileInfo
}{
- {fileTypeTable, obsoleteTables},
{fileTypeManifest, obsoleteManifests},
{fileTypeOptions, obsoleteOptions},
}
@@ -3500,17 +3537,13 @@
})
for _, fi := range f.obsolete {
dir := d.dirname
- switch f.fileType {
- case fileTypeTable:
- d.tableCache.evict(fi.FileNum)
- }

filesToDelete = append(filesToDelete, obsoleteFile{
fileType: f.fileType,
nonLogFile: deletableFile{
dir: dir,
fileNum: fi.FileNum,
fileSize: fi.FileSize,
+ isLocal: true,
},
})
}
@@ -3550,3 +3583,17 @@ func merge(a, b []fileInfo) []fileInfo {
return a.FileNum == b.FileNum
})
}

+ func mergeTableInfos(a, b []tableInfo) []tableInfo {
+ if len(b) == 0 {
+ return a
+ }
+
+ a = append(a, b...)
+ slices.SortFunc(a, func(a, b tableInfo) int {
+ return cmp.Compare(a.FileNum, b.FileNum)
+ })
+ return slices.CompactFunc(a, func(a, b tableInfo) bool {
+ return a.FileNum == b.FileNum
+ })
+ }
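`mergeTableInfos` follows the same append-sort-compact pattern as the generic `merge` helper above it, built on Go 1.21's `slices` and `cmp` packages: concatenate both lists, sort by file number, then drop adjacent duplicates. A self-contained sketch of the same pattern, with stand-in types mirroring the shapes used in this diff:

```go
package main

import (
	"cmp"
	"fmt"
	"slices"
)

type fileInfo struct {
	FileNum  uint64
	FileSize uint64
}

type tableInfo struct {
	fileInfo
	isLocal bool
}

// mergeTableInfos deduplicates by FileNum: append, sort, then compact so
// each table appears at most once in the result.
func mergeTableInfos(a, b []tableInfo) []tableInfo {
	if len(b) == 0 {
		return a
	}
	a = append(a, b...)
	slices.SortFunc(a, func(x, y tableInfo) int { return cmp.Compare(x.FileNum, y.FileNum) })
	return slices.CompactFunc(a, func(x, y tableInfo) bool { return x.FileNum == y.FileNum })
}

func main() {
	a := []tableInfo{{fileInfo{3, 10}, true}}
	b := []tableInfo{{fileInfo{1, 5}, false}, {fileInfo{3, 10}, true}}
	// Prints two entries: file 1 and file 3 (the duplicate is compacted away).
	fmt.Println(mergeTableInfos(a, b))
}
```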
7 changes: 5 additions & 2 deletions db.go
@@ -2128,8 +2128,11 @@ func (d *DB) Metrics() *Metrics {
}
}
metrics.Table.ZombieCount = int64(len(d.mu.versions.zombieTables))
- for _, size := range d.mu.versions.zombieTables {
- metrics.Table.ZombieSize += size
+ for _, info := range d.mu.versions.zombieTables {
+ metrics.Table.ZombieSize += info.FileSize
+ if info.isLocal {
+ metrics.Table.Local.ZombieSize += info.FileSize
+ }
}
metrics.private.optionsFileSize = d.optionsFileSize

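With `zombieTables` now carrying a per-file size plus locality rather than a bare size, `Metrics()` can split the zombie total into an overall and a local-only figure. A toy version of that accumulation (the map and struct names are illustrative stand-ins, not pebble's exact types):

```go
package main

import "fmt"

func main() {
	type tableInfo struct {
		FileSize uint64
		isLocal  bool
	}
	zombieTables := map[uint64]tableInfo{
		7: {FileSize: 4 << 20, isLocal: true},
		9: {FileSize: 8 << 20, isLocal: false}, // e.g. a shared/remote table
	}
	var zombieSize, localZombieSize uint64
	for _, info := range zombieTables {
		zombieSize += info.FileSize
		if info.isLocal {
			localZombieSize += info.FileSize
		}
	}
	fmt.Println(zombieSize>>20, localZombieSize>>20) // 12 4
}
```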
29 changes: 21 additions & 8 deletions metrics.go
@@ -264,6 +264,16 @@ type Metrics struct {
BackingTableCount uint64
// The sum of the sizes of the BackingTableCount sstables that are backing virtual tables.
BackingTableSize uint64

+ // Local file sizes.
+ Local struct {
+ // LiveSize is the number of bytes in live tables.
+ LiveSize uint64
+ // ObsoleteSize is the number of bytes in obsolete tables.
+ ObsoleteSize uint64
+ // ZombieSize is the number of bytes in zombie tables.
+ ZombieSize uint64
+ }
}

TableCache CacheMetrics
@@ -329,18 +339,19 @@
)

// DiskSpaceUsage returns the total disk space used by the database in bytes,
- // including live and obsolete files.
+ // including live and obsolete files. This only includes local files, i.e.,
+ // remote files (as known to objstorage.Provider) are not included.
func (m *Metrics) DiskSpaceUsage() uint64 {
var usageBytes uint64
usageBytes += m.WAL.PhysicalSize
usageBytes += m.WAL.ObsoletePhysicalSize
- for _, lm := range m.Levels {
- usageBytes += uint64(lm.Size)
- }
- usageBytes += m.Table.ObsoleteSize
- usageBytes += m.Table.ZombieSize
+ usageBytes += m.Table.Local.LiveSize
+ usageBytes += m.Table.Local.ObsoleteSize
+ usageBytes += m.Table.Local.ZombieSize
usageBytes += m.private.optionsFileSize
usageBytes += m.private.manifestFileSize
+ // TODO(sumeer): InProgressBytes does not distinguish between local and
+ // remote files. This causes a small error. Fix.
usageBytes += uint64(m.Compact.InProgressBytes)
return usageBytes
}
@@ -571,16 +582,18 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
redact.Safe(m.MemTable.ZombieCount),
humanize.Bytes.Uint64(m.MemTable.ZombieSize))

w.Printf("Zombie tables: %d (%s)\n",
w.Printf("Zombie tables: %d (%s, local: %s)\n",
redact.Safe(m.Table.ZombieCount),
humanize.Bytes.Uint64(m.Table.ZombieSize))
humanize.Bytes.Uint64(m.Table.ZombieSize),
humanize.Bytes.Uint64(m.Table.Local.ZombieSize))

w.Printf("Backing tables: %d (%s)\n",
redact.Safe(m.Table.BackingTableCount),
humanize.Bytes.Uint64(m.Table.BackingTableSize))
w.Printf("Virtual tables: %d (%s)\n",
redact.Safe(m.NumVirtual()),
humanize.Bytes.Uint64(m.VirtualSize()))
w.Printf("Local tables size: %s\n", humanize.Bytes.Uint64(m.Table.Local.LiveSize))

formatCacheMetrics := func(m *CacheMetrics, name redact.SafeString) {
w.Printf("%s: %s entries (%s) hit rate: %.1f%%\n",
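Putting the metrics.go changes together: `DiskSpaceUsage` now sums WAL bytes, the three `Table.Local` counters, internal file sizes, and in-progress compaction bytes. A sketch with invented figures, assuming a pebble version that contains this change (the unexported `private` sizes are zero here, so they drop out of the total):

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
)

func main() {
	// Invented figures, for illustration only.
	var m pebble.Metrics
	m.WAL.PhysicalSize = 10 << 20
	m.WAL.ObsoletePhysicalSize = 1 << 20
	m.Table.Local.LiveSize = 100 << 20
	m.Table.Local.ObsoleteSize = 5 << 20
	m.Table.Local.ZombieSize = 2 << 20
	m.Compact.InProgressBytes = 8 << 20

	// 10 + 1 + 100 + 5 + 2 + 8 = 126 MiB; remote tables contribute nothing.
	fmt.Println(m.DiskSpaceUsage() >> 20) // 126
}
```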