From 6fbf4439f43d875e5e20059b67b5a19ef3365d35 Mon Sep 17 00:00:00 2001 From: ruflin Date: Wed, 6 Jan 2016 17:08:49 +0100 Subject: [PATCH] Close filebeat if prospector without paths exists * Add test for harvesters --- CHANGELOG.asciidoc | 2 +- filebeat/crawler/prospector.go | 4 ++ filebeat/crawler/prospector_test.go | 18 +++++++ filebeat/tests/system/test_prospector.py | 66 ++++++++++++++++++++++-- 4 files changed, 86 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e68cf2f2b8a..dbf6fd5e64d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -35,7 +35,7 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff] *Filebeat* - Add exclude_files configuration option {pull}563[563] -- Stop filebeat if filebeat is started without any prospectors defined {pull}644[644] +- Stop filebeat if filebeat is started without any prospectors defined or empty prospectors {pull}644[644] {pull}647[647] *Winlogbeat* diff --git a/filebeat/crawler/prospector.go b/filebeat/crawler/prospector.go index 792ae477b8f..4ccbc51b625 100644 --- a/filebeat/crawler/prospector.go +++ b/filebeat/crawler/prospector.go @@ -60,6 +60,10 @@ func (p *Prospector) setupProspectorConfig() error { // Init File Stat list p.prospectorList = make(map[string]harvester.FileStat) + if config.Harvester.InputType == cfg.LogInputType && len(config.Paths) == 0 { + return fmt.Errorf("No paths were defined for prospector") + } + return nil } diff --git a/filebeat/crawler/prospector_test.go b/filebeat/crawler/prospector_test.go index 8ffe9a733f7..19124c66800 100644 --- a/filebeat/crawler/prospector_test.go +++ b/filebeat/crawler/prospector_test.go @@ -124,6 +124,7 @@ func TestProspectorInitInvalidIgnoreOlder(t *testing.T) { func TestProspectorInitInputTypeLog(t *testing.T) { prospectorConfig := config.ProspectorConfig{ + Paths: []string{"testpath1", "testpath2"}, Harvester: config.HarvesterConfig{ InputType: "log", }, @@ -138,6 +139,23 @@ func TestProspectorInitInputTypeLog(t *testing.T) { assert.Equal(t, "log", prospector.ProspectorConfig.Harvester.InputType) } +func TestProspectorInitInputTypeLogError(t *testing.T) { + + prospectorConfig := config.ProspectorConfig{ + Harvester: config.HarvesterConfig{ + InputType: "log", + }, + } + + prospector := Prospector{ + ProspectorConfig: prospectorConfig, + } + + err := prospector.Init() + // Error should be returned because no path is set + assert.Error(t, err) +} + func TestProspectorInitInputTypeStdin(t *testing.T) { prospectorConfig := config.ProspectorConfig{ diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index 2fe1edddd21..1dc70fd2c68 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -185,7 +185,7 @@ def test_rotating_ignore_older_low_write_rate(self): os.mkdir(self.working_dir + "/log/") testfile = self.working_dir + "/log/test.log" - proc = self.start_filebeat(debug_selectors=['*']) + filebeat = self.start_filebeat(debug_selectors=['*']) # wait for first "Start next scan" log message self.wait_until( @@ -230,14 +230,17 @@ def test_rotating_ignore_older_low_write_rate(self): lambda: self.output_count(lambda x: x >= lines), max_timeout=5) - proc.kill_and_wait() + filebeat.kill_and_wait() def test_shutdown_no_prospectors(self): + """ + In case no prospectors are defined, filebeat must shut down and report an error + """ self.render_config_template( prospectors=False, ) - proc = self.start_filebeat(debug_selectors=['*']) + filebeat = self.start_filebeat(debug_selectors=['*']) # wait for first "Start next scan" log message self.wait_until( @@ -249,3 +252,60 @@ def test_shutdown_no_prospectors(self): lambda: self.log_contains( "shutting down"), max_timeout=10) + + filebeat.kill_and_wait() + + + def test_no_paths_defined(self): + """ + In case a prospector is defined but doesn't contain any paths, prospector must return error which + leads to shutdown of filebeat because of configuration error + """ + self.render_config_template( + ) + + filebeat = self.start_filebeat(debug_selectors=['*']) + + # wait for first "Start next scan" log message + self.wait_until( + lambda: self.log_contains( + "No paths were defined for prospector"), + max_timeout=10) + + self.wait_until( + lambda: self.log_contains( + "shutting down"), + max_timeout=10) + + filebeat.kill_and_wait() + + + def test_files_added_late(self): + """ + Tests that prospectors stay running even though no harvesters are started yet + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + ) + + os.mkdir(self.working_dir + "/log/") + + filebeat = self.start_filebeat(debug_selectors=['*']) + + # wait until events are sent for the first time + self.wait_until( + lambda: self.log_contains( + "Events flushed"), + max_timeout=10) + + testfile = self.working_dir + "/log/test.log" + with open(testfile, 'a') as file: + file.write("Hello World1\n") + file.write("Hello World2\n") + + # wait for log to be read + self.wait_until( + lambda: self.output_has(lines=2), + max_timeout=15) + + filebeat.kill_and_wait()