Skip to content

Commit

Permalink
tool: parse WAL files through wal.FileAccumulator in find
Browse files Browse the repository at this point in the history
Adjust the [find] debug tool to use wal.FileAccumulator to parse and accumulate
physical log files. All parsing of WAL filenames should be delegated to the wal
package which understands the physical log format.
  • Loading branch information
jbowens committed Feb 23, 2024
1 parent e3e1c8d commit 4560105
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 30 deletions.
47 changes: 23 additions & 24 deletions tool/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/wal"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -59,10 +60,7 @@ type findT struct {
// List of version edits.
edits []manifest.VersionEdit
// List of WAL files sorted by disk file num.
//
// TODO(jackson): Support logical WALs that are split across multiple
// physical file segments.
logs []fileLoc
logs []wal.LogicalLog
// List of manifest files sorted by disk file num.
manifests []fileLoc
// List of table files sorted by disk file num.
Expand Down Expand Up @@ -180,16 +178,21 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
return err
}

var walAccumulator wal.FileAccumulator
walk(stderr, f.opts.FS, dir, func(path string) {
// TODO(sumeer): delegate FileTypeLog handling to wal package.
if isLogFile, err := walAccumulator.MaybeAccumulate(f.opts.FS, path); err != nil {
fmt.Fprintf(stderr, "%s: %v\n", path, err)
return
} else if isLogFile {
return
}

ft, fileNum, ok := base.ParseFilename(f.opts.FS, path)
if !ok {
return
}
fl := fileLoc{DiskFileNum: fileNum, path: path}
switch ft {
case base.FileTypeLog:
f.logs = append(f.logs, fl)
case base.FileTypeManifest:
f.manifests = append(f.manifests, fl)
case base.FileTypeTable:
Expand All @@ -199,7 +202,10 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
}
})

slices.SortFunc(f.logs, cmpFileLoc)
// TODO(jackson): Provide a means of scanning the secondary WAL directory
// too.

f.logs = walAccumulator.Finish()
slices.SortFunc(f.manifests, cmpFileLoc)
slices.SortFunc(f.tables, cmpFileLoc)

Expand Down Expand Up @@ -309,17 +315,11 @@ func (f *findT) search(stdout io.Writer, key []byte) []findRef {
// Search the logs for references to the specified key.
func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
cmp := f.opts.Comparer.Compare
for _, fl := range f.logs {
for _, ll := range f.logs {
_ = func() (err error) {
lf, err := f.opts.FS.Open(fl.path)
if err != nil {
fmt.Fprintf(stdout, "%s\n", err)
return
}
defer lf.Close()

rr := ll.OpenForRead()
if f.verbose {
fmt.Fprintf(stdout, "%s", fl.path)
fmt.Fprintf(stdout, "%s", ll)
defer fmt.Fprintf(stdout, "\n")
}
defer func() {
Expand All @@ -337,17 +337,16 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [
if f.verbose {
fmt.Fprintf(stdout, ": %s", err)
} else {
fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
fmt.Fprintf(stdout, "%s: %s\n", ll, err)
}
}
}
}()

var b pebble.Batch
var buf bytes.Buffer
rr := record.NewReader(lf, fl.DiskFileNum)
for {
r, err := rr.Next()
r, off, err := rr.NextRecord()
if err == nil {
buf.Reset()
_, err = io.Copy(&buf, r)
Expand All @@ -358,15 +357,15 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [

b = pebble.Batch{}
if err := b.SetRepr(buf.Bytes()); err != nil {
fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err)
fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
continue
}
seqNum := b.SeqNum()
for r := b.Reader(); ; seqNum++ {
kind, ukey, value, ok, err := r.Next()
if !ok {
if err != nil {
fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err)
fmt.Fprintf(stdout, "%s: corrupt log file: %v", ll, err)
break
}
break
Expand Down Expand Up @@ -395,8 +394,8 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [
refs = append(refs, findRef{
key: ikey.Clone(),
value: append([]byte(nil), value...),
fileNum: base.PhysicalTableFileNum(fl.DiskFileNum),
filename: filepath.Base(fl.path),
fileNum: base.FileNum(ll.Num),
filename: filepath.Base(off.PhysicalFile),
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions tool/testdata/find
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ find-db
6 sstables
find-db/MANIFEST-000001
9 edits
find-db/archive/000002.log
find-db/archive/000004.log
find-db/000009.log
000002: {(find-db/archive,000)}
000004: {(find-db/archive,000)}
000009: {(find-db,000)}
find-db/archive/000005.sst
find-db/archive/000006.sst: global seqnum: 15
find-db/archive/000007.sst: global seqnum: 16
Expand Down
6 changes: 3 additions & 3 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ func (a *FileAccumulator) MaybeAccumulate(fs vfs.FS, path string) (isLogFile boo
return a.maybeAccumulate(fs, dirname, filename)
}

// Finish returns a sorted slice of LogicalLogs constructed from the physical
// files observed through MaybeAccumulate.
func (a *FileAccumulator) Finish() []LogicalLog {
// Finish returns a Logs constructed from the physical files observed through
// MaybeAccumulate.
func (a *FileAccumulator) Finish() Logs {
wals := a.wals
a.wals = nil
return wals
Expand Down

0 comments on commit 4560105

Please sign in to comment.