Skip to content

Commit

Permalink
Check modules and prospectors settings when reload is off (elastic#5053
Browse files Browse the repository at this point in the history
…) (elastic#5107)

* Check modules and prospectors settings when reload is off

This change ensures that we check configs when loading them through
`filebeat.config.modules`, `filebeat.config.prospectors` or `metricbeat.config.modules`.

It will error and exit if settings are wrong and **reload is disabled**

* Extract common operations from check and start

(cherry picked from commit ac3d0dd)
  • Loading branch information
exekias authored and tsg committed Sep 7, 2017
1 parent 06714be commit 49b2807
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix wrong MySQL CRUD queries timelion visualization {pull}4857[4857]
- Add new metrics to CPU metricsset {pull}4969[4969]
- Fix a memory allocation issue where more memory was allocated than needed in the windows-perfmon metricset. {issue}5035[5035]
- Don't start metricbeat if external modules config is wrong and reload is disabled {pull}5053[5053]

*Packetbeat*

Expand All @@ -62,6 +63,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di

*Filebeat*

- Don't start filebeat if external modules/prospectors config is wrong and reload is disabled {pull}5053[5053]

*Heartbeat*

*Metricbeat*
Expand Down
8 changes: 8 additions & 0 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.prospectorsReloader = cfgfile.NewReloader(configProspectors)
prospectorsFactory := prospector.NewFactory(c.out, r, c.beatDone)
if err := c.prospectorsReloader.Check(prospectorsFactory); err != nil {
return err
}

go func() {
c.prospectorsReloader.Run(prospectorsFactory)
}()
Expand All @@ -67,6 +71,10 @@ func (c *Crawler) Start(r *registrar.Registrar, configProspectors *common.Config

c.modulesReloader = cfgfile.NewReloader(configModules)
modulesFactory := fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, c.beatDone)
if err := c.modulesReloader.Check(modulesFactory); err != nil {
return err
}

go func() {
c.modulesReloader.Run(modulesFactory)
}()
Expand Down
3 changes: 1 addition & 2 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("reg

{% if reload or reload_path -%}
filebeat.config.{{ reload_type|default("prospectors") }}:
enabled: true
path: {{ reload_path }}
{% if reload -%}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false") }}
{% endif -%}
{% endif -%}

Expand Down
32 changes: 32 additions & 0 deletions filebeat/tests/system/test_reload_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,35 @@ def test_load_configs(self):
assert output[0]["message"] == "Hello 1"
assert output[1]["message"] == "Hello 2"
proc.check_kill_and_wait()

def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
prospectors=False,
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/wrong_module.yml"
moduleConfig = """
- module: test
test:
enabled: true
wrong_field: error
prospector:
scan_frequency: 1s
"""
with open(config_path, 'w') as f:
f.write(moduleConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("No paths were defined for prospector accessing"),
max_timeout=10)

assert exit_code == 1
83 changes: 67 additions & 16 deletions libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"sync"
"time"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand Down Expand Up @@ -54,6 +57,7 @@ type Runner interface {
type Reloader struct {
registry *Registry
config DynamicConfig
path string
done chan struct{}
wg sync.WaitGroup
}
Expand All @@ -63,13 +67,56 @@ func NewReloader(cfg *common.Config) *Reloader {
config := DefaultDynamicConfig
cfg.Unpack(&config)

path := config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

return &Reloader{
registry: NewRegistry(),
config: config,
path: path,
done: make(chan struct{}),
}
}

// Check configs are valid (only if reload is disabled)
func (rl *Reloader) Check(runnerFactory RunnerFactory) error {
// If config reload is enabled we ignore errors (as they may be fixed afterwards)
if rl.config.Reload.Enabled {
return nil
}

debugf("Checking module configs from: %s", rl.path)
gw := NewGlobWatcher(rl.path)

files, _, err := gw.Scan()
if err != nil {
return errors.Wrap(err, "fetching config files")
}

// Load all config objects
configs, err := rl.loadConfigs(files)
if err != nil {
return err
}

debugf("Number of module configs found: %v", len(configs))

// Initialize modules
for _, c := range configs {
// Only add configs to startList which are enabled
if !c.Enabled() {
continue
}
_, err := runnerFactory.Create(c)
if err != nil {
return err
}
}
return nil
}

// Run runs the reloader
func (rl *Reloader) Run(runnerFactory RunnerFactory) {
logp.Info("Config reloader started")
Expand All @@ -80,12 +127,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
// Stop all running modules when method finishes
defer rl.stopRunners(rl.registry.CopyList())

path := rl.config.Path
if !filepath.IsAbs(path) {
path = paths.Resolve(paths.Config, path)
}

gw := NewGlobWatcher(path)
gw := NewGlobWatcher(rl.path)

// If reloading is disable, config files should be loaded immidiately
if !rl.config.Reload.Enabled {
Expand Down Expand Up @@ -118,16 +160,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}

// Load all config objects
configs := []*common.Config{}
for _, file := range files {
c, err := LoadList(file)
if err != nil {
logp.Err("Error loading config: %s", err)
continue
}

configs = append(configs, c...)
}
configs, _ := rl.loadConfigs(files)

debugf("Number of module configs found: %v", len(configs))

Expand Down Expand Up @@ -182,6 +215,24 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) {
}
}

func (rl *Reloader) loadConfigs(files []string) ([]*common.Config, error) {
// Load all config objects
configs := []*common.Config{}
var errs multierror.Errors
for _, file := range files {
c, err := LoadList(file)
if err != nil {
errs = append(errs, err)
logp.Err("Error loading config: %s", err)
continue
}

configs = append(configs, c...)
}

return configs, errs.Err()
}

// Stop stops the reloader and waits for all modules to properly stop
func (rl *Reloader) Stop() {
close(rl.done)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
moduleReloader := cfgfile.NewReloader(bt.config.ConfigModules)
factory := module.NewFactory(bt.config.MaxStartDelay, b.Publisher)

if err := moduleReloader.Check(factory); err != nil {
return err
}

go moduleReloader.Run(factory)
wg.Add(1)
go func() {
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ metricbeat.modules:
{% endif -%}
{%- endfor %}

{% if reload -%}
{% if reload or reload_path -%}
metricbeat.config.modules:
path: {{ reload_path|default("${path.config}/modules.d/*.yml") }}
reload.period: 1s
reload.enabled: true
reload.enabled: {{ reload|default("false")}}
{% endif -%}

# Disable random start delay for metricsets.
Expand Down
29 changes: 29 additions & 0 deletions metricbeat/tests/system/test_reload.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,32 @@ def test_start_stop(self):
assert lines == self.output_lines()

proc.check_kill_and_wait()

@unittest.skipUnless(re.match("(?i)win|linux|darwin|freebsd|openbsd", sys.platform), "os")
def test_wrong_module_no_reload(self):
"""
Test beat errors when reload is disabled and some module config is wrong
"""
self.render_config_template(
reload=False,
reload_path=self.working_dir + "/configs/*.yml",
)
os.mkdir(self.working_dir + "/configs/")

config_path = self.working_dir + "/configs/system.yml"
systemConfig = """
- module: system
metricsets: ["wrong_metricset"]
period: 1s
"""
with open(config_path, 'w') as f:
f.write(systemConfig)

exit_code = self.run_beat()

# Wait until offset for new line is updated
self.wait_until(
lambda: self.log_contains("metricset not found"),
max_timeout=10)

assert exit_code == 1

0 comments on commit 49b2807

Please sign in to comment.