Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify clean_removed handling #3827

Merged
merged 2 commits into from
Apr 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Always use absolute path for event and registry. This can lead to issues when relative paths were used before. {pull}3328[3328]
- Remove code to convert states from 1.x. {pull}3767[3767]
- Remove deprecated config options force_close_files and close_older. {pull}3768[3768]
- Change clean_removed behaviour to also remove states for files which cannot be found anymore under the same name. {pull}3827[3827]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,12 @@ NOTE: Every time a file is renamed, the file state is updated and the counter fo
[[clean-removed]]
===== clean_removed

When this option is enabled, Filebeat cleans files from the registry if they cannot be found on disk anymore. This setting does not apply to renamed files or files that were moved to another directory that is still visible to Filebeat. This option is enabled by default.

When this option is enabled, Filebeat cleans files from the registry if they cannot be found on disk anymore under the last known name. This means also files which were renamed after the harvester was finished will be removed. This option is enabled by default.

If a shared drive disappears for a short period and appears again, all files will be read again from the beginning because the states were removed from the registry file. In such cases, we recommend that you disable the `clean_removed` option.

You must disable this option if you also disable `close_removed`.


[[scan-frequency]]
===== scan_frequency

Expand Down
36 changes: 27 additions & 9 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,42 @@ func (l *Log) Run() {
if l.config.CleanRemoved {
for _, state := range l.Prospector.states.GetStates() {
// os.Stat will return an error in case the file does not exist
_, err := os.Stat(state.Source)
stat, err := os.Stat(state.Source)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be more strict here and check vs os.PathError?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. What should we do in all other cases? Log an error and not remove the state?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that's probably the best we can do. os.PathError is all that can return right now but you never now..

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed

// Only clean up files where state is Finished
if state.Finished {
state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}
if os.IsNotExist(err) {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
} else {
logp.Debug("prospector", "State for file not removed because not finished: %s", state.Source)
logp.Err("Prospector state for %s was not removed: %s", state.Source, err)
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source)
if !newState.FileStateOS.IsSame(state.FileStateOS) {
l.removeState(state)
logp.Debug("prospector", "Remove state for file as file removed or renamed: %s", state.Source)
}
}
}
}
}

func (l *Log) removeState(state file.State) {

// Only clean up files where state is Finished
if !state.Finished {
logp.Debug("prospector", "State for file not removed because harvester not finished: %s", state.Source)
return
}

state.TTL = 0
err := l.Prospector.updateState(input.NewEvent(state))
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}

}

// getFiles returns all files which have to be harvested
// All globs are expanded and then directory and excluded files are removed
func (l *Log) getFiles() map[string]os.FileInfo {
Expand Down
4 changes: 3 additions & 1 deletion filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_close_renamed(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_renamed="true",
clean_removed="false",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -620,7 +621,8 @@ def test_symlink_removed(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/symlink.log",
symlinks="true",
clean_removed="false"
clean_removed="false",
close_removed="false",
)

os.mkdir(self.working_dir + "/log/")
Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ def test_rotating_file_with_restart(self):
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
scan_frequency="1s",
close_inactive="1s"
close_inactive="1s",
clean_removed="false"
)

if os.name == "nt":
Expand Down