Skip to content

Commit

Permalink
util/log: Fix FetchEntriesFromFiles when multiple file groups match
Browse files Browse the repository at this point in the history
Prior to this patch, when `FetchEntriesFromFiles` was fetching entries
from multiple file groups (previously known as "secondary loggers"),
it mistakenly stopped after processing entries from just one logger.

This patch fixes it.

Note: this API is deprecated and should be replaced by something that
knows about file groups.

Release note: None
  • Loading branch information
knz committed Jan 8, 2021
1 parent d081fc0 commit 1d4ad8c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 25 deletions.
103 changes: 80 additions & 23 deletions pkg/util/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,40 @@ func createSymlink(fname, symlink string) {
}
}

// listLogGroups collects the log file listing of every file sink that
// writes into the main log directory. The slice at index 0 comes from
// the main logger, followed by one slice per qualifying secondary
// logger.
func listLogGroups() ([][]FileInfo, error) {
	mainDir, ok := mainLog.logDir.get()
	if !ok {
		// No log directory configured: nothing to list.
		return nil, nil
	}
	mainFiles, err := mainLog.listLogFiles()
	if err != nil {
		return nil, err
	}
	groups := [][]FileInfo{mainFiles}

	secondaryLogRegistry.mu.Lock()
	defer secondaryLogRegistry.mu.Unlock()
	for _, secondary := range secondaryLogRegistry.mu.loggers {
		// For now, only gather logs from the main log directory:
		// the other APIs don't yet understand secondary log
		// directories, and we don't want to list a file that cannot
		// be retrieved.
		dir, ok := secondary.logger.logDir.get()
		if !ok || dir != mainDir {
			continue
		}
		files, err := secondary.logger.listLogFiles()
		if err != nil {
			return nil, err
		}
		groups = append(groups, files)
	}
	return groups, nil
}

// ListLogFiles returns a slice of FileInfo structs for each log file
// on the local node, in any of the configured log directories.
func ListLogFiles() ([]FileInfo, error) {
Expand Down Expand Up @@ -426,10 +460,10 @@ func (a sortableFileInfoSlice) Less(i, j int) bool {
return a[i].Details.Time < a[j].Details.Time
}

// selectFiles selects all log files that have an timestamp before the
// selectFilesInGroup selects all log files that have a timestamp before the
// endTime. It then sorts them in decreasing order, with the most
// recent as the first one.
func selectFiles(logFiles []FileInfo, endTimestamp int64) []FileInfo {
func selectFilesInGroup(logFiles []FileInfo, endTimestamp int64) []FileInfo {
files := sortableFileInfoSlice{}
for _, logFile := range logFiles {
if logFile.Details.Time <= endTimestamp {
Expand All @@ -454,38 +488,61 @@ func FetchEntriesFromFiles(
pattern *regexp.Regexp,
editMode EditSensitiveData,
) ([]Entry, error) {
logFiles, err := ListLogFiles()
logGroups, err := listLogGroups()
if err != nil {
return nil, err
}

selectedFiles := selectFiles(logFiles, endTimestamp)

entries := []Entry{}
for _, file := range selectedFiles {
newEntries, entryBeforeStart, err := readAllEntriesFromFile(
file,
startTimestamp,
endTimestamp,
maxEntries-len(entries),
pattern,
editMode)
if err != nil {
return nil, err
}
entries = append(entries, newEntries...)
if len(entries) >= maxEntries {
break
numGroupsWithEntries := 0
for _, logFiles := range logGroups {
selectedFiles := selectFilesInGroup(logFiles, endTimestamp)

groupHasEntries := false
for _, file := range selectedFiles {
newEntries, entryBeforeStart, err := readAllEntriesFromFile(
file,
startTimestamp,
endTimestamp,
maxEntries-len(entries),
pattern,
editMode)
if err != nil {
return nil, err
}
groupHasEntries = true
entries = append(entries, newEntries...)
if len(entries) >= maxEntries {
break
}
if entryBeforeStart {
// Stop processing files inside the group that won't have any
// timestamps after startTime.
break
}
}
if entryBeforeStart {
// Stop processing files that won't have any timestamps after
// startTime.
break
if groupHasEntries {
numGroupsWithEntries++
}
}

// Within each group, entries are sorted. However if there were
// multiple groups, the final result is not sorted any more. Do it
// now.
if numGroupsWithEntries > 1 {
e := sortableEntries(entries)
sort.Stable(e)
}

return entries, nil
}

// sortableEntries implements sort.Interface over a slice of log
// entries, ordering them by decreasing timestamp (most recent first)
// so the merged result matches the reverse-chronological order
// produced within each file group.
type sortableEntries []Entry

func (s sortableEntries) Len() int { return len(s) }

func (s sortableEntries) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

func (s sortableEntries) Less(i, j int) bool {
	// Reverse chronological: a later timestamp sorts earlier.
	return s[j].Time < s[i].Time
}

// readAllEntriesFromFile reads in all log entries from a given file that are
// between the 'startTimestamp' and 'endTimestamp' and match the 'pattern' if it
// exists. It returns the entries in the reverse chronological order. It also
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestSelectFiles(t *testing.T) {
}

for i, testCase := range testCases {
actualFiles := selectFiles(testFiles, testCase.EndTimestamp)
actualFiles := selectFilesInGroup(testFiles, testCase.EndTimestamp)
previousTimestamp := year2200.UnixNano()
if len(actualFiles) != testCase.ExpectedCount {
t.Errorf("%d: expected %d files, actual %d", i, testCase.ExpectedCount, len(actualFiles))
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log/log_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (l *loggerT) gcOldFiles() {
}

logFilesCombinedMaxSize := atomic.LoadInt64(&LogFilesCombinedMaxSize)
files := selectFiles(allFiles, math.MaxInt64)
files := selectFilesInGroup(allFiles, math.MaxInt64)
if len(files) == 0 {
return
}
Expand Down

0 comments on commit 1d4ad8c

Please sign in to comment.