Skip to content

Commit

Permalink
Prevent file opening for files reaching ignore_older with same offset (
Browse files Browse the repository at this point in the history
…#1649)

Files which are under ignore_older but kept the same offset, are not opened anymore.

See https://discuss.elastic.co/t/on-restart-harversters-are-opened-for-ignored-files/50038/4
  • Loading branch information
ruflin authored and tsg committed May 17, 2016
1 parent 687efde commit 162fb25
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff]
=== Beats version 1.2.3
https://github.com/elastic/beats/compare/v1.2.2...v1.2.3[View commits]
==== Added
*Filebeat*
- Prevent file opening for files which reached ignore_older.
==== Bugfixes
*Topbeat*
Expand Down
7 changes: 6 additions & 1 deletion filebeat/crawler/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,11 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
p.ProspectorConfig.IgnoreOlderDuration != 0 &&
time.Since(newinfo.Fileinfo.ModTime()) > p.ProspectorConfig.IgnoreOlderDuration {

if oldState.offset == newinfo.Fileinfo.Size() {
logp.Debug("prospector", "File size of ignore_file didn't change. Nothing to do: %s", file)
return
}

logp.Debug("prospector", "Fetching old state of file to resume: %s", file)

// Are we resuming a dead file? We have to resume even if dead so we catch any old updates to the file
Expand All @@ -374,7 +379,7 @@ func (p *Prospector) checkNewFile(newinfo *harvester.FileStat, file string, outp
p.ProspectorConfig.IgnoreOlderDuration,
time.Since(newinfo.Fileinfo.ModTime()),
file)
newinfo.Skip(newinfo.Fileinfo.Size())
h.SetOffset(newinfo.Fileinfo.Size())
}
p.registrar.Persist <- h.GetState()
} else if previousFile, err := p.getPreviousFile(file, newinfo.Fileinfo); err == nil {
Expand Down
4 changes: 0 additions & 4 deletions filebeat/harvester/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,3 @@ func (fs *FileStat) Continue(old *FileStat) {
fs.Return = old.Return
}
}

func (fs *FileStat) Skip(returnOffset int64) {
fs.Return <- returnOffset
}
3 changes: 2 additions & 1 deletion filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,11 @@ def test_state_after_rotation_ignore_older(self):
if os.name == "nt":
# Under windows offset is +1 because of additional newline char
assert data[os.path.abspath(testfile1)]["offset"] == 9
assert data[os.path.abspath(testfile2)]["offset"] == 8
else:
assert data[os.path.abspath(testfile1)]["offset"] == 8
assert data[os.path.abspath(testfile2)]["offset"] == 7

assert data[os.path.abspath(testfile2)]["offset"] == 0

# Rotate files and remove old one
os.rename(testfile2, testfile3)
Expand Down

0 comments on commit 162fb25

Please sign in to comment.