Skip to content

Commit

Permalink
Fix ML jobs setup for dynamic modules
Browse files Browse the repository at this point in the history
Modules from `modules.d` were ignored by both `setup` command and
`--setup` flag.

Fixes #5504
  • Loading branch information
exekias committed Nov 3, 2017
1 parent 33270f1 commit e535809
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 6 deletions.
42 changes: 37 additions & 5 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"flag"
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -99,10 +101,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

// register `setup` callback for ML jobs
if !moduleRegistry.Empty() {
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
return fb, nil
}
Expand All @@ -127,6 +127,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")
var errs multierror.Errors

if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
Expand All @@ -139,8 +140,39 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
if err := fb.moduleRegistry.LoadML(esClient); err != nil {
errs = append(errs, err)
}

// Add dynamic modules.d
if fb.config.ConfigModules.Enabled() {
config := cfgfile.DefaultDynamicConfig
fb.config.ConfigModules.Unpack(&config)

modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
if err != nil {
return errors.Wrap(err, "initialization error")
}

for _, file := range modulesManager.ListEnabled() {
confs, err := cfgfile.LoadList(file.Path)
if err != nil {
errs = append(errs, errors.Wrap(err, "error loading config file"))
continue
}
set, err := fileset.NewModuleRegistry(confs, "", false)
if err != nil {
errs = append(errs, err)
continue
}

if err := set.LoadML(esClient); err != nil {
errs = append(errs, err)
}
}
}

return fb.moduleRegistry.LoadML(esClient)
return errs.Err()
}

// Run allows the beater to be run as a beat.
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat_modules.yml.j2
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}

filebeat.config.modules:
path: {{ beat.working_dir + '/modules.d/*.yml' }}

output.elasticsearch.hosts: ["{{ elasticsearch_url }}"]
output.elasticsearch.index: {{ index_name }}

Expand Down
86 changes: 85 additions & 1 deletion filebeat/tests/system/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def search_objects():
"integration test not available on 2.x")
def test_setup_machine_learning_nginx(self):
"""
Tests that setup works and loads nginx dashboards.
Tests that setup works and loads machine learning jobs using --modules flag.
"""
self.init()
# generate a minimal configuration
Expand Down Expand Up @@ -238,3 +238,87 @@ def test_setup_machine_learning_nginx(self):

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])

@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_machine_learning_nginx_enable(self):
"""
Tests that setup works and loads machine learning jobs for enabled modules.
"""
self.init()
# generate a minimal configuration
cfgfile = os.path.join(self.working_dir, "filebeat.yml")
self.render_config_template(
template_name="filebeat_modules",
output=cfgfile,
index_name=self.index_name,
elasticsearch_url=self.elasticsearch_url)

# Enable nginx
os.mkdir(os.path.join(self.working_dir, "modules.d"))
with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx:
nginx.write("- module: nginx")

cmd = [
self.filebeat, "-systemTest",
"-e", "-d", "*",
"-c", cfgfile,
"setup", "--machine-learning"]

output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=output,
bufsize=0).wait()

jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])

@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_flag_machine_learning_nginx_enable(self):
"""
Tests that setup works and loads machine learning jobs for enabled modules using --setup flag.
"""
self.init()
# generate a minimal configuration
cfgfile = os.path.join(self.working_dir, "filebeat.yml")
self.render_config_template(
template_name="filebeat_modules",
output=cfgfile,
index_name=self.index_name,
elasticsearch_url=self.elasticsearch_url)

# Enable nginx
os.mkdir(os.path.join(self.working_dir, "modules.d"))
with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx:
nginx.write("- module: nginx")

cmd = [
self.filebeat, "-systemTest",
"-e", "-d", "*",
"-c", cfgfile,
"--setup"]

output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
beat = subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=output,
bufsize=0)

jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])

beat.kill()

0 comments on commit e535809

Please sign in to comment.