Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log: create crdb-v2 log entry parser and integrate it into debug merge-logs #65633

Merged
merged 1 commit into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ func removeDeadReplicas(
return batch, nil
}

var debugMergeLogsCommand = &cobra.Command{
var debugMergeLogsCmd = &cobra.Command{
Use: "merge-logs <log file globs>",
Short: "merge multiple log files from different machines into a single stream",
Long: `
Expand Down Expand Up @@ -1294,6 +1294,7 @@ var debugMergeLogsOpts = struct {
prefix string
keepRedactable bool
redactInput bool
format string
}{
program: nil, // match everything
file: regexp.MustCompile(log.FilePattern),
Expand All @@ -1307,7 +1308,7 @@ func runDebugMergeLogs(cmd *cobra.Command, args []string) error {
inputEditMode := log.SelectEditMode(o.redactInput, o.keepRedactable)

s, err := newMergedStreamFromPatterns(context.Background(),
args, o.file, o.program, o.from, o.to, inputEditMode)
args, o.file, o.program, o.from, o.to, inputEditMode, o.format)
if err != nil {
return err
}
Expand Down Expand Up @@ -1341,7 +1342,7 @@ var debugCmds = append(DebugCmdsForRocksDB,
debugUnsafeRemoveDeadReplicasCmd,
debugEnvCmd,
debugZipCmd,
debugMergeLogsCommand,
debugMergeLogsCmd,
debugListFilesCmd,
debugResetQuorumCmd,
)
Expand Down Expand Up @@ -1439,7 +1440,7 @@ func init() {
f.IntSliceVar(&removeDeadReplicasOpts.deadStoreIDs, "dead-store-ids", nil,
"list of dead store IDs")

f = debugMergeLogsCommand.Flags()
f = debugMergeLogsCmd.Flags()
f.Var(flagutil.Time(&debugMergeLogsOpts.from), "from",
"time before which messages should be filtered")
// TODO(knz): the "to" should be named "until" - it's a time boundary, not a space boundary.
Expand All @@ -1458,6 +1459,8 @@ func init() {
"keep the output log file redactable")
f.BoolVar(&debugMergeLogsOpts.redactInput, "redact", debugMergeLogsOpts.redactInput,
"redact the input files to remove sensitive information")
f.StringVar(&debugMergeLogsOpts.format, "format", "",
"log format of the input files")

f = debugDecodeProtoCmd.Flags()
f.StringVar(&debugDecodeProtoName, "schema", "cockroach.sql.sqlbase.Descriptor",
Expand Down
39 changes: 29 additions & 10 deletions pkg/cli/debug_merge_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func newMergedStreamFromPatterns(
filePattern, programFilter *regexp.Regexp,
from, to time.Time,
editMode log.EditSensitiveData,
format string,
) (logStream, error) {
paths, err := expandPatterns(patterns)
if err != nil {
Expand All @@ -180,7 +181,7 @@ func newMergedStreamFromPatterns(
if err != nil {
return nil, err
}
return newMergedStream(ctx, files, from, to, editMode)
return newMergedStream(ctx, files, from, to, editMode, format)
}

func groupIndex(re *regexp.Regexp, groupName string) int {
Expand All @@ -193,7 +194,11 @@ func groupIndex(re *regexp.Regexp, groupName string) int {
}

func newMergedStream(
ctx context.Context, files []fileInfo, from, to time.Time, editMode log.EditSensitiveData,
ctx context.Context,
files []fileInfo,
from, to time.Time,
editMode log.EditSensitiveData,
format string,
) (*mergedStream, error) {
// TODO(ajwerner): think about clock movement and PID
const maxConcurrentFiles = 256 // should be far less than the FD limit
Expand All @@ -204,7 +209,7 @@ func newMergedStream(
return func() error {
sem <- struct{}{}
defer func() { <-sem }()
s, err := newFileLogStream(files[i], from, to, editMode)
s, err := newFileLogStream(files[i], from, to, editMode, format)
if s != nil {
res[i] = s
}
Expand Down Expand Up @@ -439,9 +444,10 @@ type fileLogStream struct {
prevTime int64
fi fileInfo
f *os.File
d *log.EntryDecoder
d log.EntryDecoder
read bool
editMode log.EditSensitiveData
format string

e logpb.Entry
err error
Expand All @@ -454,13 +460,14 @@ type fileLogStream struct {
// file is always closed before returning from this constructor so the initial
// peek does not consume resources.
func newFileLogStream(
fi fileInfo, from, to time.Time, editMode log.EditSensitiveData,
fi fileInfo, from, to time.Time, editMode log.EditSensitiveData, format string,
) (logStream, error) {
s := &fileLogStream{
fi: fi,
from: from,
to: to,
editMode: editMode,
format: format,
}
if _, ok := s.peek(); !ok {
if err := s.error(); err != io.EOF {
Expand All @@ -483,10 +490,13 @@ func (s *fileLogStream) open() bool {
if s.f, s.err = os.Open(s.fi.path); s.err != nil {
return false
}
if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode); s.err != nil {
if s.err = seekToFirstAfterFrom(s.f, s.from, s.editMode, s.format); s.err != nil {
return false
}
s.d = log.NewEntryDecoder(bufio.NewReaderSize(s.f, readBufSize), s.editMode)
var err error
if s.d, err = log.NewEntryDecoderWithFormat(bufio.NewReaderSize(s.f, readBufSize), s.editMode, s.format); err != nil {
panic(err)
}
return true
}

Expand Down Expand Up @@ -541,7 +551,9 @@ func (s *fileLogStream) error() error { return s.err }

// seekToFirstAfterFrom uses binary search to seek to an offset after all
// entries which occur before from.
func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitiveData) (err error) {
func seekToFirstAfterFrom(
cameronnunez marked this conversation as resolved.
Show resolved Hide resolved
f *os.File, from time.Time, editMode log.EditSensitiveData, format string,
) (err error) {
if from.IsZero() {
return nil
}
Expand All @@ -560,8 +572,11 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
panic(err)
}
var e logpb.Entry
err := log.NewEntryDecoder(f, editMode).Decode(&e)
d, err := log.NewEntryDecoderWithFormat(f, editMode, format)
if err != nil {
panic(err)
}
if err := d.Decode(&e); err != nil {
if err == io.EOF {
return true
}
Expand All @@ -573,7 +588,11 @@ func seekToFirstAfterFrom(f *os.File, from time.Time, editMode log.EditSensitive
return err
}
var e logpb.Entry
if err := log.NewEntryDecoder(f, editMode).Decode(&e); err != nil {
d, err := log.NewEntryDecoderWithFormat(f, editMode, format)
if err != nil {
return err
}
if err := d.Decode(&e); err != nil {
return err
}
_, err = f.Seek(int64(offset), io.SeekStart)
Expand Down
Loading