Fix rotation issue with ignore_older elastic#1528 (elastic#1532)
The following changes were made:
* All state information is fetched in advance for later comparison, to prevent race conditions
* State is also written to the registrar file if the offset is 0; the same applies to the path
* A test case validating the fix was added
ruflin authored and tsg committed May 2, 2016
1 parent 18d57f9 commit 794a7f9
Showing 4 changed files with 137 additions and 29 deletions.
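
For orientation before the diffs, here is a minimal Go sketch of the two-phase scan the description above refers to: all per-file state is snapshotted first, and only afterwards are decisions made, so the comparison never races against files rotating mid-scan. The names here (fileSnapshot, snapshotStates, actOnStates) are hypothetical and simplified; this is not the filebeat code.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// fileSnapshot gathers everything needed for the later comparison in one place,
// so the decision phase never has to re-stat a file that may have rotated.
type fileSnapshot struct {
	info     os.FileInfo
	offset   int64
	resuming bool
}

// snapshotStates stats every matched file up front (phase 1).
func snapshotStates(glob string) map[string]fileSnapshot {
	states := map[string]fileSnapshot{}
	matches, _ := filepath.Glob(glob)
	for _, path := range matches {
		info, err := os.Stat(path)
		if err != nil || info.IsDir() {
			continue
		}
		// In filebeat this is where the registrar state (offset, resuming)
		// is fetched; this sketch simply starts every file at offset 0.
		states[path] = fileSnapshot{info: info}
	}
	return states
}

// actOnStates decides per file what to do (phase 2), using only the snapshot.
func actOnStates(states map[string]fileSnapshot) {
	for path, s := range states {
		if s.resuming {
			fmt.Printf("resume %s at offset %d\n", path, s.offset)
		} else {
			fmt.Printf("start %s from offset %d\n", path, s.offset)
		}
	}
}

func main() {
	actOnStates(snapshotStates("/var/log/*.log"))
}

In the diff below, the snapshot role is played by the new oldState struct and the p.oldStates map, and the offset and resuming flag come from registrar.fetchState.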
75 changes: 49 additions & 26 deletions filebeat/crawler/prospector.go
@@ -20,6 +20,7 @@ type Prospector struct {
registrar *Registrar
missingFiles map[string]os.FileInfo
running bool
oldStates map[string]oldState
}

// Init sets up default config for prospector
@@ -244,6 +245,15 @@ func (p *Prospector) isFileExcluded(file string) bool {
return false
}

type oldState struct {
fileinfo os.FileInfo
lastinfo harvester.FileStat
newInfo *harvester.FileStat
isKnown bool
offset int64
resuming bool
}

// Scans the specific path which can be a glob (/**/**/*.log)
// For all found files it is checked if a harvester should be started
func (p *Prospector) scan(path string, output chan *input.FileEvent) {
@@ -258,6 +268,7 @@ func (p *Prospector) scan(path string, output chan *input.FileEvent) {
}

p.missingFiles = map[string]os.FileInfo{}
p.oldStates = map[string]oldState{}

// Check any matched files to see if we need to start a harvester
for _, file := range matches {
@@ -278,44 +289,58 @@ func (p *Prospector) scan(path string, output chan *input.FileEvent) {
continue
}

newFile := input.File{
FileInfo: fileinfo,
}

if newFile.FileInfo.IsDir() {
if fileinfo.IsDir() {
logp.Debug("prospector", "Skipping directory: %s", file)
continue
}

// Check the current info against p.prospectorinfo[file]
lastinfo, isKnown := p.prospectorList[file]

oldFile := input.File{
FileInfo: lastinfo.Fileinfo,
// Create a new prospector info with the stat info for comparison
newInfo := harvester.NewFileStat(fileinfo, p.iteration)

// Call crawler if there exists a state for the given file
offset, resuming := p.registrar.fetchState(file, newInfo.Fileinfo)

p.oldStates[file] = oldState{
fileinfo: fileinfo,
lastinfo: lastinfo,
isKnown: isKnown,
newInfo: newInfo,
offset: offset,
resuming: resuming,
}
}

// Create a new prospector info with the stat info for comparison
newInfo := harvester.NewFileStat(newFile.FileInfo, p.iteration)
for file, oldState := range p.oldStates {
newFile := input.File{
FileInfo: oldState.fileinfo,
}

oldFile := input.File{
FileInfo: oldState.lastinfo.Fileinfo,
}

// Conditions for starting a new harvester:
// - file path hasn't been seen before
// - the file's inode or device changed
if !isKnown {
p.checkNewFile(newInfo, file, output)
if !oldState.isKnown {
p.checkNewFile(oldState.newInfo, file, output, oldState)
} else {
newInfo.Continue(&lastinfo)
p.checkExistingFile(newInfo, &newFile, &oldFile, file, output)
oldState.newInfo.Continue(&oldState.lastinfo)
p.checkExistingFile(oldState.newInfo, &newFile, &oldFile, file, output, oldState)
}

// Track the stat data for this file for later comparison to check for
// rotation/etc
p.prospectorList[file] = *newInfo
p.prospectorList[file] = *oldState.newInfo
} // for each file matched by the glob
}

// Check if harvester for new file has to be started
// For a new file the following options exist:
func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, output chan *input.FileEvent) {
func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, output chan *input.FileEvent, oldState oldState) {

logp.Debug("prospector", "Start harvesting unknown file: %s", file)

@@ -327,9 +352,6 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
return
}

// Call crawler if there exists a state for the given file
offset, resuming := p.registrar.fetchState(file, newinfo.Fileinfo)

// Check for unmodified time, but only if the file modification time is before the last scan started
// This ensures we don't skip genuine creations with dead times less than 10s
if newinfo.Fileinfo.ModTime().Before(p.lastscan) &&
@@ -341,10 +363,10 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
// This is safe as the harvester, once it hits the EOF and a timeout, will stop harvesting
// Once we detect changes again we can resume another harvester again - this keeps number of go routines to a minimum
if resuming {
if oldState.resuming {
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)

h.SetOffset(offset)
h.SetOffset(oldState.offset)
h.Start()
} else {
// Old file, skip it, but push offset of file size so we start from the end if this file changes and needs picking up
@@ -357,21 +379,21 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
p.registrar.Persist <- h.GetState()
} else if previousFile, err := p.getPreviousFile(file, newinfo.Fileinfo); err == nil {
// This file was simply renamed (known inode+dev) - link the same harvester channel as the old file
logp.Debug("prospector", "File rename was detected: %s -> %s", previousFile, file)
logp.Debug("prospector", "File rename was detected, not a new file: %s -> %s", previousFile, file)
lastinfo := p.prospectorList[previousFile]
newinfo.Continue(&lastinfo)
p.registrar.Persist <- h.GetState()
} else {

// Are we resuming a file or is this a completely new file?
if resuming {
if oldState.resuming {
logp.Debug("prospector", "Resuming harvester on a previously harvested file: %s", file)
} else {
logp.Debug("prospector", "Launching harvester on new file: %s", file)
}

// Launch the harvester
h.SetOffset(offset)
h.SetOffset(oldState.offset)
h.Start()
p.registrar.Persist <- h.GetState()
}
@@ -384,7 +406,7 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
// * The new file is not the same as the old file, means file was renamed
// ** New file is actually really a new file, start a new harvester
// ** Renamed file has a state, continue there
func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *input.File, oldFile *input.File, file string, output chan *input.FileEvent) {
func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *input.File, oldFile *input.File, file string, output chan *input.FileEvent, oldState oldState) {

logp.Debug("prospector", "Update existing file for harvesting: %s", file)

@@ -400,15 +422,16 @@ func (p *Prospector) checkExistingFile(newinfo *harvester.FileStat, newFile *inp

if previousFile, err := p.getPreviousFile(file, newinfo.Fileinfo); err == nil {
// This file was renamed from another file we know - link the same harvester channel as the old file
logp.Debug("prospector", "File rename was detected: %s -> %s", previousFile, file)
logp.Debug("prospector", "File rename was detected, existing file: %s -> %s", previousFile, file)
logp.Debug("prospector", "Launching harvester on renamed file: %s", file)

lastinfo := p.prospectorList[previousFile]
h.SetOffset(p.oldStates[previousFile].offset)
newinfo.Continue(&lastinfo)
p.registrar.Persist <- h.GetState()
} else {
// File is not the same file we saw previously, it must have rotated and is a new file
logp.Debug("prospector", "Launching harvester on rotated file: %s", file)
logp.Debug("prospector", "Launching harvester on new file: %s. Old file was probably rotated", file)

// Forget about the previous harvester and let it continue on the old file - so start a new channel to use with the new harvester
newinfo.Ignore()
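
The rename and rotation checks above (getPreviousFile, the inode and device comparison) rest on comparing file identity rather than path. As an illustration only, not the filebeat implementation, the standard library's os.SameFile shows the underlying idea: a renamed file keeps its identity, so the previous offset can be carried over.

package main

import (
	"fmt"
	"os"
)

func main() {
	// Create a file standing in for the live log, then stat it.
	if err := os.WriteFile("app.log", []byte("entry\n"), 0o644); err != nil {
		panic(err)
	}
	before, err := os.Stat("app.log")
	if err != nil {
		panic(err)
	}

	// Rotate it by renaming, then stat the rotated name.
	if err := os.Rename("app.log", "app.log.1"); err != nil {
		panic(err)
	}
	after, err := os.Stat("app.log.1")
	if err != nil {
		panic(err)
	}

	// os.SameFile compares underlying identity (inode and device on Unix),
	// so a pure rename is still recognised as the same file and the previous
	// offset can be carried over instead of re-reading from the start.
	if os.SameFile(before, after) {
		fmt.Println("same file: continue at the previous offset")
	} else {
		fmt.Println("different file: start a new harvester at offset 0")
	}
}

filebeat's own checks go through input.IsSameFile and the harvester FileStat bookkeeping shown above; os.SameFile is just the simplest way to demonstrate the identity comparison.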
2 changes: 1 addition & 1 deletion filebeat/crawler/registrar.go
@@ -171,7 +171,7 @@ func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) (int64, bo
lastState, isFound := r.GetFileState(filePath)

if isFound && input.IsSameFile(filePath, fileInfo) {
logp.Debug("registar", "Same file as before found. Fetch the state and persist it.")
logp.Debug("registar", "Same file as before found. Fetch the state.")
// We're resuming - throw the last state back downstream so we resave it
// And return the offset - also force harvest in case the file is old and we're about to skip it
return lastState.Offset, true
4 changes: 2 additions & 2 deletions filebeat/input/file.go
@@ -31,8 +31,8 @@ type FileEvent struct {
}

type FileState struct {
Source string `json:"source,omitempty"`
Offset int64 `json:"offset,omitempty"`
Source string `json:"source"`
Offset int64 `json:"offset"`
FileStateOS *FileStateOS
}

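
The omitempty removal in FileState above matters because encoding/json drops zero values under that tag, so a state at offset 0 (exactly the case this fix needs to persist) would be missing from the registry file. A small self-contained sketch of that behaviour; the struct names here are illustrative, not the filebeat types.

package main

import (
	"encoding/json"
	"fmt"
)

type withOmit struct {
	Source string `json:"source,omitempty"`
	Offset int64  `json:"offset,omitempty"`
}

type withoutOmit struct {
	Source string `json:"source"`
	Offset int64  `json:"offset"`
}

func main() {
	// With omitempty, an empty source and an offset of 0 disappear entirely.
	a, _ := json.Marshal(withOmit{Source: "", Offset: 0})
	fmt.Println(string(a)) // {}

	// Without omitempty, the zero offset is persisted, as the fix requires.
	b, _ := json.Marshal(withoutOmit{Source: "/var/log/input.1", Offset: 0})
	fmt.Println(string(b)) // {"source":"/var/log/input.1","offset":0}
}

With the tags removed, the registry entry at offset 0 that the new test below asserts on is serialized explicitly.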
85 changes: 85 additions & 0 deletions filebeat/tests/system/test_registrar.py
@@ -353,3 +353,88 @@ def test_rotating_file_with_shutdown(self):

# Check that 2 files are part of the registrar file. The deleted file should never have been detected
assert len(data) == 2


def test_state_after_rotation(self):
"""
Checks that the state is written correctly after rotation
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
ignoreOlder="2m",
scan_frequency="1s"
)


os.mkdir(self.working_dir + "/log/")
testfile1 = self.working_dir + "/log/input"
testfile2 = self.working_dir + "/log/input.1"
testfile3 = self.working_dir + "/log/input.2"

with open(testfile1, 'w') as f:
f.write("entry10\n")

with open(testfile2, 'w') as f:
f.write("entry0\n")

# Change modification time so the file exceeds ignore_older
yesterday = time.time() - 3600*24
os.utime(testfile2, (yesterday, yesterday))

filebeat = self.start_filebeat()

self.wait_until(
lambda: self.output_has(lines=1),
max_timeout=10)

# Wait a moment to make sure the registry file exists
time.sleep(1)
data = self.get_dot_filebeat()

# Check that offsets are correct
if os.name == "nt":
# Under windows offset is +1 because of additional newline char
assert data[os.path.abspath(testfile1)]["offset"] == 9
else:
assert data[os.path.abspath(testfile1)]["offset"] == 8

assert data[os.path.abspath(testfile2)]["offset"] == 0

# Rotate files and remove old one
os.rename(testfile2, testfile3)
os.rename(testfile1, testfile2)

with open(testfile1, 'w') as f:
f.write("entry200\n")

# Remove the file afterwards to make sure no inode reuse happens
os.remove(testfile3)

# Now wait until rotation is detected
self.wait_until(
lambda: self.log_contains(
"File rename was detected, existing file"),
max_timeout=10)


c = self.log_contains_count("states written")

self.wait_until(
lambda: self.log_contains_count(
"Registry file updated. 2 states written.") >= 4,
max_timeout=15)

filebeat.kill_and_wait()


data = self.get_dot_filebeat()

# Check that offsets are correct
if os.name == "nt":
# Under windows offset is +1 because of additional newline char
assert data[os.path.abspath(testfile1)]["offset"] == 10
assert data[os.path.abspath(testfile2)]["offset"] == 9
else:
assert data[os.path.abspath(testfile1)]["offset"] == 9
assert data[os.path.abspath(testfile2)]["offset"] == 8
