From 4560105d78d92c3a92f798d2870b6b93de26b53a Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Thu, 22 Feb 2024 14:48:19 -0500 Subject: [PATCH] tool: parse WAL files through wal.FileAccumulator in find Adjust the [find] debug tool to use wal.FileAccumulator to parse and accumulate physical log files. All parsing of WAL filenames should be delegated to the wal package which understands the physical log format. --- tool/find.go | 47 +++++++++++++++++++++++----------------------- tool/testdata/find | 6 +++--- wal/reader.go | 6 +++--- 3 files changed, 29 insertions(+), 30 deletions(-) diff --git a/tool/find.go b/tool/find.go index ac042646b3..9e083984c0 100644 --- a/tool/find.go +++ b/tool/find.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/pebble/internal/rangedel" "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/wal" "github.com/spf13/cobra" ) @@ -59,10 +60,7 @@ type findT struct { // List of version edits. edits []manifest.VersionEdit // List of WAL files sorted by disk file num. - // - // TODO(jackson): Support logical WALs that are split across multiple - // physical file segments. - logs []fileLoc + logs []wal.LogicalLog // List of manifest files sorted by disk file num. manifests []fileLoc // List of table files sorted by disk file num. @@ -180,16 +178,21 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error { return err } + var walAccumulator wal.FileAccumulator walk(stderr, f.opts.FS, dir, func(path string) { - // TODO(sumeer): delegate FileTypeLog handling to wal package. + if isLogFile, err := walAccumulator.MaybeAccumulate(f.opts.FS, path); err != nil { + fmt.Fprintf(stderr, "%s: %v\n", path, err) + return + } else if isLogFile { + return + } + ft, fileNum, ok := base.ParseFilename(f.opts.FS, path) if !ok { return } fl := fileLoc{DiskFileNum: fileNum, path: path} switch ft { - case base.FileTypeLog: - f.logs = append(f.logs, fl) case base.FileTypeManifest: f.manifests = append(f.manifests, fl) case base.FileTypeTable: @@ -199,7 +202,10 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error { } }) - slices.SortFunc(f.logs, cmpFileLoc) + // TODO(jackson): Provide a means of scanning the secondary WAL directory + // too. + + f.logs = walAccumulator.Finish() slices.SortFunc(f.manifests, cmpFileLoc) slices.SortFunc(f.tables, cmpFileLoc) @@ -309,17 +315,11 @@ func (f *findT) search(stdout io.Writer, key []byte) []findRef { // Search the logs for references to the specified key. func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef { cmp := f.opts.Comparer.Compare - for _, fl := range f.logs { + for _, ll := range f.logs { _ = func() (err error) { - lf, err := f.opts.FS.Open(fl.path) - if err != nil { - fmt.Fprintf(stdout, "%s\n", err) - return - } - defer lf.Close() - + rr := ll.OpenForRead() if f.verbose { - fmt.Fprintf(stdout, "%s", fl.path) + fmt.Fprintf(stdout, "%s", ll) defer fmt.Fprintf(stdout, "\n") } defer func() { @@ -337,7 +337,7 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ if f.verbose { fmt.Fprintf(stdout, ": %s", err) } else { - fmt.Fprintf(stdout, "%s: %s\n", fl.path, err) + fmt.Fprintf(stdout, "%s: %s\n", ll, err) } } } @@ -345,9 +345,8 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ var b pebble.Batch var buf bytes.Buffer - rr := record.NewReader(lf, fl.DiskFileNum) for { - r, err := rr.Next() + r, off, err := rr.NextRecord() if err == nil { buf.Reset() _, err = io.Copy(&buf, r) @@ -358,7 +357,7 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ b = pebble.Batch{} if err := b.SetRepr(buf.Bytes()); err != nil { - fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err) + fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err) continue } seqNum := b.SeqNum() @@ -366,7 +365,7 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ kind, ukey, value, ok, err := r.Next() if !ok { if err != nil { - fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err) + fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err) break } break @@ -395,8 +394,8 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [ refs = append(refs, findRef{ key: ikey.Clone(), value: append([]byte(nil), value...), - fileNum: base.PhysicalTableFileNum(fl.DiskFileNum), - filename: filepath.Base(fl.path), + fileNum: base.FileNum(ll.Num), + filename: filepath.Base(off.PhysicalFile), }) } } diff --git a/tool/testdata/find b/tool/testdata/find index 7ae57ff9de..807482f57c 100644 --- a/tool/testdata/find +++ b/tool/testdata/find @@ -101,9 +101,9 @@ find-db 6 sstables find-db/MANIFEST-000001 9 edits -find-db/archive/000002.log -find-db/archive/000004.log -find-db/000009.log +000002: {(find-db/archive,000)} +000004: {(find-db/archive,000)} +000009: {(find-db,000)} find-db/archive/000005.sst find-db/archive/000006.sst: global seqnum: 15 find-db/archive/000007.sst: global seqnum: 16 diff --git a/wal/reader.go b/wal/reader.go index 689f6ec13e..be343dda8f 100644 --- a/wal/reader.go +++ b/wal/reader.go @@ -120,9 +120,9 @@ func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile boo return a.maybeAccumulate(fs, dirname, filename) } -// Finish returns a sorted slice of LogicalLogs constructed from the physical -// files observed through MaybeAccumulate. -func (a *FileAccumulator) Finish() []LogicalLog { +// Finish returns a Logs constructed from the physical files observed through +// MaybeAccumulate. +func (a *FileAccumulator) Finish() Logs { wals := a.wals a.wals = nil return wals