From fb47507f73e59a8e693c1014d2225a0c95f19fa1 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Tue, 10 Jan 2017 14:57:16 +0100 Subject: [PATCH] Fix bug when migrating old states, 2 restarts are required (#3322) The state of migrated states was not properly updated in the registry file. This lead to the issue that after the first restart, the states were migrated but the prospector assumed the states were not finished and didn't start harvesting. A second restart resolved the problem. Discussion started here: https://discuss.elastic.co/t/filebeat-upgrade-requiring-multiple-restarts/70414/8 --- CHANGELOG.asciidoc | 1 + filebeat/registrar/registrar.go | 24 ++-- filebeat/tests/system/test_migration.py | 178 ++++++++++++++++++++++++ filebeat/tests/system/test_registrar.py | 108 -------------- 4 files changed, 194 insertions(+), 117 deletions(-) create mode 100644 filebeat/tests/system/test_migration.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 613aa582bf9..71de44dec74 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Filebeat* - Fix registry cleanup issue when files falling under ignore_older after restart. {issue}2818[2818] +- Fix registry migration issue from old states were files were only harvested after second restart. {pull}3322[3322] *Winlogbeat* - Fix for "The array bounds are invalid" error when reading large events. {issue}3076[3076] diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 1c034db30b9..83d6d006e0a 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -118,15 +118,7 @@ func (r *Registrar) loadStates() error { return fmt.Errorf("Error decoding states: %s", err) } - // Set all states to finished and disable TTL on restart - // For all states covered by a prospector, TTL will be overwritten with the prospector value - for key, state := range states { - state.Finished = true - // Set ttl to -2 to easily spot which states are not managed by a prospector - state.TTL = -2 - states[key] = state - } - + states = resetStates(states) r.states.SetStates(states) logp.Info("States Loaded from registrar: %+v", len(states)) @@ -176,6 +168,7 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { // Convert old states to new states logp.Info("Old registry states found: %v", len(oldStates)) states := convertOldStates(oldStates) + states = resetStates(states) r.states.SetStates(states) // Rewrite registry in new format @@ -186,6 +179,19 @@ func (r *Registrar) loadAndConvertOldState(f *os.File) bool { return true } +// resetStates sets all states to finished and disable TTL on restart +// For all states covered by a prospector, TTL will be overwritten with the prospector value +func resetStates(states []file.State) []file.State { + + for key, state := range states { + state.Finished = true + // Set ttl to -2 to easily spot which states are not managed by a prospector + state.TTL = -2 + states[key] = state + } + return states +} + func convertOldStates(oldStates map[string]file.State) []file.State { // Convert old states to new states states := []file.State{} diff --git a/filebeat/tests/system/test_migration.py b/filebeat/tests/system/test_migration.py new file mode 100644 index 00000000000..62fe99c0198 --- /dev/null +++ b/filebeat/tests/system/test_migration.py @@ -0,0 +1,178 @@ +from filebeat import BaseTest + +import os +import platform +import time +import shutil +import json +import stat +from nose.plugins.skip import Skip, SkipTest + + +class Test(BaseTest): + + def test_migration_non_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name == "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + clean_removed="false", + clean_inactive="0", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name != "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["timestamp"] + del newJson["ttl"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_continue_reading(self): + """ + Tests if after the migration filebeat keeps reading the file + """ + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/test.log" + + with open(testfile1, 'w') as f: + f.write("entry10\n") + + registry_file = self.working_dir + '/registry' + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + output_file_filename="filebeat_1", + ) + + # Run filebeat to create a registry + filebeat = self.start_beat(output="filebeat1.log") + self.wait_until( + lambda: self.output_has(lines=1, output_file="output/filebeat_1"), + max_timeout=10) + filebeat.check_kill_and_wait() + + # Create old registry file out of the new one + r = self.get_registry() + registry_entry = r[0] + del registry_entry["timestamp"] + del registry_entry["ttl"] + old_registry = {registry_entry["source"]: registry_entry} + + # Overwrite registry + with open(registry_file, 'w') as f: + json.dump(old_registry, f) + + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + output_file_filename="filebeat_2", + ) + + filebeat = self.start_beat(output="filebeat2.log") + + # Wait until state is migrated + self.wait_until( + lambda: self.log_contains( + "Old states converted to new states and written to registrar: 1", "filebeat2.log"), + max_timeout=10) + + with open(testfile1, 'a') as f: + f.write("entry12\n") + + # After restart new output file is created -> only 1 new entry + self.wait_until( + lambda: self.output_has(lines=1, output_file="output/filebeat_2"), + max_timeout=10) + + filebeat.check_kill_and_wait() diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 6bdd8ed5fe4..04984139858 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -624,114 +624,6 @@ def test_state_after_rotation_ignore_older(self): assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 - - def test_migration_non_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name == "nt": - raise SkipTest - - registry_file = self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - clean_removed="false", - clean_inactive="0", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_migration_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name != "nt": - raise SkipTest - - registry_file = self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_clean_inactive(self): """ Checks that states are properly removed after clean_inactive