diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 572d363cd3c..b7f60fee388 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d *Filebeat* - Fix race condition on harvester stopping with reloading enabled. {issue}3779[3779] +- Fix recursive glob config parsing and resolution across restarts. {pull}4269[4269] *Heartbeat* diff --git a/filebeat/input/file/glob.go b/filebeat/input/file/glob.go index 71917fcbaa4..2e3c82ab612 100644 --- a/filebeat/input/file/glob.go +++ b/filebeat/input/file/glob.go @@ -21,8 +21,8 @@ func wildcards(doubleStarPatternDepth uint8, dir string, suffix string) []string return wildcardList } -// globPattern detects the use of "**" and expands it to standard glob patterns up to a max depth -func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) { +// GlobPatterns detects the use of "**" and expands it to standard glob patterns up to a max depth +func GlobPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) { if doubleStarPatternDepth == 0 { return []string{pattern}, nil } @@ -54,7 +54,7 @@ func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error // Glob expands '**' patterns into multiple patterns to satisfy https://golang.org/pkg/path/filepath/#Match func Glob(pattern string, doubleStarPatternDepth uint8) ([]string, error) { - patterns, err := globPatterns(pattern, doubleStarPatternDepth) + patterns, err := GlobPatterns(pattern, doubleStarPatternDepth) if err != nil { return nil, err } diff --git a/filebeat/input/file/glob_test.go b/filebeat/input/file/glob_test.go index aa9d2c58e63..2f3c8a56b48 100644 --- a/filebeat/input/file/glob_test.go +++ b/filebeat/input/file/glob_test.go @@ -61,7 +61,7 @@ type globPatternsTest struct { func TestGlobPatterns(t *testing.T) { for _, test := range globPatternsTests { - patterns, err := globPatterns(test.pattern, 2) + patterns, err := GlobPatterns(test.pattern, 2) if err != nil { if test.expectedError { continue diff --git a/filebeat/prospector/log/config.go b/filebeat/prospector/log/config.go index 981ffb64f7e..913c0142b40 100644 --- a/filebeat/prospector/log/config.go +++ b/filebeat/prospector/log/config.go @@ -8,7 +8,9 @@ import ( cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/harvester/reader" + "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common/match" + "github.com/elastic/beats/libbeat/logp" ) var ( @@ -62,7 +64,7 @@ type config struct { HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` Symlinks bool `config:"symlinks"` TailFiles bool `config:"tail_files"` - recursiveGlob bool `config:"recursive_glob.enabled"` + RecursiveGlob bool `config:"recursive_glob.enabled"` // Harvester BufferSize int `config:"harvester_buffer_size"` @@ -124,3 +126,25 @@ func (c *config) Validate() error { return nil } + +func (c *config) resolvePaths() error { + var paths []string + if !c.RecursiveGlob { + logp.Debug("prospector", "recursive glob disabled") + paths = c.Paths + } else { + logp.Debug("prospector", "recursive glob enabled") + } + for _, path := range c.Paths { + patterns, err := file.GlobPatterns(path, recursiveGlobDepth) + if err != nil { + return err + } + if len(patterns) > 1 { + logp.Debug("prospector", "%q expanded to %#v", path, patterns) + } + paths = append(paths, patterns...) + } + c.Paths = paths + return nil +} diff --git a/filebeat/prospector/log/prospector.go b/filebeat/prospector/log/prospector.go index 944d7b95130..2c8d5647d2d 100644 --- a/filebeat/prospector/log/prospector.go +++ b/filebeat/prospector/log/prospector.go @@ -36,7 +36,7 @@ type Prospector struct { done chan struct{} } -// NewLog instantiates a new Log +// NewProspector instantiates a new Log func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outleter, done chan struct{}) (*Prospector, error) { p := &Prospector{ @@ -51,6 +51,10 @@ func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outle if err := cfg.Unpack(&p.config); err != nil { return nil, err } + if err := p.config.resolvePaths(); err != nil { + logp.Err("Failed to resolve paths in config: %+v", err) + return nil, err + } // Create empty harvester to check if configs are fine // TODO: Do config validation instead @@ -175,11 +179,7 @@ func (p *Prospector) getFiles() map[string]os.FileInfo { paths := map[string]os.FileInfo{} for _, path := range p.config.Paths { - depth := uint8(0) - if p.config.recursiveGlob { - depth = recursiveGlobDepth - } - matches, err := file.Glob(path, depth) + matches, err := filepath.Glob(path) if err != nil { logp.Err("glob(%s) failed: %v", path, err) continue diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index 1ad3312c48e..fd15fbdfc73 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -10,6 +10,8 @@ filebeat.prospectors: # Paths that should be crawled and fetched {% if path %}paths: - {{ path }}{% endif %} + {% if recursive_glob %}recursive_glob.enabled: true + {% endif %} # Type of the files. Annotated in every documented scan_frequency: {{scan_frequency | default("0.1s") }} ignore_older: {{ignore_older}} diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index f383ec3f2fd..d999ff2ddad 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -614,7 +614,7 @@ def test_symlink_rotated(self): # Check if two different files are in registry data = self.get_registry() - assert len(data) == 2 + assert len(data) == 2, "expected to see 2 entries, got '%s'" % data def test_symlink_removed(self): """ diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py old mode 100644 new mode 100755 index 42a654ff3ed..3e1d85deef4 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + from filebeat import BaseTest import os import time @@ -678,3 +680,47 @@ def test_prospector_filter_includefields(self): )[0] assert "message" not in output assert "offset" in output + + def test_restart_recursive_glob(self): + """ + Check that file reading via recursive glob patterns continues after restart + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/**", + scan_frequency="1s", + recursive_glob=True, + ) + + testfile_dir = os.path.join(self.working_dir, "log", "some", "other", "subdir") + os.makedirs(testfile_dir) + testfile_path = os.path.join(testfile_dir, "input") + + filebeat = self.start_beat() + + with open(testfile_path, 'w') as testfile: + testfile.write("entry1\n") + + self.wait_until( + lambda: self.output_has_message("entry1"), + max_timeout=10, + name="output contains 'entry1'") + + filebeat.check_kill_and_wait() + + # Append to file + with open(testfile_path, 'a') as testfile: + testfile.write("entry2\n") + + filebeat = self.start_beat(output="filebeat2.log") + + self.wait_until( + lambda: self.output_has_message("entry2"), + max_timeout=10, + name="output contains 'entry2'") + + filebeat.check_kill_and_wait() + + +if __name__ == '__main__': + import unittest + unittest.main() diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py old mode 100644 new mode 100755 index dd22f62a88b..2014d34f6cc --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -1,9 +1,11 @@ +#!/usr/bin/env python """Test the registrar""" import os import platform import time import shutil + from filebeat import BaseTest from nose.plugins.skip import SkipTest @@ -323,7 +325,7 @@ def test_rotating_file_inode(self): def test_restart_continue(self): """ - Check that file readining continues after restart + Check that file reading continues after restart """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/input*", @@ -1396,3 +1398,8 @@ def test_registrar_files_with_prospector_level_processors(self): "inode": stat.st_ino, "device": stat.st_dev, }, file_state_os) + + +if __name__ == '__main__': + import unittest + unittest.main() diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index 61d43dd0a1e..8a6897cc58e 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -17,6 +17,10 @@ INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False) +class TimeoutError(Exception): + pass + + class Proc(object): """ Slim wrapper on subprocess.Popen that redirects @@ -279,9 +283,8 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"): start = datetime.now() while not cond(): if datetime.now() - start > timedelta(seconds=max_timeout): - raise Exception("Timeout waiting for '{}' to be true. " - .format(name) + - "Waited {} seconds.".format(max_timeout)) + raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) + + "Waited {} seconds.".format(max_timeout)) time.sleep(poll_interval) def get_log(self, logfile=None): @@ -351,6 +354,16 @@ def output_has(self, lines, output_file=None): except IOError: return False + def output_has_message(self, message, output_file=None): + """ + Returns true if the output has the given message field. + """ + try: + return any(line for line in self.read_output(output_file=output_file, required_fields=["message"]) + if line.get("message") == message) + except (IOError, TypeError): + return False + def all_have_fields(self, objs, fields): """ Checks that the given list of output objects have