Skip to content

Commit

Permalink
Fix ML jobs setup for dynamic modules (elastic#5509)
Browse files Browse the repository at this point in the history
* Fix ML jobs setup for dynamic modules

Modules from `modules.d` were ignored by both `setup` command and
`--setup` flag.

Fixes elastic#5504

* Fix test
  • Loading branch information
exekias authored and tsg committed Nov 6, 2017
1 parent ac0d5e0 commit adcd3d0
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 23 deletions.
42 changes: 37 additions & 5 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"flag"
"fmt"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -99,10 +101,8 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
}

// register `setup` callback for ML jobs
if !moduleRegistry.Empty() {
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
b.SetupMLCallback = func(b *beat.Beat) error {
return fb.loadModulesML(b)
}
return fb, nil
}
Expand All @@ -127,6 +127,7 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {

func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")
var errs multierror.Errors

if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
Expand All @@ -139,8 +140,39 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
if err := fb.moduleRegistry.LoadML(esClient); err != nil {
errs = append(errs, err)
}

// Add dynamic modules.d
if fb.config.ConfigModules.Enabled() {
config := cfgfile.DefaultDynamicConfig
fb.config.ConfigModules.Unpack(&config)

modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled")
if err != nil {
return errors.Wrap(err, "initialization error")
}

for _, file := range modulesManager.ListEnabled() {
confs, err := cfgfile.LoadList(file.Path)
if err != nil {
errs = append(errs, errors.Wrap(err, "error loading config file"))
continue
}
set, err := fileset.NewModuleRegistry(confs, "", false)
if err != nil {
errs = append(errs, err)
continue
}

if err := set.LoadML(esClient); err != nil {
errs = append(errs, err)
}
}
}

return fb.moduleRegistry.LoadML(esClient)
return errs.Err()
}

// Run allows the beater to be run as a beat.
Expand Down
9 changes: 9 additions & 0 deletions filebeat/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ services:
env_file:
- ${PWD}/build/test.env
- ${PWD}/prospector/redis/_meta/env
environment:
- KIBANA_HOST=kibana
- KIBANA_PORT=5601
working_dir: /go/src/github.com/elastic/beats/filebeat
volumes:
- ${PWD}/..:/go/src/github.com/elastic/beats/
Expand All @@ -18,12 +21,18 @@ services:
image: busybox
depends_on:
elasticsearch: { condition: service_healthy }
kibana: { condition: service_healthy }
redis: { condition: service_healthy }

elasticsearch:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: elasticsearch

kibana:
extends:
file: ../testing/environments/${TESTING_ENVIRONMENT}.yml
service: kibana

redis:
build: ${PWD}/prospector/redis/_meta
9 changes: 9 additions & 0 deletions filebeat/tests/system/config/filebeat_modules.yml.j2
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}

filebeat.config.modules:
path: {{ beat.working_dir + '/modules.d/*.yml' }}

output.elasticsearch.hosts: ["{{ elasticsearch_url }}"]
output.elasticsearch.index: {{ index_name }}

setup.template.name: {{ index_name }}
setup.template.pattern: {{ index_name }}*

setup.kibana.host: {{ kibana_url }}

{% if kibana_path %}
setup.dashboards.directory: {{ kibana_path }}
{% endif %}
76 changes: 58 additions & 18 deletions filebeat/tests/system/test_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import unittest
import glob
import shutil
import subprocess
from elasticsearch import Elasticsearch
import json
Expand All @@ -13,6 +14,7 @@ class Test(BaseTest):

def init(self):
self.elasticsearch_url = self.get_elasticsearch_url()
self.kibana_url = self.get_kibana_url()
print("Using elasticsearch: {}".format(self.elasticsearch_url))
self.es = Elasticsearch([self.elasticsearch_url])
logging.getLogger("urllib3").setLevel(logging.WARNING)
Expand All @@ -21,6 +23,9 @@ def init(self):
self.modules_path = os.path.abspath(self.working_dir +
"/../../../../module")

self.kibana_path = os.path.abspath(self.working_dir +
"/../../../../_meta/kibana")

self.filebeat = os.path.abspath(self.working_dir +
"/../../../../filebeat.test")

Expand Down Expand Up @@ -206,35 +211,70 @@ def search_objects():
@unittest.skipIf(not INTEGRATION_TESTS or
os.getenv("TESTING_ENVIRONMENT") == "2x",
"integration test not available on 2.x")
def test_setup_machine_learning_nginx(self):
"""
Tests that setup works and loads nginx dashboards.
"""
def test_ml_setup(self):
""" Test ML are installed in all possible ways """
for setup_flag in (True, False):
for modules_flag in (True, False):
self._run_ml_test(setup_flag, modules_flag)

def _run_ml_test(self, setup_flag, modules_flag):
self.init()

# Clean any previous state
for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]:
if df["datafeed_id"] == 'filebeat-nginx-access-response_code':
self.es.transport.perform_request("DELETE", "/_xpack/ml/datafeeds/" + df["datafeed_id"])

for df in self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")["jobs"]:
if df["job_id"] == 'datafeed-filebeat-nginx-access-response_code':
self.es.transport.perform_request("DELETE", "/_xpack/ml/anomaly_detectors/" + df["job_id"])

shutil.rmtree(os.path.join(self.working_dir, "modules.d"), ignore_errors=True)

# generate a minimal configuration
cfgfile = os.path.join(self.working_dir, "filebeat.yml")
self.render_config_template(
template_name="filebeat_modules",
output=cfgfile,
index_name=self.index_name,
elasticsearch_url=self.elasticsearch_url)
elasticsearch_url=self.elasticsearch_url,
kibana_url=self.kibana_url,
kibana_path=self.kibana_path)

if not modules_flag:
# Enable nginx
os.mkdir(os.path.join(self.working_dir, "modules.d"))
with open(os.path.join(self.working_dir, "modules.d/nginx.yml"), "wb") as nginx:
nginx.write("- module: nginx")

cmd = [
self.filebeat, "-systemTest",
"-e", "-d", "*",
"-c", cfgfile,
"setup", "--modules=nginx", "--machine-learning"]
"-c", cfgfile
]

output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=subprocess.STDOUT,
bufsize=0).wait()
if setup_flag:
cmd += ["--setup"]
else:
cmd += ["setup", "--machine-learning"]

jobs = self.es.transport.perform_request("GET", "/_xpack/ml/anomaly_detectors/")
assert "filebeat-nginx-access-response_code" in (job["job_id"] for job in jobs["jobs"])
if modules_flag:
cmd += ["--modules=nginx"]

datafeeds = self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")
assert "filebeat-nginx-access-response_code" in (df["job_id"] for df in datafeeds["datafeeds"])
output = open(os.path.join(self.working_dir, "output.log"), "ab")
output.write(" ".join(cmd) + "\n")
beat = subprocess.Popen(cmd,
stdin=None,
stdout=output,
stderr=output,
bufsize=0)

# Check result
self.wait_until(lambda: "filebeat-nginx-access-response_code" in
(df["job_id"] for df in self.es.transport.perform_request(
"GET", "/_xpack/ml/anomaly_detectors/")["jobs"]),
max_timeout=30)
self.wait_until(lambda: "datafeed-filebeat-nginx-access-response_code" in
(df["datafeed_id"] for df in self.es.transport.perform_request("GET", "/_xpack/ml/datafeeds/")["datafeeds"]))

beat.kill()
9 changes: 9 additions & 0 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,12 @@ def get_elasticsearch_url(self):
host=os.getenv("ES_HOST", "localhost"),
port=os.getenv("ES_PORT", "9200"),
)

def get_kibana_url(self):
"""
Returns kibana host URL
"""
return "http://{host}:{port}".format(
host=os.getenv("KIBANA_HOST", "localhost"),
port=os.getenv("KIBANA_PORT", "5601"),
)

0 comments on commit adcd3d0

Please sign in to comment.