Skip to content

Commit

Permalink
Properly shut down crawler in case one prospector is misconfigured (e…
Browse files Browse the repository at this point in the history
…lastic#4037)

If one prospector started to already send data and a second one was misconfigured, the beat paniced during shutdown. This is no prevented by properly shutting down the crawler also on error.

Closes elastic#3917
(cherry picked from commit 95195cc)
  • Loading branch information
ruflin committed Apr 19, 2017
1 parent 2ca1dab commit c39a3e9
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 5 deletions.
4 changes: 1 addition & 3 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ https://github.com/elastic/beats/compare/v5.3.1...master[Check the HEAD diff]
*Affecting all Beats*

*Filebeat*

- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]

*Heartbeat*

Expand Down Expand Up @@ -138,8 +138,6 @@ https://github.com/elastic/beats/compare/v5.2.2...v5.3.0[View commits]
filebeat.config_dir. {pull}3573[3573]
- Fix empty registry file on machine crash. {issue}3537[3537]
- Fix empty registry file on machine crash. {issue}3537[3537]
*Metricbeat*
- Add error handling to system process metricset for when Linux cgroups are missing from the kernel. {pull}3692[3692]
Expand Down
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
crawler.Stop()
return err
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er

err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err)
return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err)
}

c.prospectors[p.ID()] = p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (p *Prospector) Start() {
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
logp.Info("Prospector channel stopped because beat is stopping.")
return
case event := <-p.harvesterChan:
// No stopping on error, because on error it is expected that beatDone is closed
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
config: p.config,
}

if len(p.config.Paths) == 0 {
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

return prospectorer, nil
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig{
Paths: []string{"test.log"},
ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)},
},
}
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ filebeat.prospectors:
max_lines: {{ max_lines|default(500) }}
{% endif %}
{% endif %}
{% if prospector_raw %}
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
Expand Down
24 changes: 24 additions & 0 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ def nasa_logs(self):
self.copy_files(["logs/nasa-50k.log"],
source_dir="../files",
target_dir="log")

def test_stopping_empty_path(self):
"""
Test filebeat stops properly when 1 prospector has an invalid config.
"""

prospector_raw = """
- input_type: log
paths: []
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_raw=prospector_raw,
)
filebeat = self.start_beat()
time.sleep(2)

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("No paths were defined for prospector") >= 1,
max_timeout=5)

filebeat.check_wait(exit_code=1)

0 comments on commit c39a3e9

Please sign in to comment.