diff --git a/src/cmd/tools/read_index_segments/main/main.go b/src/cmd/tools/read_index_segments/main/main.go
index 680e8019cc..7a927d9c9d 100644
--- a/src/cmd/tools/read_index_segments/main/main.go
+++ b/src/cmd/tools/read_index_segments/main/main.go
@@ -21,28 +21,38 @@ package main
 
 import (
-	"io"
-
-	"github.com/m3db/m3/src/x/unsafe"
-
 	"fmt"
+	"io"
+	"io/ioutil"
 	golog "log"
+	"math"
 	"os"
+	"runtime"
+	"sync"
 	"time"
 
 	"github.com/m3db/m3/src/dbnode/persist"
 	"github.com/m3db/m3/src/dbnode/persist/fs"
 	"github.com/m3db/m3/src/query/util/json"
 	"github.com/m3db/m3/src/x/ident"
+	xsync "github.com/m3db/m3/src/x/sync"
+	"github.com/m3db/m3/src/x/unsafe"
 
 	"github.com/pborman/getopt"
 	"go.uber.org/zap"
 )
 
+var (
+	halfCPUs     = int(math.Max(float64(runtime.NumCPU()/2), 1))
+	endlineBytes = []byte("\n")
+)
+
 func main() {
 	var (
-		optPathPrefix = getopt.StringLong("path-prefix", 'p', "/var/lib/m3db", "Path prefix [e.g. /var/lib/m3db]")
-		optOutputFile = getopt.StringLong("output-file", 'o', "", "Output JSON file of line delimited JSON objects for each segment")
+		optPathPrefix          = getopt.StringLong("path-prefix", 'p', "/var/lib/m3db", "Path prefix [e.g. /var/lib/m3db]")
+		optOutputFile          = getopt.StringLong("output-file", 'o', "", "Output JSON file of line delimited JSON objects for each segment")
+		optValidate            = getopt.BoolLong("validate", 'v', "Validate the segments, do not print out metadata")
+		optValidateConcurrency = getopt.IntLong("validate-concurrency", 'c', halfCPUs, "Validation concurrency")
 	)
 	getopt.Parse()
@@ -52,23 +62,33 @@ func main() {
 		golog.Fatalf("unable to create logger: %+v", err)
 	}
 
-	if *optPathPrefix == "" || *optOutputFile == "" {
+	if *optOutputFile != "" && *optValidate {
+		log.Error("cannot write output and validate, do not set output file if validating")
+		getopt.Usage()
+		os.Exit(1)
+	}
+
+	if *optPathPrefix == "" || (*optOutputFile == "" && !*optValidate) {
 		getopt.Usage()
 		os.Exit(1)
 	}
 
 	run(runOptions{
-		filePathPrefix: *optPathPrefix,
-		outputFilePath: *optOutputFile,
-		log:            log,
+		filePathPrefix:      *optPathPrefix,
+		outputFilePath:      *optOutputFile,
+		validate:            *optValidate,
+		validateConcurrency: *optValidateConcurrency,
+		log:                 log,
 	})
 }
 
 type runOptions struct {
-	filePathPrefix string
-	outputFilePath string
-	log            *zap.Logger
+	filePathPrefix      string
+	outputFilePath      string
+	validate            bool
+	validateConcurrency int
+	log                 *zap.Logger
 }
 
 func run(opts runOptions) {
@@ -87,21 +107,38 @@ func run(opts runOptions) {
 	// Get all fileset files.
 	log.Info("discovered namespaces", zap.Strings("namespaces", namespaces))
 
-	out, err := os.Create(opts.outputFilePath)
-	if err != nil {
-		log.Fatal("unable to create output file",
-			zap.String("file", opts.outputFilePath),
-			zap.Error(err))
+	var (
+		out                io.Writer
+		validateWorkerPool xsync.WorkerPool
+	)
+	if opts.validate {
+		// Only validating, output to dev null.
+		out = ioutil.Discard
+		validateWorkerPool = xsync.NewWorkerPool(opts.validateConcurrency)
+		validateWorkerPool.Init()
+		log.Info("validating segment files",
+			zap.Int("concurrency", opts.validateConcurrency))
+	} else {
+		// Output to file.
+		out, err = os.Create(opts.outputFilePath)
+		if err != nil {
+			log.Fatal("unable to create output file",
+				zap.String("file", opts.outputFilePath),
+				zap.Error(err))
+		}
+		log.Info("writing output JSON line delimited",
+			zap.String("path", opts.outputFilePath))
 	}
 
 	for _, namespace := range namespaces {
 		log.Info("reading segments", zap.String("namespace", namespace))
 		ns := ident.StringID(namespace)
 
-		readNamespaceSegments(out, ns, fsOpts, log)
+		readNamespaceSegments(out, opts.validate, validateWorkerPool,
+			ns, fsOpts, log)
 
 		// Separate by endline.
-		if _, err := out.WriteString("\n"); err != nil {
+		if _, err := out.Write(endlineBytes); err != nil {
 			log.Fatal("could not write endline", zap.Error(err))
 		}
 	}
@@ -109,12 +146,17 @@ func readNamespaceSegments(
 	out io.Writer,
+	validate bool,
+	validateWorkerPool xsync.WorkerPool,
 	nsID ident.ID,
 	fsOpts fs.Options,
 	log *zap.Logger,
 ) {
-	infoFiles := fs.ReadIndexInfoFiles(fsOpts.FilePathPrefix(), nsID,
-		fsOpts.InfoReaderBufferSize())
+	var (
+		infoFiles = fs.ReadIndexInfoFiles(fsOpts.FilePathPrefix(), nsID,
+			fsOpts.InfoReaderBufferSize())
+		wg sync.WaitGroup
+	)
 
 	for _, infoFile := range infoFiles {
 		if err := infoFile.Err.Error(); err != nil {
@@ -126,180 +168,202 @@ func readNamespaceSegments(
 			continue
 		}
 
-		log.Info("reading block segments",
-			zap.String("namespace", nsID.String()),
-			zap.String("blockStart", infoFile.ID.BlockStart.String()),
-			zap.Int64("blockStartUnixNano", infoFile.ID.BlockStart.UnixNano()),
-			zap.Strings("files", infoFile.AbsoluteFilePaths))
-
-		segments, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
-			ReaderOptions: fs.IndexReaderOpenOptions{
-				Identifier:  infoFile.ID,
-				FileSetType: persist.FileSetFlushType,
-			},
-			FilesystemOptions: fsOpts,
-		})
-		if err != nil {
-			log.Error("unable to read segments from index fileset",
-				zap.Stringer("namespace", nsID),
-				zap.Error(err),
-				zap.Time("blockStart", time.Unix(0, infoFile.Info.BlockStart)),
-				zap.Int("volumeIndex", infoFile.ID.VolumeIndex),
-			)
+		if !validate {
+			readBlockSegments(out, nsID, infoFile, fsOpts, log)
 			continue
 		}
 
-		for i, seg := range segments {
-			jw := json.NewWriter(out)
-			jw.BeginObject()
+		// Validating, so use validation concurrency.
+		infoFile := infoFile // Shadow the loop variable so each goroutine captures its own value.
+		wg.Add(1)
+		validateWorkerPool.Go(func() {
+			defer wg.Done()
+			readBlockSegments(out, nsID, infoFile, fsOpts, log)
+		})
+	}
 
-			jw.BeginObjectField("namespace")
-			jw.WriteString(nsID.String())
+	// Wait for any concurrent validation.
+	wg.Wait()
+}
 
-			jw.BeginObjectField("blockStart")
-			jw.WriteString(time.Unix(0, infoFile.Info.BlockStart).Format(time.RFC3339))
+func readBlockSegments(
+	out io.Writer,
+	nsID ident.ID,
+	infoFile fs.ReadIndexInfoFileResult,
+	fsOpts fs.Options,
+	log *zap.Logger,
+) {
+	// Make sure the exact block is known if we fatal or error out.
+	log = log.With(
+		zap.String("namespace", nsID.String()),
+		zap.String("blockStart", infoFile.ID.BlockStart.String()),
+		zap.Int64("blockStartUnixNano", infoFile.ID.BlockStart.UnixNano()),
+		zap.Int("volumeIndex", infoFile.ID.VolumeIndex),
+		zap.Strings("files", infoFile.AbsoluteFilePaths))
+
+	log.Info("reading block segments")
+
+	segments, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{
+		ReaderOptions: fs.IndexReaderOpenOptions{
+			Identifier:  infoFile.ID,
+			FileSetType: persist.FileSetFlushType,
+		},
+		FilesystemOptions: fsOpts,
+	})
+	if err != nil {
+		log.Error("unable to read segments from index fileset", zap.Error(err))
+		return
+	}
 
-			jw.BeginObjectField("volumeIndex")
-			jw.WriteInt(infoFile.ID.VolumeIndex)
+	for i, seg := range segments {
+		jw := json.NewWriter(out)
+		jw.BeginObject()
 
-			jw.BeginObjectField("segmentIndex")
-			jw.WriteInt(i)
+		jw.BeginObjectField("namespace")
+		jw.WriteString(nsID.String())
 
-			reader, err := seg.Reader()
-			if err != nil {
-				log.Fatal("unable to create segment reader", zap.Error(err))
-			}
+		jw.BeginObjectField("blockStart")
+		jw.WriteString(time.Unix(0, infoFile.Info.BlockStart).Format(time.RFC3339))
 
-			iter, err := reader.AllDocs()
-			if err != nil {
-				log.Fatal("unable to iterate segment docs", zap.Error(err))
-			}
+		jw.BeginObjectField("volumeIndex")
+		jw.WriteInt(infoFile.ID.VolumeIndex)
 
-			jw.BeginObjectField("documents")
-			jw.BeginArray()
-			for postingsID := 0; iter.Next(); postingsID++ {
-				d := iter.Current()
-				jw.BeginObject()
+		jw.BeginObjectField("segmentIndex")
+		jw.WriteInt(i)
+
+		reader, err := seg.Reader()
+		if err != nil {
+			log.Fatal("unable to create segment reader", zap.Error(err))
+		}
 
-				jw.BeginObjectField("postingsID")
-				jw.WriteInt(postingsID)
+		iter, err := reader.AllDocs()
+		if err != nil {
+			log.Fatal("unable to iterate segment docs", zap.Error(err))
+		}
 
-				jw.BeginObjectField("id")
-				unsafe.WithString(d.ID, func(str string) {
-					jw.WriteString(str)
-				})
+		jw.BeginObjectField("documents")
+		jw.BeginArray()
+		for postingsID := 0; iter.Next(); postingsID++ {
+			d := iter.Current()
+			jw.BeginObject()
 
-				jw.BeginObjectField("fields")
+			jw.BeginObjectField("postingsID")
+			jw.WriteInt(postingsID)
 
-				jw.BeginArray()
-				for _, field := range d.Fields {
-					jw.BeginObject()
+			jw.BeginObjectField("id")
+			unsafe.WithString(d.ID, func(str string) {
+				jw.WriteString(str)
+			})
 
-					jw.BeginObjectField("name")
-					unsafe.WithString(field.Name, func(str string) {
-						jw.WriteString(str)
-					})
+			jw.BeginObjectField("fields")
+
+			jw.BeginArray()
+			for _, field := range d.Fields {
+				jw.BeginObject()
 
-					jw.BeginObjectField("value")
-					unsafe.WithString(field.Name, func(str string) {
-						jw.WriteString(str)
-					})
+				jw.BeginObjectField("name")
+				unsafe.WithString(field.Name, func(str string) {
+					jw.WriteString(str)
+				})
 
-					jw.EndObject()
-				}
-				jw.EndArray()
+				jw.BeginObjectField("value")
+				unsafe.WithString(field.Value, func(str string) {
+					jw.WriteString(str)
+				})
 
 				jw.EndObject()
 			}
 			jw.EndArray()
 
-			if err := iter.Err(); err != nil {
-				log.Fatal("doc iterator error", zap.Error(err))
-			}
-			if err := iter.Close(); err != nil {
-				log.Fatal("doc iterator close error", zap.Error(err))
-			}
+			jw.EndObject()
+		}
+		jw.EndArray()
 
-			fieldsIter, err := seg.FieldsIterable().Fields()
+		if err := iter.Err(); err != nil {
+			log.Fatal("doc iterator error", zap.Error(err))
+		}
+		if err := iter.Close(); err != nil {
+			log.Fatal("doc iterator close error", zap.Error(err))
+		}
+
+		fieldsIter, err := seg.FieldsIterable().Fields()
+		if err != nil {
+			log.Fatal("could not create fields iterator", zap.Error(err))
iterator", zap.Error(err)) + } + + jw.BeginObjectField("fields") + jw.BeginArray() + for fieldsIter.Next() { + field := fieldsIter.Current() + + jw.BeginObject() + jw.BeginObjectField("field") + unsafe.WithString(field, func(str string) { + jw.WriteString(str) + }) + + termsIter, err := seg.TermsIterable().Terms(field) if err != nil { - log.Fatal("could not create fields iterator", zap.Error(err)) + log.Fatal("could not create terms iterator", zap.Error(err)) } - jw.BeginObjectField("fields") + jw.BeginObjectField("terms") jw.BeginArray() - for fieldsIter.Next() { - field := fieldsIter.Current() + for termsIter.Next() { + term, postingsList := termsIter.Current() jw.BeginObject() - jw.BeginObjectField("field") - unsafe.WithString(field, func(str string) { + jw.BeginObjectField("term") + unsafe.WithString(term, func(str string) { jw.WriteString(str) }) - termsIter, err := seg.TermsIterable().Terms(field) - if err != nil { - log.Fatal("could not create terms iterator", zap.Error(err)) - } + postingsIter := postingsList.Iterator() - jw.BeginObjectField("terms") + jw.BeginObjectField("postings") jw.BeginArray() - for termsIter.Next() { - term, postingsList := termsIter.Current() - - jw.BeginObject() - jw.BeginObjectField("term") - unsafe.WithString(term, func(str string) { - jw.WriteString(str) - }) - - postingsIter := postingsList.Iterator() - - jw.BeginObjectField("postings") - jw.BeginArray() - for postingsIter.Next() { - postingsID := postingsIter.Current() - jw.WriteInt(int(postingsID)) - } - jw.EndArray() - jw.EndObject() - - if err := postingsIter.Err(); err != nil { - log.Fatal("postings iterator error", zap.Error(err)) - } - - if err := postingsIter.Close(); err != nil { - log.Fatal("postings iterator close error", zap.Error(err)) - } + for postingsIter.Next() { + postingsID := postingsIter.Current() + jw.WriteInt(int(postingsID)) } jw.EndArray() jw.EndObject() - if err := termsIter.Err(); err != nil { - log.Fatal("field iterator error", zap.Error(err)) + if err := postingsIter.Err(); err != nil { + log.Fatal("postings iterator error", zap.Error(err)) } - if err := termsIter.Close(); err != nil { - log.Fatal("field iterator close error", zap.Error(err)) + if err := postingsIter.Close(); err != nil { + log.Fatal("postings iterator close error", zap.Error(err)) } } jw.EndArray() + jw.EndObject() - if err := fieldsIter.Err(); err != nil { + if err := termsIter.Err(); err != nil { log.Fatal("field iterator error", zap.Error(err)) } - if err := fieldsIter.Close(); err != nil { + if err := termsIter.Close(); err != nil { log.Fatal("field iterator close error", zap.Error(err)) } + } + jw.EndArray() - jw.EndObject() + if err := fieldsIter.Err(); err != nil { + log.Fatal("field iterator error", zap.Error(err)) + } - if err := jw.Flush(); err != nil { - log.Fatal("could not flush JSON writer", zap.Error(err)) - } - if err := jw.Close(); err != nil { - log.Fatal("could not close JSON writer", zap.Error(err)) - } + if err := fieldsIter.Close(); err != nil { + log.Fatal("field iterator close error", zap.Error(err)) + } + + jw.EndObject() + + if err := jw.Flush(); err != nil { + log.Fatal("could not flush JSON writer", zap.Error(err)) + } + if err := jw.Close(); err != nil { + log.Fatal("could not close JSON writer", zap.Error(err)) } } }