Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #4037: Properly shut down crawler in case one prospector is misconfigured #4047

Merged
merged 1 commit into from
Apr 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.3.0...master[Check the HEAD diff]
*Filebeat*

- Fix modules default file permissions. {pull}3879[3879]
- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]

*Heartbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
crawler.Stop()
return err
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er

err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err)
return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err)
}

c.prospectors[p.ID()] = p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (p *Prospector) Start() {
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
logp.Info("Prospector channel stopped because beat is stopping.")
return
case event := <-p.harvesterChan:
// No stopping on error, because on error it is expected that beatDone is closed
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
config: p.config,
}

if len(p.config.Paths) == 0 {
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

return prospectorer, nil
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig{
Paths: []string{"test.log"},
ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)},
},
}
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ filebeat.prospectors:
max_lines: {{ max_lines|default(500) }}
{% endif %}
{% endif %}
{% if prospector_raw %}
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
Expand Down
24 changes: 24 additions & 0 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ def nasa_logs(self):
self.copy_files(["logs/nasa-50k.log"],
source_dir="../files",
target_dir="log")

def test_stopping_empty_path(self):
"""
Test filebeat stops properly when 1 prospector has an invalid config.
"""

prospector_raw = """
- input_type: log
paths: []
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_raw=prospector_raw,
)
filebeat = self.start_beat()
time.sleep(2)

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("No paths were defined for prospector") >= 1,
max_timeout=5)

filebeat.check_wait(exit_code=1)