From 91fa70d2c08d678b1cdedd31881110c676636d98 Mon Sep 17 00:00:00 2001 From: ruflin Date: Fri, 17 Mar 2017 13:03:19 +0100 Subject: [PATCH] Remove convert old states code Filebeat 1.x had a different registry format from 5.x. So a conversion of the format happened. This code is now removed for 6.0 as the upgrade path to 6.0 is from 5.0. The code caused some issues in the past so it is better to not have it in anymore. --- CHANGELOG.asciidoc | 1 + filebeat/registrar/registrar.go | 88 ------------ filebeat/registrar/registrar_test.go | 58 -------- filebeat/tests/system/test_migration.py | 178 ------------------------ 4 files changed, 1 insertion(+), 324 deletions(-) delete mode 100644 filebeat/registrar/registrar_test.go delete mode 100644 filebeat/tests/system/test_migration.py diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 04feb7014cd..9244da6b3ef 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -21,6 +21,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff] *Filebeat* - Always use absolute path for event and registry. This can lead to issues when relative paths were used before. {pull}3328[3328] +- Remove code to convert states from 1.x. {pull}3767[3767] *Heartbeat* diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 3094f650797..8e30ac7b61f 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -7,8 +7,6 @@ import ( "path/filepath" "sync" - "time" - cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" @@ -105,12 +103,6 @@ func (r *Registrar) loadStates() error { logp.Info("Loading registrar data from %s", r.registryFile) - // DEPRECATED: This should be removed in 6.0 - oldStates := r.loadAndConvertOldState(f) - if oldStates { - return nil - } - decoder := json.NewDecoder(f) states := []file.State{} err = decoder.Decode(&states) @@ -125,60 +117,6 @@ func (r *Registrar) loadStates() error { return nil } -// loadAndConvertOldState loads the old state file and converts it to the new state -// This is designed so it can be easily removed in later versions -func (r *Registrar) loadAndConvertOldState(f *os.File) bool { - // Make sure file reader is reset afterwards - defer f.Seek(0, 0) - - stat, err := f.Stat() - if err != nil { - logp.Err("Error getting stat for old state: %+v", err) - return false - } - - // Empty state does not have to be transformed ({} + newline) - if stat.Size() <= 4 { - return false - } - - // Check if already new state format - decoder := json.NewDecoder(f) - newState := []file.State{} - err = decoder.Decode(&newState) - // No error means registry is already in new format - if err == nil { - return false - } - - // Reset file offset - f.Seek(0, 0) - oldStates := map[string]file.State{} - err = decoder.Decode(&oldStates) - if err != nil { - logp.Err("Error decoding old state: %+v", err) - return false - } - - // No old states found -> probably already new format - if oldStates == nil { - return false - } - - // Convert old states to new states - logp.Info("Old registry states found: %v", len(oldStates)) - states := convertOldStates(oldStates) - states = resetStates(states) - r.states.SetStates(states) - - // Rewrite registry in new format - r.writeRegistry() - - logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates)) - - return true -} - // resetStates sets all states to finished and disable TTL on restart // For all states covered by a prospector, TTL will be overwritten with the prospector value func resetStates(states []file.State) []file.State { @@ -192,32 +130,6 @@ func resetStates(states []file.State) []file.State { return states } -func convertOldStates(oldStates map[string]file.State) []file.State { - // Convert old states to new states - states := []file.State{} - for _, state := range oldStates { - // Makes timestamp time of migration, as this is the best guess - state.Timestamp = time.Now() - - // Check for duplicates - dupe := false - for i, other := range states { - if state.FileStateOS.IsSame(other.FileStateOS) { - dupe = true - if state.Offset > other.Offset { - // replace other - states[i] = state - break - } - } - } - if !dupe { - states = append(states, state) - } - } - return states -} - func (r *Registrar) Start() error { // Load the previous log file locations now, for use in prospector diff --git a/filebeat/registrar/registrar_test.go b/filebeat/registrar/registrar_test.go deleted file mode 100644 index eadf0d63050..00000000000 --- a/filebeat/registrar/registrar_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// +build !windows,!integration - -package registrar - -import ( - "sort" - "testing" - - "github.com/elastic/beats/filebeat/input/file" - "github.com/stretchr/testify/assert" -) - -func TestConvertOldStates(t *testing.T) { - type io struct { - Name string - Input map[string]file.State - Output []string - } - tests := []io{ - { - Name: "Simple test with three files", - Input: map[string]file.State{ - "test": {Source: "test", FileStateOS: file.StateOS{Inode: 5}}, - "test1": {Source: "test1", FileStateOS: file.StateOS{Inode: 3}}, - "test2": {Source: "test2", FileStateOS: file.StateOS{Inode: 2}}, - }, - Output: []string{"test", "test1", "test2"}, - }, - { - Name: "De-duplicate inodes. Bigger offset wins (1)", - Input: map[string]file.State{ - "test": {Source: "test", FileStateOS: file.StateOS{Inode: 2}}, - "test1": {Source: "test1", FileStateOS: file.StateOS{Inode: 3}}, - "test2": {Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 2}, - }, - Output: []string{"test1", "test2"}, - }, - { - Name: "De-duplicate inodes. Bigger offset wins (2)", - Input: map[string]file.State{ - "test": {Source: "test", FileStateOS: file.StateOS{Inode: 2}, Offset: 2}, - "test1": {Source: "test1", FileStateOS: file.StateOS{Inode: 3}}, - "test2": {Source: "test2", FileStateOS: file.StateOS{Inode: 2}, Offset: 0}, - }, - Output: []string{"test", "test1"}, - }, - } - - for _, test := range tests { - result := convertOldStates(test.Input) - resultSources := []string{} - for _, state := range result { - resultSources = append(resultSources, state.Source) - } - sort.Strings(resultSources) - assert.Equal(t, test.Output, resultSources, test.Name) - } -} diff --git a/filebeat/tests/system/test_migration.py b/filebeat/tests/system/test_migration.py deleted file mode 100644 index d26645c5fac..00000000000 --- a/filebeat/tests/system/test_migration.py +++ /dev/null @@ -1,178 +0,0 @@ -from filebeat import BaseTest - -import os -import platform -import time -import shutil -import json -import stat -from nose.plugins.skip import Skip, SkipTest - - -class Test(BaseTest): - - def test_migration_non_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name == "nt": - raise SkipTest - - registry_file = self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - clean_removed="false", - clean_inactive="0", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads( - '{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_migration_windows(self): - """ - Tests if migration from old filebeat registry to new format works - """ - - if os.name != "nt": - raise SkipTest - - registry_file = self.working_dir + '/registry' - - # Write old registry file - with open(registry_file, 'w') as f: - f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/input*", - ) - - filebeat = self.start_beat() - - self.wait_until( - lambda: self.log_contains("Old registry states found: 2"), - max_timeout=15) - - self.wait_until( - lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), - max_timeout=15) - - filebeat.check_kill_and_wait() - - # Check if content is same as above - assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 - assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 - - # Compare first entry - oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') - newJson = self.get_registry_entry_by_path("logs/hello.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Compare second entry - oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') - newJson = self.get_registry_entry_by_path("logs/log2.log") - del newJson["timestamp"] - del newJson["ttl"] - assert newJson == oldJson - - # Make sure the right number of entries is in - data = self.get_registry() - assert len(data) == 2 - - def test_migration_continue_reading(self): - """ - Tests if after the migration filebeat keeps reading the file - """ - - os.mkdir(self.working_dir + "/log/") - testfile1 = self.working_dir + "/log/test.log" - - with open(testfile1, 'w') as f: - f.write("entry10\n") - - registry_file = self.working_dir + '/registry' - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - output_file_filename="filebeat_1", - ) - - # Run filebeat to create a registry - filebeat = self.start_beat(output="filebeat1.log") - self.wait_until( - lambda: self.output_has(lines=1, output_file="output/filebeat_1"), - max_timeout=10) - filebeat.check_kill_and_wait() - - # Create old registry file out of the new one - r = self.get_registry() - registry_entry = r[0] - del registry_entry["timestamp"] - del registry_entry["ttl"] - old_registry = {registry_entry["source"]: registry_entry} - - # Overwrite registry - with open(registry_file, 'w') as f: - json.dump(old_registry, f) - - self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*", - output_file_filename="filebeat_2", - ) - - filebeat = self.start_beat(output="filebeat2.log") - - # Wait until state is migrated - self.wait_until( - lambda: self.log_contains( - "Old states converted to new states and written to registrar: 1", "filebeat2.log"), - max_timeout=10) - - with open(testfile1, 'a') as f: - f.write("entry12\n") - - # After restart new output file is created -> only 1 new entry - self.wait_until( - lambda: self.output_has(lines=1, output_file="output/filebeat_2"), - max_timeout=10) - - filebeat.check_kill_and_wait()