Skip to content

Commit

Permalink
util/log: create crdb-v2 log entry parser
Browse files Browse the repository at this point in the history
The crdb-v2 log file format does not currently have an entry parser.
This patch creates the new parser alongside the v1 parser.

Release note (cli change): Previously, the crdb-v2 log file format
lacked a parser. This has now changed.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
cameronnunez and ajwerner committed Jul 14, 2021
1 parent 835da51 commit 72e4e86
Show file tree
Hide file tree
Showing 79 changed files with 1,232 additions and 216 deletions.
11 changes: 7 additions & 4 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ func removeDeadReplicas(
return batch, nil
}

var debugMergeLogsCommand = &cobra.Command{
var debugMergeLogsCmd = &cobra.Command{
Use: "merge-logs <log file globs>",
Short: "merge multiple log files from different machines into a single stream",
Long: `
Expand Down Expand Up @@ -1294,6 +1294,7 @@ var debugMergeLogsOpts = struct {
prefix string
keepRedactable bool
redactInput bool
format string
}{
program: nil, // match everything
file: regexp.MustCompile(log.FilePattern),
Expand All @@ -1307,7 +1308,7 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
inputEditMode := log.SelectEditMode(o.redactInput, o.keepRedactable)

s, err := newMergedStreamFromPatterns(context.Background(),
args, o.file, o.program, o.from, o.to, inputEditMode)
args, o.file, o.program, o.from, o.to, inputEditMode, o.format)
if err != nil {
return err
}
Expand Down Expand Up @@ -1341,7 +1342,7 @@ var debugCmds = append(DebugCmdsForRocksDB,
debugUnsafeRemoveDeadReplicasCmd,
debugEnvCmd,
debugZipCmd,
debugMergeLogsCommand,
debugMergeLogsCmd,
debugListFilesCmd,
debugResetQuorumCmd,
)
Expand Down Expand Up @@ -1439,7 +1440,7 @@ func init() {
f.IntSliceVar(&removeDeadReplicasOpts.deadStoreIDs, "dead-store-ids", nil,
"list of dead store IDs")

f = debugMergeLogsCommand.Flags()
f = debugMergeLogsCmd.Flags()
f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from",
"time before which messages should be filtered")
// TODO(knz): the "to" should be named "until" - it's a time boundary, not a space boundary.
Expand All @@ -1458,6 +1459,8 @@ func init() {
"keep the output log file redactable")
f.BoolVar(&debugMergeLogsOpts.redactInput, "redact", debugMergeLogsOpts.redactInput,
"redact the input files to remove sensitive information")
f.StringVar(&debugMergeLogsOpts.format, "format", "",
"log format of the input files")

f = debugDecodeProtoCmd.Flags()
f.StringVar(&debugDecodeProtoName, "schema", "cockroach.sql.sqlbase.Descriptor",
Expand Down
39 changes: 29 additions & 10 deletions pkg/cli/debug_merge_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func newMergedStreamFromPatterns(
filePattern, programFilter *regexp.Regexp,
from, to time.Time,
editMode log.EditSensitiveData,
format string,
) (logStream, error) {
paths, err := expandPatterns(patterns)
if err != nil {
Expand All @@ -180,7 +181,7 @@ func newMergedStreamFromPatterns(
if err != nil {
return nil, err
}
return newMergedStream(ctx, files, from, to, editMode)
return newMergedStream(ctx, files, from, to, editMode, format)
}

func groupIndex(re *regexp.Regexp, groupName string) int {
Expand All @@ -193,7 +194,11 @@ func groupIndex(re *regexp.Regexp, groupName string) int {
}

func newMergedStream(
ctx context.Context, files []fileInfo, from, to time.Time, editMode log.EditSensitiveData,
ctx context.Context,
files []fileInfo,
from, to time.Time,
editMode log.EditSensitiveData,
format string,
) (*mergedStream, error) {
// TODO(ajwerner): think about clock movement and PID
const maxConcurrentFiles = 256 // should be far less than the FD limit
Expand All @@ -204,7 +209,7 @@ func newMergedStream(
return func() error {
sem <- struct{}{}
defer func() { <-sem }()
s, err := newFileLogStream(files[i], from, to, editMode)
s, err := newFileLogStream(files[i], from, to, editMode, format)
if s != nil {
res[i] = s
}
Expand Down Expand Up @@ -439,9 +444,10 @@ type fileLogStream struct {
prevTime int64
fi fileInfo
f *os.File
d *log.EntryDecoder
d log.EntryDecoder
read bool
editMode log.EditSensitiveData
format string

e logpb.Entry
err error
Expand All @@ -454,13 +460,14 @@ type fileLogStream struct {
// file is always closed before returning from this constructor so the initial
// peek does not consume resources.
func newFileLogStream(
fi fileInfo, from, to time.Time, editMode log.EditSensitiveData,
fi fileInfo, from, to time.Time, editMode log.EditSensitiveData, format string,
) (logStream, error) {
s := &fileLogStream{
fi: fi,
from: from,
to: to,
editMode: editMode,
format: format,
}
if _, ok := s.peek(); !ok {
if err := s.error(); err != io.EOF {
Expand All @@ -483,10 +490,13 @@ func (s *fileLogStream) open() bool {
if s.f, s.err = os.Open(s.fi.path); s.err != nil {
return false
}
if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode); s.err != nil {
if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode, s.format); s.err != nil {
return false
}
s.d = log.NewEntryDecoder(bufio.NewReaderSize(s.f, readBufSize), s.editMode)
var err error
if s.d, err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); err != nil {
panic(err)
}
return true
}

Expand Down Expand Up @@ -541,7 +551,9 @@ func (s *fileLogStream) error() error { return s.err }

// seekToFirstAfterFrom uses binary search to seek to an offset after all
// entries which occur before from.
func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitiveData) (err error) {
func seekToFirstAfterFrom(
f *os.File, from time.Time, editMode log.EditSensitiveData, format string,
) (err error) {
if from.IsZero() {
return nil
}
Expand All @@ -560,8 +572,11 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
panic(err)
}
var e logpb.Entry
err := log.NewEntryDecoder(f, editMode).Decode(&e)
d, err := log.NewEntryDecoderWithFormat(f, editMode, format)
if err != nil {
panic(err)
}
if err := d.Decode(&e); err != nil {
if err == io.EOF {
return true
}
Expand All @@ -573,7 +588,11 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
return err
}
var e logpb.Entry
if err := log.NewEntryDecoder(f, editMode).Decode(&e); err != nil {
d, err := log.NewEntryDecoderWithFormat(f, editMode, format)
if err != nil {
return err
}
if err := d.Decode(&e); err != nil {
return err
}
_, err = f.Seek(int64(offset), io.SeekStart)
Expand Down
Loading

0 comments on commit 72e4e86

Please sign in to comment.