Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tool: adapt find command to explicitly track paths, filenames #3315

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 69 additions & 55 deletions tool/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"cmp"
"fmt"
"io"
"path/filepath"
"slices"
"sort"

Expand All @@ -24,9 +25,10 @@ import (
)

type findRef struct {
key base.InternalKey
value []byte
fileNum base.FileNum
key base.InternalKey
value []byte
fileNum base.FileNum
filename string
}

// findT implements the find tool.
Expand All @@ -52,18 +54,19 @@ type findT struct {
fmtValue valueFormatter
verbose bool

// Map from file num to path on disk.
files map[base.DiskFileNum]string
// Map from file num to version edit index which references the file num.
editRefs map[base.DiskFileNum][]int
// List of version edits.
edits []manifest.VersionEdit
// Sorted list of WAL file nums.
logs []base.DiskFileNum
// Sorted list of manifest file nums.
manifests []base.DiskFileNum
// Sorted list of table file nums.
tables []base.FileNum
// List of WAL files sorted by disk file num.
//
// TODO(jackson): Support logical WALs that are split across multiple
// physical file segments.
logs []fileLoc
// List of manifest files sorted by disk file num.
manifests []fileLoc
// List of table files sorted by disk file num.
tables []fileLoc
// Set of tables that contain references to the search key.
tableRefs map[base.FileNum]bool
// Map from file num to table metadata.
Expand All @@ -72,6 +75,15 @@ type findT struct {
errors []string
}

// fileLoc pairs the disk file number of a file discovered by findFiles with
// the path under which that file was found.
type fileLoc struct {
// Embedded so the file number is reachable directly on a fileLoc value; it is
// the only field cmpFileLoc looks at when ordering.
base.DiskFileNum
// path is the location recorded at discovery time; it is what gets handed to
// FS.Open when the file is later read.
path string
}

// cmpFileLoc is a three-way comparison of two fileLocs by disk file number
// alone; the paths are ignored. It returns -1 when a orders before b, +1 when
// a orders after b, and 0 when the file numbers are equal, which is the shape
// slices.SortFunc expects from its comparison callback.
func cmpFileLoc(a, b fileLoc) int {
	lhs, rhs := a.DiskFileNum, b.DiskFileNum
	switch {
	case lhs < rhs:
		return -1
	case lhs > rhs:
		return 1
	default:
		return 0
	}
}

func newFind(
opts *pebble.Options,
comparers sstable.Comparers,
Expand Down Expand Up @@ -132,12 +144,12 @@ func (f *findT) run(cmd *cobra.Command, args []string) {
f.fmtValue.setForComparer(f.opts.Comparer.Name, f.comparers)

refs := f.search(stdout, key)
var lastFileNum base.FileNum
var lastFilename string
for i := range refs {
r := &refs[i]
if lastFileNum != r.fileNum {
lastFileNum = r.fileNum
fmt.Fprintf(stdout, "%s", f.opts.FS.PathBase(f.files[base.PhysicalTableDiskFileNum(r.fileNum)]))
if lastFilename != r.filename {
lastFilename = r.filename
fmt.Fprintf(stdout, "%s", r.filename)
if m := f.tableMeta[r.fileNum]; m != nil {
fmt.Fprintf(stdout, " ")
formatKeyRange(stdout, f.fmtKey, &m.Smallest, &m.Largest)
Expand All @@ -158,7 +170,6 @@ func (f *findT) run(cmd *cobra.Command, args []string) {

// Find all of the manifests, logs, and tables in the specified directory.
func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
f.files = make(map[base.DiskFileNum]string)
f.editRefs = make(map[base.DiskFileNum][]int)
f.logs = nil
f.manifests = nil
Expand All @@ -175,22 +186,22 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
if !ok {
return
}
fl := fileLoc{DiskFileNum: fileNum, path: path}
switch ft {
case base.FileTypeLog:
f.logs = append(f.logs, fileNum)
f.logs = append(f.logs, fl)
case base.FileTypeManifest:
f.manifests = append(f.manifests, fileNum)
f.manifests = append(f.manifests, fl)
case base.FileTypeTable:
f.tables = append(f.tables, base.PhysicalTableFileNum(fileNum))
f.tables = append(f.tables, fl)
default:
return
}
f.files[fileNum] = path
})

slices.Sort(f.logs)
slices.Sort(f.manifests)
slices.Sort(f.tables)
slices.SortFunc(f.logs, cmpFileLoc)
slices.SortFunc(f.manifests, cmpFileLoc)
slices.SortFunc(f.tables, cmpFileLoc)

if f.verbose {
fmt.Fprintf(stdout, "%s\n", dir)
Expand All @@ -204,33 +215,32 @@ func (f *findT) findFiles(stdout, stderr io.Writer, dir string) error {
// Read the manifests and populate the editRefs map which is used to determine
// the provenance and metadata of tables.
func (f *findT) readManifests(stdout io.Writer) {
for _, fileNum := range f.manifests {
for _, fl := range f.manifests {
func() {
path := f.files[fileNum]
mf, err := f.opts.FS.Open(path)
mf, err := f.opts.FS.Open(fl.path)
if err != nil {
fmt.Fprintf(stdout, "%s\n", err)
return
}
defer mf.Close()

if f.verbose {
fmt.Fprintf(stdout, "%s\n", path)
fmt.Fprintf(stdout, "%s\n", fl.path)
}

rr := record.NewReader(mf, 0 /* logNum */)
for {
r, err := rr.Next()
if err != nil {
if err != io.EOF {
fmt.Fprintf(stdout, "%s: %s\n", path, err)
fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
}
break
}

var ve manifest.VersionEdit
if err := ve.Decode(r); err != nil {
fmt.Fprintf(stdout, "%s: %s\n", path, err)
fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
break
}
i := len(f.edits)
Expand Down Expand Up @@ -288,26 +298,28 @@ func (f *findT) search(stdout io.Writer, key []byte) []findRef {
// log. Ideally, we'd show the key "a" from the log, then the key "b" from
// the ingested sstable, then key "c" from the log.
slices.SortStableFunc(refs, func(a, b findRef) int {
return cmp.Compare(a.fileNum, b.fileNum)
if v := cmp.Compare(a.fileNum, b.fileNum); v != 0 {
return v
}
return cmp.Compare(a.filename, b.filename)
})
return refs
}

// Search the logs for references to the specified key.
func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) []findRef {
cmp := f.opts.Comparer.Compare
for _, fileNum := range f.logs {
for _, fl := range f.logs {
_ = func() (err error) {
path := f.files[fileNum]
lf, err := f.opts.FS.Open(path)
lf, err := f.opts.FS.Open(fl.path)
if err != nil {
fmt.Fprintf(stdout, "%s\n", err)
return
}
defer lf.Close()

if f.verbose {
fmt.Fprintf(stdout, "%s", path)
fmt.Fprintf(stdout, "%s", fl.path)
defer fmt.Fprintf(stdout, "\n")
}
defer func() {
Expand All @@ -325,15 +337,15 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [
if f.verbose {
fmt.Fprintf(stdout, ": %s", err)
} else {
fmt.Fprintf(stdout, "%s: %s\n", path, err)
fmt.Fprintf(stdout, "%s: %s\n", fl.path, err)
}
}
}
}()

var b pebble.Batch
var buf bytes.Buffer
rr := record.NewReader(lf, fileNum)
rr := record.NewReader(lf, fl.DiskFileNum)
for {
r, err := rr.Next()
if err == nil {
Expand All @@ -346,15 +358,15 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [

b = pebble.Batch{}
if err := b.SetRepr(buf.Bytes()); err != nil {
fmt.Fprintf(stdout, "%s: corrupt log file: %v", path, err)
fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err)
continue
}
seqNum := b.SeqNum()
for r := b.Reader(); ; seqNum++ {
kind, ukey, value, ok, err := r.Next()
if !ok {
if err != nil {
fmt.Fprintf(stdout, "%s: corrupt log file: %v", path, err)
fmt.Fprintf(stdout, "%s: corrupt log file: %v", fl.path, err)
break
}
break
Expand All @@ -381,9 +393,10 @@ func (f *findT) searchLogs(stdout io.Writer, searchKey []byte, refs []findRef) [
}

refs = append(refs, findRef{
key: ikey.Clone(),
value: append([]byte(nil), value...),
fileNum: base.PhysicalTableFileNum(fileNum),
key: ikey.Clone(),
value: append([]byte(nil), value...),
fileNum: base.PhysicalTableFileNum(fl.DiskFileNum),
filename: filepath.Base(fl.path),
})
}
}
Expand All @@ -398,18 +411,17 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
defer cache.Unref()

f.tableRefs = make(map[base.FileNum]bool)
for _, fileNum := range f.tables {
for _, fl := range f.tables {
_ = func() (err error) {
path := f.files[base.PhysicalTableDiskFileNum(fileNum)]
tf, err := f.opts.FS.Open(path)
tf, err := f.opts.FS.Open(fl.path)
if err != nil {
fmt.Fprintf(stdout, "%s\n", err)
return
}

m := f.tableMeta[fileNum]
m := f.tableMeta[base.PhysicalTableFileNum(fl.DiskFileNum)]
if f.verbose {
fmt.Fprintf(stdout, "%s", path)
fmt.Fprintf(stdout, "%s", fl.path)
if m != nil && m.SmallestSeqNum == m.LargestSeqNum {
fmt.Fprintf(stdout, ": global seqnum: %d", m.LargestSeqNum)
}
Expand All @@ -421,7 +433,7 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
if f.verbose {
fmt.Fprintf(stdout, ": %v", err)
} else {
fmt.Fprintf(stdout, "%s: %v\n", path, err)
fmt.Fprintf(stdout, "%s: %v\n", fl.path, err)
}
}
}()
Expand All @@ -438,7 +450,7 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
r, err := sstable.NewReader(readable, opts, f.comparers, f.mergers,
private.SSTableRawTombstonesOpt.(sstable.ReaderOption))
if err != nil {
f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", f.files[base.PhysicalTableDiskFileNum(fileNum)], err.Error()))
f.errors = append(f.errors, fmt.Sprintf("Unable to decode sstable %s, %s", fl.path, err.Error()))
// Ensure the error only gets printed once.
err = nil
return
Expand Down Expand Up @@ -509,19 +521,21 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
return err
}
refs = append(refs, findRef{
key: key.Clone(),
value: append([]byte(nil), v...),
fileNum: fileNum,
key: key.Clone(),
value: slices.Clone(v),
fileNum: base.PhysicalTableFileNum(fl.DiskFileNum),
filename: filepath.Base(fl.path),
})
key, value = iter.Next()
} else {
// Use rangedel.Encode to add a reference for each key
// within the span.
err := rangedel.Encode(rangeDel, func(k base.InternalKey, v []byte) error {
refs = append(refs, findRef{
key: k.Clone(),
value: append([]byte(nil), v...),
fileNum: fileNum,
key: k.Clone(),
value: slices.Clone(v),
fileNum: base.PhysicalTableFileNum(fl.DiskFileNum),
filename: filepath.Base(fl.path),
})
return nil
})
Expand All @@ -537,7 +551,7 @@ func (f *findT) searchTables(stdout io.Writer, searchKey []byte, refs []findRef)
}

if foundRef {
f.tableRefs[fileNum] = true
f.tableRefs[base.PhysicalTableFileNum(fl.DiskFileNum)] = true
}
return nil
}()
Expand Down
Loading