Skip to content

Commit

Permalink
vfs: add write category during file creation
Browse files Browse the repository at this point in the history
This commit modifies the vfs file creation interface to
support grouping disk write metrics by the write source.

Informs cockroachdb/cockroach#115434
  • Loading branch information
CheranMahalingam committed Feb 22, 2024
1 parent 2b9ea0c commit e6e8b35
Show file tree
Hide file tree
Showing 64 changed files with 299 additions and 217 deletions.
2 changes: 1 addition & 1 deletion checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (d *DB) writeCheckpointManifest(
}
defer src.Close()

dst, err := fs.Create(destPath)
dst, err := fs.Create(destPath, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestCleaner(t *testing.T) {
if len(td.CmdArgs) != 1 {
return "create-bogus-file <db/file>"
}
dst, err := fs.Create(td.CmdArgs[0].String())
dst, err := fs.Create(td.CmdArgs[0].String(), vfs.WriteCategoryUnspecified)
require.NoError(t, err)
_, err = dst.Write([]byte("bogus data"))
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pebble/fsbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type fsBench struct {
// createFile can be used to create an empty file.
// Invariant: File shouldn't already exist.
func createFile(filepath string) vfs.File {
fh, err := fsConfig.fs.Create(filepath)
fh, err := fsConfig.fs.Create(filepath, vfs.WriteCategoryUnspecified)
if err != nil {
log.Fatalln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
// rotate and close the log.

mem := vfs.NewMem()
f, err := mem.Create("test-wal")
f, err := mem.Create("test-wal", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

// syncDelayFile will block on the done channel befor returning from Sync
Expand Down
16 changes: 16 additions & 0 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3128,9 +3128,25 @@ func (d *DB) runCompaction(
ctx = objiotracing.WithReason(ctx, objiotracing.ForCompaction)
}
}
var writeCategory vfs.DiskWriteCategory
switch c.kind {
case compactionKindFlush:
if d.opts.EnableSQLRowSpillMetrics {
// In the scenario that the Pebble engine is used for SQL row spills the data written to
// the memtable will correspond to spills to disk and should be categorized as such.
writeCategory = "sql-row-spill"
} else {
writeCategory = "pebble-memtable-flush"
}
case compactionKindIngestedFlushable:
writeCategory = "pebble-ingestion"
default:
writeCategory = "pebble-compaction"
}
// Prefer shared storage if present.
createOpts := objstorage.CreateOptions{
PreferSharedStorage: remote.ShouldCreateShared(d.opts.Experimental.CreateOnShared, c.outputLevel.level),
WriteCategory: writeCategory,
}
diskFileNum := base.PhysicalTableDiskFileNum(fileNum)
writable, objMeta, err := d.objProvider.Create(ctx, fileTypeTable, diskFileNum, createOpts)
Expand Down
6 changes: 3 additions & 3 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2746,7 +2746,7 @@ func TestCompactionErrorCleanup(t *testing.T) {

ingest := func(keys ...string) {
t.Helper()
f, err := mem.Create("ext")
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
Expand Down Expand Up @@ -3566,7 +3566,7 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
ingestKeys := func(db *DB) error {
// Create an SST for ingestion.
const fName = "ext"
f, err := db.opts.FS.Create(fName)
f, err := db.opts.FS.Create(fName, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
require.NoError(t, w.Set(key, nil))
Expand Down Expand Up @@ -3770,7 +3770,7 @@ func TestCompactionErrorStats(t *testing.T) {

ingest := func(keys ...string) {
t.Helper()
f, err := mem.Create("ext")
f, err := mem.Create("ext", vfs.WriteCategoryUnspecified)
require.NoError(t, err)

w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
Expand Down
2 changes: 1 addition & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func runBuildCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {

writeOpts := d.opts.MakeWriterOptions(0 /* level */, tableFormat)

f, err := fs.Create(path)
f, err := fs.Create(path, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func TestDBConcurrentCompactClose(t *testing.T) {
// causing compactions to be running concurrently with the close below.
for j := 0; j < 10; j++ {
path := fmt.Sprintf("ext%d", j)
f, err := mem.Create(path)
f, err := mem.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1616,7 +1616,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest1.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1627,7 +1627,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest2.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1639,7 +1639,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest3.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand All @@ -1650,7 +1650,7 @@ func TestMemtableIngestInversion(t *testing.T) {
}
{
path := "ingest4.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1729,7 +1729,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest5.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down Expand Up @@ -1763,7 +1763,7 @@ func TestMemtableIngestInversion(t *testing.T) {
// cc
{
path := "ingest6.sst"
f, err := memFS.Create(path)
f, err := memFS.Create(path, vfs.WriteCategoryUnspecified)
require.NoError(t, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
TableFormat: d.FormatMajorVersion().MaxTableFormat(),
Expand Down
4 changes: 2 additions & 2 deletions event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestEventListener(t *testing.T) {

case "ingest":
memLog.Reset()
f, err := mem.Create("ext/0")
f, err := mem.Create("ext/0", vfs.WriteCategoryUnspecified)
if err != nil {
return err.Error()
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestEventListener(t *testing.T) {
return err.Error()
}
writeTable := func(name string, key byte) error {
f, err := mem.Create(name)
f, err := mem.Create(name, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions external_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func TestIterRandomizedMaybeFilteredKeys(t *testing.T) {
keys := make([][]byte, 0, numKeys)

filename := fmt.Sprintf("test-%d", fileIdx)
f0, err := mem.Create(filename)
f0, err := mem.Create(filename, vfs.WriteCategoryUnspecified)
require.NoError(t, err)

indexBlockSize := 4096
Expand Down Expand Up @@ -324,7 +324,7 @@ func BenchmarkExternalIter_NonOverlapping_Scan(b *testing.B) {
var keys [][]byte
for i := 0; i < fileCount; i++ {
filename := fmt.Sprintf("%03d.sst", i)
wf, err := fs.Create(filename)
wf, err := fs.Create(filename, vfs.WriteCategoryUnspecified)
require.NoError(b, err)
w := sstable.NewWriter(objstorageprovider.NewFileWritable(wf), writeOpts)
for j := 0; j < keyCount/fileCount; j++ {
Expand Down
Loading

0 comments on commit e6e8b35

Please sign in to comment.