diff --git a/checkpoint_test.go b/checkpoint_test.go index 357f405e69..34117716ea 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -26,10 +26,10 @@ func TestCheckpoint(t *testing.T) { } }() - var buf syncedBuffer mem := vfs.NewMem() + var memLog base.InMemLogger opts := &Options{ - FS: loggingFS{mem, &buf}, + FS: vfs.WithLogging(mem, memLog.Infof), FormatMajorVersion: FormatNewest, L0CompactionThreshold: 10, } @@ -40,7 +40,7 @@ func TestCheckpoint(t *testing.T) { if len(td.CmdArgs) != 1 { return "batch " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] b := d.NewBatch() if err := runBatchDefineCmd(td, b); err != nil { @@ -49,7 +49,7 @@ func TestCheckpoint(t *testing.T) { if err := b.Commit(Sync); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "checkpoint": if !(len(td.CmdArgs) == 2 || (len(td.CmdArgs) == 3 && td.CmdArgs[2].Key == "restrict")) { @@ -70,34 +70,34 @@ func TestCheckpoint(t *testing.T) { } opts = append(opts, WithRestrictToSpans(spans)) } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] if err := d.Checkpoint(td.CmdArgs[1].String(), opts...); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "compact": if len(td.CmdArgs) != 1 { return "compact " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] if err := d.Compact(nil, []byte("\xff"), false); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "flush": if len(td.CmdArgs) != 1 { return "flush " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] if err := d.Flush(); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "list": if len(td.CmdArgs) != 1 { @@ -108,9 +108,7 @@ func TestCheckpoint(t *testing.T) { return err.Error() } sort.Strings(paths) - buf.Reset() - fmt.Fprintf(&buf, "%s\n", strings.Join(paths, "\n")) - return buf.String() + return fmt.Sprintf("%s\n", strings.Join(paths, "\n")) case "open": if len(td.CmdArgs) != 1 && len(td.CmdArgs) != 2 { @@ -124,30 +122,30 @@ func TestCheckpoint(t *testing.T) { opts.ReadOnly = true } - buf.Reset() + memLog.Reset() dir := td.CmdArgs[0].String() d, err := Open(dir, opts) if err != nil { return err.Error() } dbs[dir] = d - return buf.String() + return memLog.String() case "scan": if len(td.CmdArgs) != 1 { return "scan " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] iter := d.NewIter(nil) for valid := iter.First(); valid; valid = iter.Next() { - fmt.Fprintf(&buf, "%s %s\n", iter.Key(), iter.Value()) + memLog.Infof("%s %s", iter.Key(), iter.Value()) } - fmt.Fprintf(&buf, ".\n") + memLog.Infof(".") if err := iter.Close(); err != nil { - fmt.Fprintf(&buf, "%v\n", err) + memLog.Infof("%v\n", err) } - return buf.String() + return memLog.String() default: return fmt.Sprintf("unknown command: %s", td.Cmd) diff --git a/cleaner_test.go b/cleaner_test.go index 0dc915274f..78cc8e7600 100644 --- a/cleaner_test.go +++ b/cleaner_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -23,11 +24,11 @@ func TestArchiveCleaner(t *testing.T) { } }() - var buf syncedBuffer mem := vfs.NewMem() + var memLog base.InMemLogger opts := (&Options{ Cleaner: ArchiveCleaner{}, - FS: loggingFS{mem, &buf}, + FS: vfs.WithLogging(mem, memLog.Infof), WALDir: "wal", }).WithFSDefaults() @@ -37,7 +38,7 @@ func TestArchiveCleaner(t *testing.T) { if len(td.CmdArgs) != 1 { return "batch " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] b := d.NewBatch() if err := runBatchDefineCmd(td, b); err != nil { @@ -46,29 +47,29 @@ func TestArchiveCleaner(t *testing.T) { if err := b.Commit(Sync); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "compact": if len(td.CmdArgs) != 1 { return "compact " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] if err := d.Compact(nil, []byte("\xff"), false); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "flush": if len(td.CmdArgs) != 1 { return "flush " } - buf.Reset() + memLog.Reset() d := dbs[td.CmdArgs[0].String()] if err := d.Flush(); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "list": if len(td.CmdArgs) != 1 { @@ -79,9 +80,7 @@ func TestArchiveCleaner(t *testing.T) { return err.Error() } sort.Strings(paths) - buf.Reset() - fmt.Fprintf(&buf, "%s\n", strings.Join(paths, "\n")) - return buf.String() + return fmt.Sprintf("%s\n", strings.Join(paths, "\n")) case "open": if len(td.CmdArgs) != 1 && len(td.CmdArgs) != 2 { @@ -95,14 +94,14 @@ func TestArchiveCleaner(t *testing.T) { opts.ReadOnly = true } - buf.Reset() + memLog.Reset() dir := td.CmdArgs[0].String() d, err := Open(dir, opts) if err != nil { return err.Error() } dbs[dir] = d - return buf.String() + return memLog.String() default: return fmt.Sprintf("unknown command: %s", td.Cmd) diff --git a/event_listener_test.go b/event_listener_test.go index faa8e50675..c97084a1a3 100644 --- a/event_listener_test.go +++ b/event_listener_test.go @@ -7,10 +7,7 @@ package pebble import ( "bytes" "fmt" - "io" - "os" "reflect" - "runtime" "strings" "sync" "testing" @@ -25,136 +22,20 @@ import ( "github.com/stretchr/testify/require" ) -type syncedBuffer struct { - mu sync.Mutex - buf bytes.Buffer -} - -func (b *syncedBuffer) Reset() { - b.mu.Lock() - defer b.mu.Unlock() - b.buf.Reset() -} - -func (b *syncedBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - return b.buf.Write(p) -} - -func (b *syncedBuffer) Infof(format string, args ...interface{}) { - s := fmt.Sprintf(format, args...) - b.mu.Lock() - defer b.mu.Unlock() - b.buf.Write([]byte(s)) - if n := len(s); n == 0 || s[n-1] != '\n' { - b.buf.Write([]byte("\n")) - } -} - -func (b *syncedBuffer) Fatalf(format string, args ...interface{}) { - b.Infof(format, args...) - runtime.Goexit() -} - -func (b *syncedBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() - return b.buf.String() -} - -type loggingFS struct { - vfs.FS - w io.Writer -} - -func (fs loggingFS) Create(name string) (vfs.File, error) { - fmt.Fprintf(fs.w, "create: %s\n", name) - f, err := fs.FS.Create(name) - if err != nil { - return nil, err - } - return loggingFile{f, name, fs.w}, nil -} - -func (fs loggingFS) Link(oldname, newname string) error { - fmt.Fprintf(fs.w, "link: %s -> %s\n", oldname, newname) - return fs.FS.Link(oldname, newname) -} - -func (fs loggingFS) OpenDir(name string) (vfs.File, error) { - fmt.Fprintf(fs.w, "open-dir: %s\n", name) - f, err := fs.FS.OpenDir(name) - if err != nil { - return nil, err - } - return loggingFile{f, name, fs.w}, nil -} - -func (fs loggingFS) Rename(oldname, newname string) error { - fmt.Fprintf(fs.w, "rename: %s -> %s\n", oldname, newname) - return fs.FS.Rename(oldname, newname) -} - -func (fs loggingFS) ReuseForWrite(oldname, newname string) (vfs.File, error) { - fmt.Fprintf(fs.w, "reuseForWrite: %s -> %s\n", oldname, newname) - f, err := fs.FS.ReuseForWrite(oldname, newname) - if err == nil { - f = loggingFile{f, newname, fs.w} - } - return f, err -} - -func (fs loggingFS) MkdirAll(dir string, perm os.FileMode) error { - fmt.Fprintf(fs.w, "mkdir-all: %s %#o\n", dir, perm) - return fs.FS.MkdirAll(dir, perm) -} - -func (fs loggingFS) Lock(name string) (io.Closer, error) { - fmt.Fprintf(fs.w, "lock: %s\n", name) - return fs.FS.Lock(name) -} - -type loggingFile struct { - vfs.File - name string - w io.Writer -} - -func (f loggingFile) Close() error { - fmt.Fprintf(f.w, "close: %s\n", f.name) - return f.File.Close() -} - -func (f loggingFile) Sync() error { - fmt.Fprintf(f.w, "sync: %s\n", f.name) - return f.File.Sync() -} - -func (f loggingFile) SyncData() error { - fmt.Fprintf(f.w, "sync-data: %s\n", f.name) - return f.File.SyncData() -} - -func (f loggingFile) SyncTo(length int64) (fullSync bool, err error) { - fmt.Fprintf(f.w, "sync-to(%d): %s\n", length, f.name) - return f.File.SyncTo(length) -} - // Verify event listener actions, as well as expected filesystem operations. func TestEventListener(t *testing.T) { var d *DB - var buf syncedBuffer + var memLog base.InMemLogger mem := vfs.NewMem() require.NoError(t, mem.MkdirAll("ext", 0755)) datadriven.RunTest(t, "testdata/event_listener", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "open": - buf.Reset() - lel := MakeLoggingEventListener(&buf) + memLog.Reset() + lel := MakeLoggingEventListener(&memLog) opts := &Options{ - FS: loggingFS{mem, &buf}, + FS: vfs.WithLogging(mem, memLog.Infof), FormatMajorVersion: FormatNewest, EventListener: &lel, MaxManifestFileSize: 1, @@ -176,65 +57,65 @@ func TestEventListener(t *testing.T) { t = t.Add(time.Second) return t } - return buf.String() + return memLog.String() case "close": - buf.Reset() + memLog.Reset() if err := d.Close(); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "flush": - buf.Reset() + memLog.Reset() if err := d.Set([]byte("a"), nil, nil); err != nil { return err.Error() } if err := d.Flush(); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "compact": - buf.Reset() + memLog.Reset() if err := d.Set([]byte("a"), nil, nil); err != nil { return err.Error() } if err := d.Compact([]byte("a"), []byte("b"), false); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "checkpoint": - buf.Reset() + memLog.Reset() if err := d.Checkpoint("checkpoint"); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "disable-file-deletions": - buf.Reset() + memLog.Reset() d.mu.Lock() d.disableFileDeletions() d.mu.Unlock() - return buf.String() + return memLog.String() case "enable-file-deletions": - buf.Reset() + memLog.Reset() func() { defer func() { if r := recover(); r != nil { - fmt.Fprint(&buf, r) + memLog.Infof("%v", r) } }() d.mu.Lock() defer d.mu.Unlock() d.enableFileDeletions() }() - return buf.String() + return memLog.String() case "ingest": - buf.Reset() + memLog.Reset() f, err := mem.Create("ext/0") if err != nil { return err.Error() @@ -251,7 +132,7 @@ func TestEventListener(t *testing.T) { if err := d.Ingest([]string{"ext/0"}); err != nil { return err.Error() } - return buf.String() + return memLog.String() case "metrics": // The asynchronous loading of table stats can change metrics, so @@ -299,7 +180,7 @@ func TestWriteStallEvents(t *testing.T) { t.Run("", func(t *testing.T) { stallEnded := make(chan struct{}, 1) createReleased := make(chan struct{}, flushCount) - var buf syncedBuffer + var log base.InMemLogger var delayOnce sync.Once listener := &EventListener{ TableCreated: func(info TableCreateInfo) { @@ -310,11 +191,11 @@ func TestWriteStallEvents(t *testing.T) { } }, WriteStallBegin: func(info WriteStallBeginInfo) { - fmt.Fprintln(&buf, info.String()) + log.Infof("%s", info.String()) createReleased <- struct{}{} }, WriteStallEnd: func() { - fmt.Fprintln(&buf, writeStallEnd) + log.Infof("%s", writeStallEnd) select { case stallEnded <- struct{}{}: default: @@ -347,13 +228,13 @@ func TestWriteStallEvents(t *testing.T) { if !c.delayFlush { <-ch } - if strings.Contains(buf.String(), c.expected) { + if strings.Contains(log.String(), c.expected) { break } } <-stallEnded - events := buf.String() + events := log.String() require.Contains(t, events, c.expected) require.Contains(t, events, writeStallEnd) if testing.Verbose() { @@ -381,7 +262,7 @@ func TestEventListenerRedact(t *testing.T) { // The vast majority of event listener fields logged are safe and do not // need to be redacted. Verify that the rare, unsafe error does appear in // the log redacted. - var log syncedBuffer + var log base.InMemLogger l := MakeLoggingEventListener(redactLogger{logger: &log}) l.WALDeleted(WALDeleteInfo{ JobID: 5, diff --git a/ingest_test.go b/ingest_test.go index c1fbed6be6..f89a1ff563 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -996,9 +996,8 @@ func TestIngestIdempotence(t *testing.T) { } func TestIngestCompact(t *testing.T) { - var buf syncedBuffer mem := vfs.NewMem() - lel := MakeLoggingEventListener(&buf) + lel := MakeLoggingEventListener(&base.InMemLogger{}) d, err := Open("", &Options{ EventListener: &lel, FS: mem, diff --git a/internal/base/logger.go b/internal/base/logger.go index 3f539b8c8b..62b403634b 100644 --- a/internal/base/logger.go +++ b/internal/base/logger.go @@ -5,9 +5,12 @@ package base import ( + "bytes" "fmt" "log" "os" + "runtime" + "sync" ) // Logger defines an interface for writing log messages. @@ -32,3 +35,45 @@ func (defaultLogger) Fatalf(format string, args ...interface{}) { _ = log.Output(2, fmt.Sprintf(format, args...)) os.Exit(1) } + +// InMemLogger implements Logger using an in-memory buffer (used for testing). +// The buffer can be read via String() and cleared via Reset(). +type InMemLogger struct { + mu struct { + sync.Mutex + buf bytes.Buffer + } +} + +var _ Logger = (*InMemLogger)(nil) + +// Reset clears the internal buffer. +func (b *InMemLogger) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.buf.Reset() +} + +// String returns the current internal buffer. +func (b *InMemLogger) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.mu.buf.String() +} + +// Infof is part of the Logger interface. +func (b *InMemLogger) Infof(format string, args ...interface{}) { + s := fmt.Sprintf(format, args...) + b.mu.Lock() + defer b.mu.Unlock() + b.mu.buf.Write([]byte(s)) + if n := len(s); n == 0 || s[n-1] != '\n' { + b.mu.buf.Write([]byte("\n")) + } +} + +// Fatalf is part of the Logger interface. +func (b *InMemLogger) Fatalf(format string, args ...interface{}) { + b.Infof(format, args...) + runtime.Goexit() +} diff --git a/open_test.go b/open_test.go index da9118dcf3..d5ee87b651 100644 --- a/open_test.go +++ b/open_test.go @@ -369,9 +369,9 @@ func TestOpenReadOnly(t *testing.T) { { // Opening a non-existent DB in read-only mode should result in no mutable // filesystem operations. - var buf syncedBuffer + var memLog base.InMemLogger _, err := Open("non-existent", testingRandomized(&Options{ - FS: loggingFS{mem, &buf}, + FS: vfs.WithLogging(mem, memLog.Infof), ReadOnly: true, WALDir: "non-existent-waldir", })) @@ -379,7 +379,7 @@ func TestOpenReadOnly(t *testing.T) { t.Fatalf("expected error, but found success") } const expected = `open-dir: non-existent` - if trimmed := strings.TrimSpace(buf.String()); expected != trimmed { + if trimmed := strings.TrimSpace(memLog.String()); expected != trimmed { t.Fatalf("expected %q, but found %q", expected, trimmed) } } @@ -387,9 +387,9 @@ func TestOpenReadOnly(t *testing.T) { { // Opening a DB with a non-existent WAL dir in read-only mode should result // in no mutable filesystem operations other than the LOCK. - var buf syncedBuffer + var memLog base.InMemLogger _, err := Open("", testingRandomized(&Options{ - FS: loggingFS{mem, &buf}, + FS: vfs.WithLogging(mem, memLog.Infof), ReadOnly: true, WALDir: "non-existent-waldir", })) @@ -397,7 +397,7 @@ func TestOpenReadOnly(t *testing.T) { t.Fatalf("expected error, but found success") } const expected = "open-dir: \nopen-dir: non-existent-waldir\nclose:" - if trimmed := strings.TrimSpace(buf.String()); expected != trimmed { + if trimmed := strings.TrimSpace(memLog.String()); expected != trimmed { t.Fatalf("expected %q, but found %q", expected, trimmed) } } diff --git a/testdata/checkpoint b/testdata/checkpoint index 0bce607650..8d807ae8fd 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -7,6 +7,7 @@ open-dir: db open-dir: db create: db/MANIFEST-000001 sync: db/MANIFEST-000001 +remove: db/temporary.000001.dbtmp create: db/temporary.000001.dbtmp sync: db/temporary.000001.dbtmp close: db/temporary.000001.dbtmp @@ -21,39 +22,50 @@ sync: db create: db/marker.format-version.000001.002 close: db/marker.format-version.000001.002 sync: db +remove: db/temporary.000000.dbtmp create: db/temporary.000000.dbtmp sync: db/temporary.000000.dbtmp close: db/temporary.000000.dbtmp rename: db/temporary.000000.dbtmp -> db/CURRENT create: db/marker.format-version.000002.003 close: db/marker.format-version.000002.003 +remove: db/marker.format-version.000001.002 sync: db create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 +remove: db/marker.format-version.000002.003 sync: db create: db/marker.format-version.000004.005 close: db/marker.format-version.000004.005 +remove: db/marker.format-version.000003.004 sync: db create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 +remove: db/marker.format-version.000004.005 sync: db create: db/marker.format-version.000006.007 close: db/marker.format-version.000006.007 +remove: db/marker.format-version.000005.006 sync: db create: db/marker.format-version.000007.008 close: db/marker.format-version.000007.008 +remove: db/marker.format-version.000006.007 sync: db create: db/marker.format-version.000008.009 close: db/marker.format-version.000008.009 +remove: db/marker.format-version.000007.008 sync: db create: db/marker.format-version.000009.010 close: db/marker.format-version.000009.010 +remove: db/marker.format-version.000008.009 sync: db create: db/marker.format-version.000010.011 close: db/marker.format-version.000010.011 +remove: db/marker.format-version.000009.010 sync: db create: db/marker.format-version.000011.012 close: db/marker.format-version.000011.012 +remove: db/marker.format-version.000010.011 sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp @@ -222,6 +234,9 @@ sync-data: db/000010.sst close: db/000010.sst sync: db sync: db/MANIFEST-000001 +remove: db/000005.sst +remove: db/000007.sst +remove: db/000009.sst batch db set h 11 diff --git a/testdata/cleaner b/testdata/cleaner index 9c8f9ac289..dff9a9122a 100644 --- a/testdata/cleaner +++ b/testdata/cleaner @@ -9,6 +9,7 @@ open-dir: db open-dir: db create: db/MANIFEST-000001 sync: db/MANIFEST-000001 +remove: db/temporary.000001.dbtmp create: db/temporary.000001.dbtmp sync: db/temporary.000001.dbtmp close: db/temporary.000001.dbtmp diff --git a/testdata/event_listener b/testdata/event_listener index b4eca8197a..88548a9a6e 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -9,6 +9,7 @@ open-dir: db open-dir: db create: db/MANIFEST-000001 sync: db/MANIFEST-000001 +remove: db/temporary.000001.dbtmp create: db/temporary.000001.dbtmp sync: db/temporary.000001.dbtmp close: db/temporary.000001.dbtmp @@ -26,48 +27,59 @@ create: db/marker.format-version.000001.002 close: db/marker.format-version.000001.002 sync: db upgraded to format version: 002 +remove: db/temporary.000000.dbtmp create: db/temporary.000000.dbtmp sync: db/temporary.000000.dbtmp close: db/temporary.000000.dbtmp rename: db/temporary.000000.dbtmp -> db/CURRENT create: db/marker.format-version.000002.003 close: db/marker.format-version.000002.003 +remove: db/marker.format-version.000001.002 sync: db upgraded to format version: 003 create: db/marker.format-version.000003.004 close: db/marker.format-version.000003.004 +remove: db/marker.format-version.000002.003 sync: db upgraded to format version: 004 create: db/marker.format-version.000004.005 close: db/marker.format-version.000004.005 +remove: db/marker.format-version.000003.004 sync: db upgraded to format version: 005 create: db/marker.format-version.000005.006 close: db/marker.format-version.000005.006 +remove: db/marker.format-version.000004.005 sync: db upgraded to format version: 006 create: db/marker.format-version.000006.007 close: db/marker.format-version.000006.007 +remove: db/marker.format-version.000005.006 sync: db upgraded to format version: 007 create: db/marker.format-version.000007.008 close: db/marker.format-version.000007.008 +remove: db/marker.format-version.000006.007 sync: db upgraded to format version: 008 create: db/marker.format-version.000008.009 close: db/marker.format-version.000008.009 +remove: db/marker.format-version.000007.008 sync: db upgraded to format version: 009 create: db/marker.format-version.000009.010 close: db/marker.format-version.000009.010 +remove: db/marker.format-version.000008.009 sync: db upgraded to format version: 010 create: db/marker.format-version.000010.011 close: db/marker.format-version.000010.011 +remove: db/marker.format-version.000009.010 sync: db upgraded to format version: 011 create: db/marker.format-version.000011.012 close: db/marker.format-version.000011.012 +remove: db/marker.format-version.000010.011 sync: db upgraded to format version: 012 create: db/temporary.000003.dbtmp @@ -95,6 +107,7 @@ close: db/MANIFEST-000001 sync: db/MANIFEST-000006 create: db/marker.manifest.000002.MANIFEST-000006 close: db/marker.manifest.000002.MANIFEST-000006 +remove: db/marker.manifest.000001.MANIFEST-000001 sync: db [JOB 5] MANIFEST created 000006 [JOB 5] flushed 1 memtable to L0 [000005] (770 B), in 1.0s (2.0s total), output rate 770 B/s @@ -118,9 +131,11 @@ close: db/MANIFEST-000006 sync: db/MANIFEST-000009 create: db/marker.manifest.000003.MANIFEST-000009 close: db/marker.manifest.000003.MANIFEST-000009 +remove: db/marker.manifest.000002.MANIFEST-000006 sync: db [JOB 7] MANIFEST created 000009 [JOB 7] flushed 1 memtable to L0 [000008] (770 B), in 1.0s (2.0s total), output rate 770 B/s +remove: db/MANIFEST-000001 [JOB 7] MANIFEST deleted 000001 [JOB 8] compacting(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) create: db/000010.sst @@ -133,11 +148,15 @@ close: db/MANIFEST-000009 sync: db/MANIFEST-000011 create: db/marker.manifest.000004.MANIFEST-000011 close: db/marker.manifest.000004.MANIFEST-000011 +remove: db/marker.manifest.000003.MANIFEST-000009 sync: db [JOB 8] MANIFEST created 000011 [JOB 8] compacted(default) L0 [000005 000008] (1.5 K) + L6 [] (0 B) -> L6 [000010] (770 B), in 1.0s (2.0s total), output rate 770 B/s +remove: db/000005.sst [JOB 8] sstable deleted 000005 +remove: db/000008.sst [JOB 8] sstable deleted 000008 +remove: db/MANIFEST-000006 [JOB 8] MANIFEST deleted 000006 disable-file-deletions @@ -162,12 +181,14 @@ close: db/MANIFEST-000011 sync: db/MANIFEST-000014 create: db/marker.manifest.000005.MANIFEST-000014 close: db/marker.manifest.000005.MANIFEST-000014 +remove: db/marker.manifest.000004.MANIFEST-000011 sync: db [JOB 10] MANIFEST created 000014 [JOB 10] flushed 1 memtable to L0 [000013] (770 B), in 1.0s (2.0s total), output rate 770 B/s enable-file-deletions ---- +remove: db/MANIFEST-000009 [JOB 11] MANIFEST deleted 000009 ingest @@ -180,9 +201,12 @@ close: db/MANIFEST-000014 sync: db/MANIFEST-000016 create: db/marker.manifest.000006.MANIFEST-000016 close: db/marker.manifest.000006.MANIFEST-000016 +remove: db/marker.manifest.000005.MANIFEST-000014 sync: db [JOB 12] MANIFEST created 000016 +remove: db/MANIFEST-000011 [JOB 12] MANIFEST deleted 000011 +remove: ext/0 [JOB 12] ingested L0:000015 (826 B) metrics diff --git a/vfs/logging_fs.go b/vfs/logging_fs.go new file mode 100644 index 0000000000..ee0c3e281c --- /dev/null +++ b/vfs/logging_fs.go @@ -0,0 +1,124 @@ +// Copyright 2021 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package vfs + +import ( + "io" + "os" +) + +// WithLogging wraps an FS and logs filesystem modification operations to the +// given logFn. +func WithLogging(fs FS, logFn LogFn) FS { + return &loggingFS{ + FS: fs, + logFn: logFn, + } +} + +// LogFn is a function that is used to capture a log when WithLogging is used. +type LogFn func(fmt string, args ...interface{}) + +type loggingFS struct { + FS + logFn LogFn +} + +var _ FS = (*loggingFS)(nil) + +func (fs *loggingFS) Create(name string) (File, error) { + fs.logFn("create: %s", name) + f, err := fs.FS.Create(name) + if err != nil { + return nil, err + } + return newLoggingFile(f, name, fs.logFn), nil +} + +func (fs *loggingFS) Link(oldname, newname string) error { + fs.logFn("link: %s -> %s", oldname, newname) + return fs.FS.Link(oldname, newname) +} + +func (fs *loggingFS) OpenDir(name string) (File, error) { + fs.logFn("open-dir: %s", name) + f, err := fs.FS.OpenDir(name) + if err != nil { + return nil, err + } + return newLoggingFile(f, name, fs.logFn), nil +} + +func (fs *loggingFS) Rename(oldname, newname string) error { + fs.logFn("rename: %s -> %s", oldname, newname) + return fs.FS.Rename(oldname, newname) +} + +func (fs *loggingFS) ReuseForWrite(oldname, newname string) (File, error) { + fs.logFn("reuseForWrite: %s -> %s", oldname, newname) + f, err := fs.FS.ReuseForWrite(oldname, newname) + if err != nil { + return nil, err + } + return newLoggingFile(f, newname, fs.logFn), nil +} + +func (fs *loggingFS) MkdirAll(dir string, perm os.FileMode) error { + fs.logFn("mkdir-all: %s %#o", dir, perm) + return fs.FS.MkdirAll(dir, perm) +} + +func (fs *loggingFS) Lock(name string) (io.Closer, error) { + fs.logFn("lock: %s", name) + return fs.FS.Lock(name) +} + +func (fs loggingFS) Remove(name string) error { + fs.logFn("remove: %s", name) + err := fs.FS.Remove(name) + return err +} + +func (fs loggingFS) RemoveAll(name string) error { + fs.logFn("remove-all: %s", name) + err := fs.FS.RemoveAll(name) + return err +} + +type loggingFile struct { + File + name string + logFn LogFn +} + +var _ File = (*loggingFile)(nil) + +func newLoggingFile(f File, name string, logFn LogFn) *loggingFile { + return &loggingFile{ + File: f, + name: name, + logFn: logFn, + } +} + +func (f *loggingFile) Close() error { + f.logFn("close: %s", f.name) + return f.File.Close() +} + +func (f *loggingFile) Sync() error { + f.logFn("sync: %s", f.name) + return f.File.Sync() +} + +func (f *loggingFile) SyncData() error { + f.logFn("sync-data: %s", f.name) + return f.File.SyncData() +} + +func (f *loggingFile) SyncTo(length int64) (fullSync bool, err error) { + f.logFn("sync-to(%d): %s", length, f.name) + return f.File.SyncTo(length) +} diff --git a/vfs/syncing_file_test.go b/vfs/syncing_file_test.go index 7173e9143c..14b449eaa3 100644 --- a/vfs/syncing_file_test.go +++ b/vfs/syncing_file_test.go @@ -86,7 +86,7 @@ close: test [] var buf bytes.Buffer tf := &mockSyncToFile{File: f, canSyncTo: c.canSyncTo} - lf := loggingFile{tf, "test", &buf} + lf := &vfsTestFSFile{tf, "test", &buf} s := NewSyncingFile(lf, SyncingFileOptions{BytesPerSync: 8 << 10 /* 8 KB */}).(*syncingFile) write := func(n int64) { @@ -100,7 +100,7 @@ close: test [] write(mb) write(mb) - fmt.Fprintf(lf.w, "pre-close: %s [offset=%d sync-offset=%d]\n", + fmt.Fprintf(&buf, "pre-close: %s [offset=%d sync-offset=%d]\n", lf.name, atomic.LoadInt64(&s.atomic.offset), atomic.LoadInt64(&s.atomic.syncOffset)) require.NoError(t, s.Close()) @@ -153,10 +153,8 @@ func TestSyncingFileNoSyncOnClose(t *testing.T) { f, err := Default.Create(filename) require.NoError(t, err) - var buf bytes.Buffer tf := &mockSyncToFile{f, c.useSyncTo} - lf := loggingFile{tf, "test", &buf} - s := NewSyncingFile(lf, SyncingFileOptions{NoSyncOnClose: true, BytesPerSync: 8 << 10}).(*syncingFile) + s := NewSyncingFile(tf, SyncingFileOptions{NoSyncOnClose: true, BytesPerSync: 8 << 10}).(*syncingFile) write := func(n int64) { t.Helper() diff --git a/vfs/vfs_test.go b/vfs/vfs_test.go index 0b1289a0d3..5af4f53059 100644 --- a/vfs/vfs_test.go +++ b/vfs/vfs_test.go @@ -36,27 +36,30 @@ func normalizeError(err error) error { return err } -type loggingFS struct { +// vfsTestFS is similar to loggingFS but is more specific to the vfs test. It +// logs more operations and logs return values and errors. +// It also supports injecting an error on Link. +type vfsTestFS struct { FS base string w io.Writer linkErr error } -func (fs loggingFS) stripBase(path string) string { +func (fs vfsTestFS) stripBase(path string) string { if strings.HasPrefix(path, fs.base+"/") { return path[len(fs.base)+1:] } return path } -func (fs loggingFS) Create(name string) (File, error) { +func (fs vfsTestFS) Create(name string) (File, error) { f, err := fs.FS.Create(name) fmt.Fprintf(fs.w, "create: %s [%v]\n", fs.stripBase(name), normalizeError(err)) - return loggingFile{f, fs.PathBase(name), fs.w}, err + return vfsTestFSFile{f, fs.PathBase(name), fs.w}, err } -func (fs loggingFS) Link(oldname, newname string) error { +func (fs vfsTestFS) Link(oldname, newname string) error { err := fs.linkErr if err == nil { err = fs.FS.Link(oldname, newname) @@ -66,71 +69,71 @@ func (fs loggingFS) Link(oldname, newname string) error { return err } -func (fs loggingFS) ReuseForWrite(oldname, newname string) (File, error) { +func (fs vfsTestFS) ReuseForWrite(oldname, newname string) (File, error) { f, err := fs.FS.ReuseForWrite(oldname, newname) if err == nil { - f = loggingFile{f, fs.PathBase(newname), fs.w} + f = vfsTestFSFile{f, fs.PathBase(newname), fs.w} } fmt.Fprintf(fs.w, "reuseForWrite: %s -> %s [%v]\n", fs.stripBase(oldname), fs.stripBase(newname), normalizeError(err)) return f, err } -func (fs loggingFS) MkdirAll(dir string, perm os.FileMode) error { +func (fs vfsTestFS) MkdirAll(dir string, perm os.FileMode) error { err := fs.FS.MkdirAll(dir, perm) fmt.Fprintf(fs.w, "mkdir: %s [%v]\n", fs.stripBase(dir), normalizeError(err)) return err } -func (fs loggingFS) Open(name string, opts ...OpenOption) (File, error) { +func (fs vfsTestFS) Open(name string, opts ...OpenOption) (File, error) { f, err := fs.FS.Open(name, opts...) fmt.Fprintf(fs.w, "open: %s [%v]\n", fs.stripBase(name), normalizeError(err)) - return loggingFile{f, fs.stripBase(name), fs.w}, err + return vfsTestFSFile{f, fs.stripBase(name), fs.w}, err } -func (fs loggingFS) Remove(name string) error { +func (fs vfsTestFS) Remove(name string) error { err := fs.FS.Remove(name) fmt.Fprintf(fs.w, "remove: %s [%v]\n", fs.stripBase(name), normalizeError(err)) return err } -func (fs loggingFS) RemoveAll(name string) error { +func (fs vfsTestFS) RemoveAll(name string) error { err := fs.FS.RemoveAll(name) fmt.Fprintf(fs.w, "remove-all: %s [%v]\n", fs.stripBase(name), normalizeError(err)) return err } -type loggingFile struct { +type vfsTestFSFile struct { File name string w io.Writer } -func (f loggingFile) Close() error { +func (f vfsTestFSFile) Close() error { err := f.File.Close() fmt.Fprintf(f.w, "close: %s [%v]\n", f.name, err) return err } -func (f loggingFile) Preallocate(off, n int64) error { +func (f vfsTestFSFile) Preallocate(off, n int64) error { err := f.File.Preallocate(off, n) fmt.Fprintf(f.w, "preallocate(off=%d,n=%d): %s [%v]\n", off, n, f.name, err) return err } -func (f loggingFile) Sync() error { +func (f vfsTestFSFile) Sync() error { err := f.File.Sync() fmt.Fprintf(f.w, "sync: %s [%v]\n", f.name, err) return err } -func (f loggingFile) SyncData() error { +func (f vfsTestFSFile) SyncData() error { err := f.File.SyncData() fmt.Fprintf(f.w, "sync-data: %s [%v]\n", f.name, err) return err } -func (f loggingFile) SyncTo(length int64) (fullSync bool, err error) { +func (f vfsTestFSFile) SyncTo(length int64) (fullSync bool, err error) { fullSync, err = f.File.SyncTo(length) fmt.Fprintf(f.w, "sync-to(%d): %s [%t,%v]\n", length, f.name, fullSync, err) return fullSync, err @@ -138,7 +141,7 @@ func (f loggingFile) SyncTo(length int64) (fullSync bool, err error) { func runTestVFS(t *testing.T, baseFS FS, dir string) { var buf bytes.Buffer - fs := loggingFS{FS: baseFS, base: dir, w: &buf} + fs := vfsTestFS{FS: baseFS, base: dir, w: &buf} datadriven.RunTest(t, "testdata/vfs", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { @@ -182,9 +185,9 @@ func runTestVFS(t *testing.T, baseFS FS, dir string) { for _, p := range parts[3:] { switch p { case "disk": - dstFS = loggingFS{FS: Default, base: dir, w: &buf} + dstFS = vfsTestFS{FS: Default, base: dir, w: &buf} case "mem": - dstFS = loggingFS{FS: NewMem(), base: dir, w: &buf} + dstFS = vfsTestFS{FS: NewMem(), base: dir, w: &buf} case "link": opts = append(opts, CloneTryLink) case "sync":