From d6ddf663d03e7a98757c4869e3ef7e38af401d60 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Mon, 24 Oct 2016 17:31:15 +0200 Subject: [PATCH] Fix state loading in case a state falls under ignore_older after restart (#2830) Filebeat sets all states by default to Finished: false. On loading states during restart from the registry file, all prospector states are set to Finished: true on setup. These initial updates were not propagated to the registry file which had the effect, that the registry file was having a states with Finished: false until an update came from the prospector. This is now changed in the way that on Init each prospector sends an update to the registry for all states read. To be on the save side for the TTL which could have experied during a restart or that the clean_* config option was changed during restart, the TTL is reset and only overwritten afterwards again in updateState of the propsector before sending the event. Closes https://github.com/elastic/beats/issues/2818 --- CHANGELOG.asciidoc | 1 + filebeat/prospector/prospector_log.go | 20 +++++---- filebeat/tests/system/test_registrar.py | 54 +++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 19d4ef18805..c4097b6f681 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v5.0.0-rc1...master[Check the HEAD diff *Filebeat* - Fix issue when clean_removed and clean_inactive were used together that states were not directly removed from the registry. - Fix issue where upgrading a 1.x registry file resulted in duplicate state entries. {pull}2792[2792] +- Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818] *Winlogbeat* diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index c5b30e51114..3e6dfb6e18e 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -41,13 +41,19 @@ func (p *ProspectorLog) Init() { fileStates := p.Prospector.states.GetStates() // Make sure all states are set as finished - for key, state := range fileStates { + for _, state := range fileStates { state.Finished = true - fileStates[key] = state + // Set all states again to infinity TTL to make sure only removed if config still same + // clean_inactive / clean_removed could have been changed between restarts + state.TTL = -1 + + // Update prospector states and send new states to registry + err := p.Prospector.updateState(input.NewEvent(state)) + if err != nil { + logp.Err("Problem putting initial state: %+v", err) + } } - // Overwrite prospector states - p.Prospector.states.SetStates(fileStates) p.lastClean = time.Now() logp.Info("Previous states loaded: %v", p.Prospector.states.Count()) @@ -74,8 +80,7 @@ func (p *ProspectorLog) Run() { // Only clean up files where state is Finished if state.Finished { state.TTL = 0 - event := input.NewEvent(state) - err := p.Prospector.updateState(event) + err := p.Prospector.updateState(input.NewEvent(state)) if err != nil { logp.Err("File cleanup state update error: %s", err) } @@ -234,8 +239,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Debug("prospector", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source - event := input.NewEvent(oldState) - err := p.Prospector.updateState(event) + err := p.Prospector.updateState(input.NewEvent(oldState)) if err != nil { logp.Err("File rotation state update error: %s", err) } diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index cce3207f281..daba757c385 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -641,6 +641,8 @@ def test_migration_non_windows(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/input*", + clean_removed="false", + clean_inactive="0", ) filebeat = self.start_beat() @@ -1016,3 +1018,55 @@ def test_invalid_state(self): max_timeout=10) filebeat.check_kill_and_wait(exit_code=1) + + def test_restart_state(self): + """ + Make sure that states are rewritten correctly on restart and cleaned + """ + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + close_inactive="1s", + ignore_older="3s", + clean_inactive="5s", + ) + os.mkdir(self.working_dir + "/log/") + + testfile1 = self.working_dir + "/log/test1.log" + testfile2 = self.working_dir + "/log/test2.log" + testfile3 = self.working_dir + "/log/test3.log" + testfile4 = self.working_dir + "/log/test4.log" + + with open(testfile1, 'w') as file: + file.write("Hello World\n") + with open(testfile2, 'w') as file: + file.write("Hello World\n") + with open(testfile3, 'w') as file: + file.write("Hello World\n") + + filebeat = self.start_beat() + + # Make sure states written appears one more time + self.wait_until( + lambda: self.log_contains("Ignore file because ignore_older"), + max_timeout=10) + + filebeat.check_kill_and_wait() + + filebeat = self.start_beat(output="filebeat2.log") + + # Write additional file + with open(testfile4, 'w') as file: + file.write("Hello World\n") + + # Make sure all 4 states are persisted + self.wait_until( + lambda: self.log_contains("Before: 4, After: 4", logfile="filebeat2.log"), + max_timeout=10) + + # Wait until registry file is cleaned + self.wait_until( + lambda: self.log_contains("Before: 0, After: 0", logfile="filebeat2.log"), + max_timeout=10) + + filebeat.check_kill_and_wait()