From c82430126e7b7294e2ec9531eda379b532f14adb Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 23 May 2016 11:33:02 +0200 Subject: [PATCH 1/3] Migration to new state structure The previous state list was dependent on a file path. This had the potential to lead to overwrite and conflicts on file rotation. As the file path is also stored inside the state object this information was duplicated. Now a pure array is stored in the registry which makes the format more flexibel and brings it close to the format used by Logstash. Changes: * Not storing an index with paths as this leaded to duplicates * Introduction of last_seen to differentiate between entries with same bath which show up multiple times. This introduces also the base to clean up the registry file. Currently it is only used in the prospector, no registry yet. * Registry can now contain multiple entries for a path * Refactor registrar to use the same state array as prospector does * Introduce migration check to migrate from old to new state. Make it backward compatible by checking for old structure on startup * Decouple registrar from prospector * Update tests to follow new model --- CHANGELOG.asciidoc | 1 + filebeat/beater/filebeat.go | 6 +- filebeat/crawler/crawler.go | 8 +- filebeat/crawler/prospector.go | 11 +- filebeat/crawler/prospector_log.go | 23 ++- filebeat/crawler/prospector_test.go | 2 +- filebeat/crawler/registrar.go | 177 ++++++++++++----------- filebeat/input/state.go | 30 +++- filebeat/tests/system/filebeat.py | 23 +++ filebeat/tests/system/test_crawler.py | 8 +- filebeat/tests/system/test_registrar.py | 180 ++++++++++++++++++------ 11 files changed, 303 insertions(+), 166 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4706950fa26..73e99fed8b6 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -55,6 +55,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha2...master[Check the HEAD d *Topbeat* *Filebeat* +- The registry format was changed to an array instead of dict. The migration to the new format will happen automatically at the first startup. {pull}1703[1703] *Winlogbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 98d52749ba2..47f1beca0a8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -71,7 +71,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } // Load the previous log file locations now, for use in prospector - fb.registrar.LoadState() + err = fb.registrar.LoadState() + if err != nil { + logp.Err("Error loading state: %v", err) + return err + } // Init and Start spooler: Harvesters dump events into the spooler. fb.spooler = NewSpooler(fb.FbConfig.Filebeat, fb.publisherChan) diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index e0b46a18242..d4c6277f5e3 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -36,10 +36,13 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu logp.Info("Loading Prospectors: %v", len(prospectorConfigs)) + // Get existing states + states := *c.Registrar.state + // Prospect the globs/paths given on the command line and launch harvesters for _, prospectorConfig := range prospectorConfigs { - prospector, err := NewProspector(prospectorConfig, c.Registrar, eventChan) + prospector, err := NewProspector(prospectorConfig, states, eventChan) if err != nil { return fmt.Errorf("Error in initing prospector: %s", err) } @@ -60,10 +63,9 @@ func (c *Crawler) Start(prospectorConfigs []*common.Config, eventChan chan *inpu logp.Debug("crawler", "Starting prospector %v", id) prospector.Run() }(i, p) - } - logp.Info("All prospectors are initialised and running with %d states to persist", len(c.Registrar.getState())) + logp.Info("All prospectors are initialised and running with %d states to persist", c.Registrar.state.Count()) return nil } diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 11480e7def6..e624a57e73b 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -17,7 +17,6 @@ type Prospector struct { prospectorer Prospectorer spoolerChan chan *input.FileEvent harvesterChan chan *input.FileEvent - registrar *Registrar done chan struct{} states *input.States wg sync.WaitGroup @@ -28,14 +27,13 @@ type Prospectorer interface { Run() } -func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *input.FileEvent) (*Prospector, error) { +func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *input.FileEvent) (*Prospector, error) { prospector := &Prospector{ config: defaultConfig, - registrar: registrar, spoolerChan: spoolerChan, harvesterChan: make(chan *input.FileEvent), done: make(chan struct{}), - states: input.NewStates(), + states: &states, wg: sync.WaitGroup{}, } @@ -44,7 +42,6 @@ func NewProspector(cfg *common.Config, registrar *Registrar, spoolerChan chan *i } err := prospector.Init() - if err != nil { return nil, err } @@ -72,15 +69,13 @@ func (p *Prospector) Init() error { switch p.config.Harvester.InputType { case cfg.StdinInputType: prospectorer, err = NewProspectorStdin(p) - prospectorer.Init() case cfg.LogInputType: prospectorer, err = NewProspectorLog(p) - prospectorer.Init() - default: return fmt.Errorf("Invalid prospector type: %v", p.config.Harvester.InputType) } + prospectorer.Init() p.prospectorer = prospectorer return nil diff --git a/filebeat/crawler/prospector_log.go b/filebeat/crawler/prospector_log.go index eeebae364ef..281aa5c8b7b 100644 --- a/filebeat/crawler/prospector_log.go +++ b/filebeat/crawler/prospector_log.go @@ -30,24 +30,17 @@ func (p *ProspectorLog) Init() { logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles) logp.Info("Load previous states from registry into memory") + fileStates := p.Prospector.states.GetStates() - // Load the initial state from the registry - for path, fileinfo := range p.getFiles() { - - // Check for each path found, if there is a previous state - offset := p.Prospector.registrar.fetchState(path, fileinfo) - - // Offset found -> skip to previous state - if offset > 0 { - state := input.NewFileState(fileinfo, path) - state.Offset = offset - // Make sure new harvester is started for all states - state.Finished = true - // Prospector must update all states as it has to detect also file rotation - p.Prospector.states.Update(state) - } + // Make sure all states are set as finished + for key, state := range fileStates { + state.Finished = true + fileStates[key] = state } + // Overwrite prospector states + p.Prospector.states.SetStates(fileStates) + logp.Info("Previous states loaded: %v", p.Prospector.states.Count()) } diff --git a/filebeat/crawler/prospector_test.go b/filebeat/crawler/prospector_test.go index ba660e396a2..7c8efeb48f7 100644 --- a/filebeat/crawler/prospector_test.go +++ b/filebeat/crawler/prospector_test.go @@ -15,7 +15,7 @@ import ( func TestProspectorDefaultConfigs(t *testing.T) { - prospector, err := NewProspector(common.NewConfig(), nil, nil) + prospector, err := NewProspector(common.NewConfig(), *input.NewStates(), nil) assert.NoError(t, err) // Default values expected diff --git a/filebeat/crawler/registrar.go b/filebeat/crawler/registrar.go index fdc5a406d2f..f7f5ebb4195 100644 --- a/filebeat/crawler/registrar.go +++ b/filebeat/crawler/registrar.go @@ -7,6 +7,8 @@ import ( "path/filepath" "sync" + "time" + cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input" . "github.com/elastic/beats/filebeat/input" @@ -17,9 +19,8 @@ import ( type Registrar struct { Channel chan []*FileEvent done chan struct{} - registryFile string // Path to the Registry File - state map[string]FileState // Map with all file paths inside and the corresponding state - stateMutex sync.Mutex + registryFile string // Path to the Registry File + state *input.States // Map with all file paths inside and the corresponding state wg sync.WaitGroup } @@ -28,7 +29,7 @@ func NewRegistrar(registryFile string) (*Registrar, error) { r := &Registrar{ registryFile: registryFile, done: make(chan struct{}), - state: map[string]FileState{}, + state: input.NewStates(), Channel: make(chan []*FileEvent, 1), wg: sync.WaitGroup{}, } @@ -63,14 +64,84 @@ func (r *Registrar) Init() error { // loadState fetches the previous reading state from the configure RegistryFile file // The default file is `registry` in the data path. -func (r *Registrar) LoadState() { - if existing, e := os.Open(r.registryFile); e == nil { - defer existing.Close() +func (r *Registrar) LoadState() error { + + // Check if files exists + _, err := os.Stat(r.registryFile) + if err != nil && !os.IsNotExist(err) { + return err + } + + // Error means no file found + if err != nil { + logp.Info("No registry file found under: %s. Creating a new registry file.", r.registryFile) + return nil + } + + file, err := os.Open(r.registryFile) + if err != nil { + return err + } + + defer file.Close() + + logp.Info("Loading registrar data from %s", r.registryFile) + + // DEPRECATED: This should be removed in 6.0 + oldStates := r.loadAndConvertOldState(file) + if oldStates { + return nil + } + + decoder := json.NewDecoder(file) + states := []input.FileState{} + decoder.Decode(&states) + + r.state.SetStates(states) + logp.Info("States Loaded from registrar: %+v", len(states)) + + return nil +} + +// loadAndConvertOldState loads the old state file and converts it to the new state +// This is designed so it can be easily removed in later versions +func (r *Registrar) loadAndConvertOldState(file *os.File) bool { + // Make sure file reader is reset afterwards + defer file.Seek(0, 0) + + decoder := json.NewDecoder(file) + oldStates := map[string]FileState{} + err := decoder.Decode(&oldStates) + + if err != nil { + logp.Debug("registrar", "Error decoding old state: %+v", err) + return false + } + + // No old states found -> probably already new format + if oldStates == nil { + return false + } - logp.Info("Loading registrar data from %s", r.registryFile) - decoder := json.NewDecoder(existing) - decoder.Decode(&r.state) + // Convert old states to new states + states := make([]input.FileState, len(oldStates)) + logp.Info("Old registry states found: %v", len(oldStates)) + counter := 0 + for _, state := range oldStates { + // Makes time last_seen time of migration, as this is the best guess + state.LastSeen = time.Now() + states[counter] = state + counter++ } + + r.state.SetStates(states) + + // Rewrite registry in new format + r.writeRegistry() + + logp.Info("Old states converted to new states and written to registrar: %v", len(oldStates)) + + return true } func (r *Registrar) Start() { @@ -112,22 +183,17 @@ func (r *Registrar) processEventStates(events []*FileEvent) { if event.InputType == cfg.StdinInputType { continue } - - r.setState(event.Source, event.FileState) + r.state.Update(event.FileState) } } +// Stop stops the registry. It waits until Run function finished. func (r *Registrar) Stop() { logp.Info("Stopping Registrar") close(r.done) r.wg.Wait() } -func (r *Registrar) GetFileState(path string) (FileState, bool) { - state, exist := r.getStateEntry(path) - return state, exist -} - // writeRegistry writes the new json registry file to disk. func (r *Registrar) writeRegistry() error { logp.Debug("registrar", "Write registry file: %s", r.registryFile) @@ -139,84 +205,15 @@ func (r *Registrar) writeRegistry() error { return e } - encoder := json.NewEncoder(file) + states := r.state.GetStates() - state := r.getState() - encoder.Encode(state) + encoder := json.NewEncoder(file) + encoder.Encode(states) // Directly close file because of windows file.Close() - logp.Info("Registry file updated. %d states written.", len(state)) + logp.Info("Registry file updated. %d states written.", len(states)) return SafeFileRotate(r.registryFile, tempfile) } - -func (r *Registrar) fetchState(filePath string, fileInfo os.FileInfo) int64 { - - if previous, err := r.getPreviousFile(filePath, fileInfo); err == nil { - - if previous != filePath { - // File has rotated between shutdown and startup - // We return last state downstream, with a modified event source with the new file name - // And return the offset - also force harvest in case the file is old and we're about to skip it - logp.Info("Detected rename of a previously harvested file: %s -> %s", previous, filePath) - } - - logp.Info("Previous state for file %s found", filePath) - - lastState, _ := r.GetFileState(previous) - return lastState.Offset - } - - logp.Info("New file. Start reading from the beginning: %s", filePath) - - // New file so just start from the beginning - return 0 -} - -// getPreviousFile checks in the registrar if there is the newFile already exist with a different name -// In case an old file is found, the path to the file is returned, if not, an error is returned -func (r *Registrar) getPreviousFile(newFilePath string, newFileInfo os.FileInfo) (string, error) { - - newState := input.GetOSFileState(newFileInfo) - - for oldFilePath, oldState := range r.getState() { - - // Compare states - if newState.IsSame(oldState.FileStateOS) { - logp.Info("Old file with new name found: %s -> %s", oldFilePath, newFilePath) - return oldFilePath, nil - } - } - - return "", fmt.Errorf("No previous file found") -} - -func (r *Registrar) setState(path string, state FileState) { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - r.state[path] = state -} - -func (r *Registrar) getStateEntry(path string) (FileState, bool) { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - state, exist := r.state[path] - return state, exist -} - -func (r *Registrar) getState() map[string]FileState { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - - copy := make(map[string]FileState) - - for k, v := range r.state { - copy[k] = v - } - - return copy -} diff --git a/filebeat/input/state.go b/filebeat/input/state.go index 106fd855917..ae3af93d6b1 100644 --- a/filebeat/input/state.go +++ b/filebeat/input/state.go @@ -15,6 +15,7 @@ type FileState struct { Finished bool `json:"-"` // harvester state Fileinfo os.FileInfo `json:"-"` // the file info FileStateOS FileStateOS + LastSeen time.Time `json:"last_seen"` } // NewFileState creates a new file state @@ -24,6 +25,7 @@ func NewFileState(fileInfo os.FileInfo, path string) FileState { Source: path, Finished: false, FileStateOS: GetOSFileState(fileInfo), + LastSeen: time.Now(), } } @@ -45,6 +47,7 @@ func (s *States) Update(newState FileState) { defer s.mutex.Unlock() index, oldState := s.findPrevious(newState) + newState.LastSeen = time.Now() if index >= 0 { s.states[index] = newState @@ -84,10 +87,9 @@ func (s *States) Cleanup(older time.Duration) { defer s.mutex.Unlock() for i, state := range s.states { - // File is older then ignore_older -> remove state - modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > older { + // File wasn't seen for longer then older -> remove state + if time.Since(state.LastSeen) > older { logp.Debug("prospector", "State removed for %s because of older: %s", state.Source) s.states = append(s.states[:i], s.states[i+1:]...) } @@ -101,3 +103,25 @@ func (s *States) Count() int { defer s.mutex.Unlock() return len(s.states) } + +// Returns a copy of the file states +func (s *States) GetStates() []FileState { + + s.mutex.Lock() + defer s.mutex.Unlock() + + copy := make([]FileState, len(s.states)) + + for i := range s.states { + copy[i] = s.states[i] + } + + return copy +} + +// SetStates overwrites all internal states with the given states array +func (s *States) SetStates(states []FileState) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.states = states +} diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index a584cb0e234..cb6372a53bf 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -21,3 +21,26 @@ def get_registry(self): with open(dotFilebeat) as file: return json.load(file) + + def get_registry_entry_by_path(self, path): + """ + Fetches the registry file and checks if an entry for the given path exists + If the path exists, the state for the given path is returned + If a path exists multiple times (which is possible because of file rotation) + the most recent version is returned + """ + registry = self.get_registry() + + tmp_entry = None + + # Checks all entries and returns the most recent one + for entry in registry: + if entry["source"] == path: + if tmp_entry == None: + tmp_entry = entry + else: + if tmp_entry["last_seen"] < entry["last_seen"]: + tmp_entry = entry + + return tmp_entry + diff --git a/filebeat/tests/system/test_crawler.py b/filebeat/tests/system/test_crawler.py index 3e1995860cd..62c648220a5 100644 --- a/filebeat/tests/system/test_crawler.py +++ b/filebeat/tests/system/test_crawler.py @@ -318,8 +318,8 @@ def test_file_disappear_appear(self): data = self.get_registry() # Make sure new file was picked up. As it has the same file name, - # only one entry exists - assert len(data) == 1 + # one entry for the new file and one for the old should exist + assert len(data) == 2 # Make sure output has 11 entries, the new file was started # from scratch @@ -376,8 +376,8 @@ def test_force_close(self): data = self.get_registry() # Make sure new file was picked up. As it has the same file name, - # only one entry exists - assert len(data) == 1 + # one entry for the new and one for the old should exist + assert len(data) == 2 # Make sure output has 11 entries, the new file was started # from scratch diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 56e7bc7e3f3..95f90fed298 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -4,6 +4,7 @@ import platform import time import shutil +import json from nose.plugins.skip import Skip, SkipTest @@ -59,7 +60,7 @@ def test_registrar_file_content(self): assert len(data) == 1 logFileAbsPath = os.path.abspath(testfile) - record = data[logFileAbsPath] + record = self.get_registry_entry_by_path(logFileAbsPath) self.assertDictContainsSubset({ "source": logFileAbsPath, @@ -192,8 +193,12 @@ def test_rotating_file(self): data = self.get_registry() # Make sure the offsets are correctly set - data[os.path.abspath(testfile)]["offset"] = 10 - data[os.path.abspath(testfilerenamed)]["offset"] = 9 + if os.name == "nt": + assert self.get_registry_entry_by_path(os.path.abspath(testfile))["offset"] == 11 + assert self.get_registry_entry_by_path(os.path.abspath(testfilerenamed))["offset"] == 10 + else: + assert self.get_registry_entry_by_path(os.path.abspath(testfile))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfilerenamed))["offset"] == 9 # Check that 2 files are port of the registrar file assert len(data) == 2 @@ -240,7 +245,7 @@ def test_rotating_file_inode(self): max_timeout=10) data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] testfilerenamed1 = self.working_dir + "/log/input.1" os.rename(testfile, testfilerenamed1) @@ -257,8 +262,8 @@ def test_rotating_file_inode(self): data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] # Rotate log file, create a new empty one and remove it afterwards testfilerenamed2 = self.working_dir + "/log/input.2" @@ -282,11 +287,11 @@ def test_rotating_file_inode(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] - # Check that 2 files are part of the registrar file. The deleted file should never have been detected - assert len(data) == 2 + # Check that 3 files are part of the registrar file. The deleted file should never have been detected, but the rotated one should be in + assert len(data) == 3 def test_restart_continue(self): @@ -316,8 +321,7 @@ def test_restart_continue(self): # Wait a momemt to make sure registry is completely written time.sleep(1) - data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] filebeat.check_kill_and_wait() @@ -344,7 +348,7 @@ def test_restart_continue(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] # Check that 1 files are part of the registrar file. The deleted file should never have been detected assert len(data) == 1 @@ -384,7 +388,7 @@ def test_rotating_file_with_restart(self): time.sleep(1) data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] testfilerenamed1 = self.working_dir + "/log/input.1" os.rename(testfile, testfilerenamed1) @@ -401,8 +405,8 @@ def test_rotating_file_with_restart(self): data = self.get_registry() - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] filebeat.check_kill_and_wait() @@ -438,11 +442,11 @@ def test_rotating_file_with_restart(self): data = self.get_registry() # Compare file inodes and the one in the registry - assert os.stat(testfile).st_ino == data[os.path.abspath(testfile)]["FileStateOS"]["inode"] - assert os.stat(testfilerenamed1).st_ino == data[os.path.abspath(testfilerenamed1)]["FileStateOS"]["inode"] + assert os.stat(testfile).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfile))["FileStateOS"]["inode"] + assert os.stat(testfilerenamed1).st_ino == self.get_registry_entry_by_path(os.path.abspath(testfilerenamed1))["FileStateOS"]["inode"] - # Check that 2 files are part of the registrar file. The deleted file should never have been detected - assert len(data) == 2 + # Check that 3 files are part of the registrar file. The deleted file should never have been detected, but the rotated one should be in + assert len(data) == 3 def test_state_after_rotation(self): """ @@ -478,11 +482,11 @@ def test_state_after_rotation(self): # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 else: - assert data[os.path.abspath(testfile1)]["offset"] == 8 - assert data[os.path.abspath(testfile2)]["offset"] == 7 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 7 # Rotate files and remove old one os.rename(testfile2, testfile3) @@ -508,18 +512,14 @@ def test_state_after_rotation(self): time.sleep(5) filebeat.kill_and_wait() - - data = self.get_registry() - # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 10 - assert data[os.path.abspath(testfile2)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 - + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 def test_state_after_rotation_ignore_older(self): @@ -561,9 +561,9 @@ def test_state_after_rotation_ignore_older(self): # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 8 # Rotate files and remove old one os.rename(testfile2, testfile3) @@ -589,14 +589,112 @@ def test_state_after_rotation_ignore_older(self): time.sleep(5) filebeat.kill_and_wait() - - data = self.get_registry() - # Check that offsets are correct if os.name == "nt": # Under windows offset is +1 because of additional newline char - assert data[os.path.abspath(testfile1)]["offset"] == 10 - assert data[os.path.abspath(testfile2)]["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 10 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 9 else: - assert data[os.path.abspath(testfile1)]["offset"] == 9 - assert data[os.path.abspath(testfile2)]["offset"] == 8 + assert self.get_registry_entry_by_path(os.path.abspath(testfile1))["offset"] == 9 + assert self.get_registry_entry_by_path(os.path.abspath(testfile2))["offset"] == 8 + + + def test_migration_non_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name == "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"inode":30178938,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"inode":30178958,"device":16777220}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 + + def test_migration_windows(self): + """ + Tests if migration from old filebeat registry to new format works + """ + + if os.name != "nt": + raise SkipTest + + registry_file = self.working_dir + '/registry' + + # Write old registry file + with open(registry_file, 'w') as f: + f.write('{"logs/hello.log":{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}},"logs/log2.log":{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}}') + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/input*", + ) + + filebeat = self.start_beat() + + self.wait_until( + lambda: self.log_contains("Old registry states found: 2"), + max_timeout=15) + + self.wait_until( + lambda: self.log_contains("Old states converted to new states and written to registrar: 2"), + max_timeout=15) + + filebeat.check_kill_and_wait() + + # Check if content is same as above + assert self.get_registry_entry_by_path("logs/hello.log")["offset"] == 4 + assert self.get_registry_entry_by_path("logs/log2.log")["offset"] == 6 + + # Compare first entry + oldJson = json.loads('{"source":"logs/hello.log","offset":4,"FileStateOS":{"idxhi":1,"idxlo":12,"vol":34}}') + newJson = self.get_registry_entry_by_path("logs/hello.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Compare second entry + oldJson = json.loads('{"source":"logs/log2.log","offset":6,"FileStateOS":{"idxhi":67,"idxlo":44,"vol":12}}') + newJson = self.get_registry_entry_by_path("logs/log2.log") + del newJson["last_seen"] + assert newJson == oldJson + + # Make sure the right number of entries is in + data = self.get_registry() + assert len(data) == 2 From 3614306e04a1da2c70eef7dccc5e9ab3b4d564b4 Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 24 May 2016 14:08:52 +0200 Subject: [PATCH 2/3] Create copy of states --- filebeat/crawler/prospector.go | 2 +- filebeat/input/state.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index e624a57e73b..c1c4f94363e 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -33,7 +33,7 @@ func NewProspector(cfg *common.Config, states input.States, spoolerChan chan *in spoolerChan: spoolerChan, harvesterChan: make(chan *input.FileEvent), done: make(chan struct{}), - states: &states, + states: states.Copy(), wg: sync.WaitGroup{}, } diff --git a/filebeat/input/state.go b/filebeat/input/state.go index ae3af93d6b1..9b660a163a9 100644 --- a/filebeat/input/state.go +++ b/filebeat/input/state.go @@ -125,3 +125,10 @@ func (s *States) SetStates(states []FileState) { defer s.mutex.Unlock() s.states = states } + +// Copy create a new copy of the states object +func (s *States) Copy() *States { + states := NewStates() + states.states = s.GetStates() + return states +} From 40a76c59ebf86a7b07ecb7fc8e97aa5b05b31ccb Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 24 May 2016 14:21:30 +0200 Subject: [PATCH 3/3] Shorten copy --- filebeat/input/state.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/filebeat/input/state.go b/filebeat/input/state.go index 9b660a163a9..fc9b696b70e 100644 --- a/filebeat/input/state.go +++ b/filebeat/input/state.go @@ -106,17 +106,13 @@ func (s *States) Count() int { // Returns a copy of the file states func (s *States) GetStates() []FileState { - s.mutex.Lock() defer s.mutex.Unlock() - copy := make([]FileState, len(s.states)) - - for i := range s.states { - copy[i] = s.states[i] - } + newStates := make([]FileState, len(s.states)) + copy(newStates, s.states) - return copy + return newStates } // SetStates overwrites all internal states with the given states array