diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 10096412fcac..4eb36799f0eb 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d - Add the ability to configure kernel's audit failure mode. {pull}4516[4516] - Add experimental Aerospike module. {pull}4560[4560] - Vsphere module: collect custom fields from virtual machines. {issue}4464[4464] +- Add `test modules` command, to test modules expected output. {pull}4656[4656] - Add `processors` setting to metricbeat modules. {pull}4699[4699] *Packetbeat* diff --git a/libbeat/cfgfile/glob_manager.go b/libbeat/cfgfile/glob_manager.go index b3ef2e7554e3..95c131f618a0 100644 --- a/libbeat/cfgfile/glob_manager.go +++ b/libbeat/cfgfile/glob_manager.go @@ -15,13 +15,13 @@ type GlobManager struct { glob string enabledExtension string disabledExtension string - files []*cfgfile + files []*CfgFile } -type cfgfile struct { - name string - path string - enabled bool +type CfgFile struct { + Name string + Path string + Enabled bool } // NewGlobManager takes a glob and enabled/disabled extensions and returns a GlobManager object. @@ -58,10 +58,10 @@ func (g *GlobManager) load() error { for _, path := range files { // Trim cfg file name - g.files = append(g.files, &cfgfile{ - name: strings.TrimSuffix(filepath.Base(path), g.enabledExtension), - enabled: true, - path: path, + g.files = append(g.files, &CfgFile{ + Name: strings.TrimSuffix(filepath.Base(path), g.enabledExtension), + Enabled: true, + Path: path, }) } @@ -74,10 +74,10 @@ func (g *GlobManager) load() error { for _, path := range files { // Trim cfg file name - g.files = append(g.files, &cfgfile{ - name: strings.TrimSuffix(filepath.Base(path), g.enabledExtension+g.disabledExtension), - enabled: false, - path: path, + g.files = append(g.files, &CfgFile{ + Name: strings.TrimSuffix(filepath.Base(path), g.enabledExtension+g.disabledExtension), + Enabled: false, + Path: path, }) } @@ -85,34 +85,34 @@ func (g *GlobManager) load() error { } // ListEnabled conf files -func (g *GlobManager) ListEnabled() []string { - var names []string +func (g *GlobManager) ListEnabled() []*CfgFile { + var enabled []*CfgFile for _, file := range g.files { - if file.enabled { - names = append(names, file.name) + if file.Enabled { + enabled = append(enabled, file) } } - return names + return enabled } // ListDisabled conf files -func (g *GlobManager) ListDisabled() []string { - var names []string +func (g *GlobManager) ListDisabled() []*CfgFile { + var disabled []*CfgFile for _, file := range g.files { - if !file.enabled { - names = append(names, file.name) + if !file.Enabled { + disabled = append(disabled, file) } } - return names + return disabled } // Enabled returns true if given conf file is enabled func (g *GlobManager) Enabled(name string) bool { for _, file := range g.files { - if name == file.name { - return file.enabled + if name == file.Name { + return file.Enabled } } return false @@ -121,7 +121,7 @@ func (g *GlobManager) Enabled(name string) bool { // Exists return true if the given conf exists (enabled or disabled) func (g *GlobManager) Exists(name string) bool { for _, file := range g.files { - if name == file.name { + if name == file.Name { return true } } @@ -131,14 +131,14 @@ func (g *GlobManager) Exists(name string) bool { // Enable given conf file, does nothing if it's enabled already func (g *GlobManager) Enable(name string) error { for _, file := range g.files { - if name == file.name { - if !file.enabled { - newPath := strings.TrimSuffix(file.path, g.disabledExtension) - if err := os.Rename(file.path, newPath); err != nil { + if name == file.Name { + if !file.Enabled { + newPath := strings.TrimSuffix(file.Path, g.disabledExtension) + if err := os.Rename(file.Path, newPath); err != nil { return errors.Wrap(err, "enable failed") } - file.enabled = true - file.path = newPath + file.Enabled = true + file.Path = newPath } return nil } @@ -150,14 +150,14 @@ func (g *GlobManager) Enable(name string) error { // Disable given conf file, does nothing if it's disabled already func (g *GlobManager) Disable(name string) error { for _, file := range g.files { - if name == file.name { - if file.enabled { - newPath := file.path + g.disabledExtension - if err := os.Rename(file.path, newPath); err != nil { + if name == file.Name { + if file.Enabled { + newPath := file.Path + g.disabledExtension + if err := os.Rename(file.Path, newPath); err != nil { return errors.Wrap(err, "disable failed") } - file.enabled = false - file.path = newPath + file.Enabled = false + file.Path = newPath } return nil } diff --git a/libbeat/cfgfile/glob_manager_test.go b/libbeat/cfgfile/glob_manager_test.go index c651986d14f1..b709af8698ff 100644 --- a/libbeat/cfgfile/glob_manager_test.go +++ b/libbeat/cfgfile/glob_manager_test.go @@ -60,7 +60,9 @@ func TestGlobManager(t *testing.T) { assert.Equal(t, len(manager.ListEnabled()), 1) assert.Equal(t, len(manager.ListDisabled()), 2) - assert.Equal(t, manager.ListEnabled(), []string{"config1"}) + enabled := manager.ListEnabled() + assert.Equal(t, enabled[0].Name, "config1") + assert.Equal(t, enabled[0].Enabled, true) // Test enable if err = manager.Enable("config3"); err != nil { @@ -70,7 +72,9 @@ func TestGlobManager(t *testing.T) { assert.Equal(t, len(manager.ListEnabled()), 2) assert.Equal(t, len(manager.ListDisabled()), 1) - assert.Equal(t, manager.ListDisabled(), []string{"config2"}) + disabled := manager.ListDisabled() + assert.Equal(t, disabled[0].Name, "config2") + assert.Equal(t, disabled[0].Enabled, false) // Check correct files layout: files, err := filepath.Glob(dir + "/*") diff --git a/libbeat/cmd/export.go b/libbeat/cmd/export.go index 20ee115868a3..202d1c1f26c0 100644 --- a/libbeat/cmd/export.go +++ b/libbeat/cmd/export.go @@ -3,18 +3,17 @@ package cmd import ( "github.com/spf13/cobra" - "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cmd/export" ) -func genExportCmd(name, beatVersion string, beatCreator beat.Creator) *cobra.Command { +func genExportCmd(name, beatVersion string) *cobra.Command { exportCmd := &cobra.Command{ Use: "export", Short: "Export current config or index template", } - exportCmd.AddCommand(export.GenExportConfigCmd(name, beatVersion, beatCreator)) - exportCmd.AddCommand(export.GenTemplateConfigCmd(name, beatVersion, beatCreator)) + exportCmd.AddCommand(export.GenExportConfigCmd(name, beatVersion)) + exportCmd.AddCommand(export.GenTemplateConfigCmd(name, beatVersion)) return exportCmd } diff --git a/libbeat/cmd/export/config.go b/libbeat/cmd/export/config.go index 64dd47b9c422..7060a5dd2f92 100644 --- a/libbeat/cmd/export/config.go +++ b/libbeat/cmd/export/config.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/beats/libbeat/beat" ) -func GenExportConfigCmd(name, beatVersion string, beatCreator beat.Creator) *cobra.Command { +func GenExportConfigCmd(name, beatVersion string) *cobra.Command { return &cobra.Command{ Use: "config", Short: "Export current config to stdout", diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 7c5ad8ec7510..b459638c42b3 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -10,7 +10,7 @@ import ( "github.com/elastic/beats/libbeat/template" ) -func GenTemplateConfigCmd(name, beatVersion string, beatCreator beat.Creator) *cobra.Command { +func GenTemplateConfigCmd(name, beatVersion string) *cobra.Command { genTemplateConfigCmd := &cobra.Command{ Use: "template", Short: "Export index template to stdout", diff --git a/libbeat/cmd/modules.go b/libbeat/cmd/modules.go index c75c10954aa7..4e52ffb72d7b 100644 --- a/libbeat/cmd/modules.go +++ b/libbeat/cmd/modules.go @@ -7,13 +7,14 @@ import ( "github.com/spf13/cobra" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" ) // ModulesManager interface provides all actions needed to implement modules command // (to list, enable & disable modules) type ModulesManager interface { - ListEnabled() []string - ListDisabled() []string + ListEnabled() []*cfgfile.CfgFile + ListDisabled() []*cfgfile.CfgFile Exists(name string) bool Enabled(name string) bool Enable(name string) error @@ -69,12 +70,12 @@ func genListModulesCmd(name, version string, modulesFactory ModulesManagerFactor fmt.Println("Enabled:") for _, module := range modules.ListEnabled() { - fmt.Println(module) + fmt.Println(module.Name) } fmt.Println("\nDisabled:") for _, module := range modules.ListDisabled() { - fmt.Println(module) + fmt.Println(module.Name) } }, } diff --git a/libbeat/cmd/root.go b/libbeat/cmd/root.go index 9059c73943db..0651f30f3f3c 100644 --- a/libbeat/cmd/root.go +++ b/libbeat/cmd/root.go @@ -51,7 +51,7 @@ func GenRootCmdWithRunFlags(name, version string, beatCreator beat.Creator, runF rootCmd.SetupCmd = genSetupCmd(name, version, beatCreator) rootCmd.VersionCmd = genVersionCmd(name, version) rootCmd.CompletionCmd = genCompletionCmd(name, version, rootCmd) - rootCmd.ExportCmd = genExportCmd(name, version, beatCreator) + rootCmd.ExportCmd = genExportCmd(name, version) rootCmd.TestCmd = genTestCmd(name, version, beatCreator) // Root command is an alias for run diff --git a/libbeat/testing/console.go b/libbeat/testing/console.go index 638221fdfb33..ef1301af813d 100644 --- a/libbeat/testing/console.go +++ b/libbeat/testing/console.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/fatih/color" ) @@ -14,6 +15,7 @@ type ConsoleDriver struct { level int reported bool killer func() + result string } // NewConsoleDriver initializes and returns a new console driver with output to given file @@ -25,14 +27,18 @@ func NewConsoleDriver(stdout io.Writer) *ConsoleDriver { // Killer function will be called on fatal errors func NewConsoleDriverWithKiller(stdout io.Writer, killer func()) *ConsoleDriver { return &ConsoleDriver{ - Stdout: stdout, - level: 0, - killer: killer, + Stdout: stdout, + level: 0, + killer: killer, + reported: true, } } func (d *ConsoleDriver) Run(name string, f func(Driver)) { - d.printf("%s...\n", name) + if !d.reported { + fmt.Fprintln(d.Stdout, "") + } + d.printf("%s...", name) // Run sub func driver := &ConsoleDriver{ @@ -43,59 +49,86 @@ func (d *ConsoleDriver) Run(name string, f func(Driver)) { f(driver) if !driver.reported { - driver.ok() + color.New(color.FgGreen).Fprintf(driver.Stdout, "OK\n") + driver.reported = true + } + + if driver.result != "" { + driver.Info("result", driver.indent(driver.result)) } + + d.reported = true } func (d *ConsoleDriver) Info(field, value string) { + if !d.reported { + fmt.Fprintln(d.Stdout, "") + } d.printf("%s: %s\n", field, value) d.reported = true } func (d *ConsoleDriver) Warn(field, reason string) { + if !d.reported { + fmt.Fprintln(d.Stdout, "") + } d.printf("%s... ", field) - d.warn(reason) + color.New(color.FgYellow).Fprintf(d.Stdout, "WARN ") + fmt.Fprintln(d.Stdout, reason) + d.reported = true } func (d *ConsoleDriver) Error(field string, err error) { - d.printf("%s... ", field) if err == nil { - d.ok() + d.ok(field) return } - d.error(err) + d.error(field, err) } func (d *ConsoleDriver) Fatal(field string, err error) { - d.printf("%s... ", field) if err == nil { - d.ok() + d.ok(field) return } - d.error(err) + d.error(field, err) d.killer() } -func (d *ConsoleDriver) ok() { +func (d *ConsoleDriver) Result(data string) { + d.result = data +} + +func (d *ConsoleDriver) ok(field string) { + if !d.reported { + fmt.Fprintln(d.Stdout, "") + } + d.printf("%s... ", field) color.New(color.FgGreen).Fprintf(d.Stdout, "OK\n") d.reported = true } -func (d *ConsoleDriver) error(err error) { +func (d *ConsoleDriver) error(field string, err error) { + if !d.reported { + fmt.Fprintln(d.Stdout, "") + } + d.printf("%s... ", field) color.New(color.FgRed).Fprintf(d.Stdout, "ERROR ") fmt.Fprintln(d.Stdout, err.Error()) d.reported = true } -func (d *ConsoleDriver) warn(reason string) { - color.New(color.FgYellow).Fprintf(d.Stdout, "WARN ") - fmt.Fprintln(d.Stdout, reason) - d.reported = true -} - func (d *ConsoleDriver) printf(format string, args ...interface{}) { for i := 0; i < d.level; i++ { fmt.Fprint(d.Stdout, " ") } fmt.Fprintf(d.Stdout, format, args...) } + +func (d *ConsoleDriver) indent(data string) string { + res := "\n" + for _, line := range strings.Split(data, "\n") { + res += strings.Repeat(" ", d.level+2) + line + "\n" + } + return res +} diff --git a/libbeat/testing/console_test.go b/libbeat/testing/console_test.go index 21f4a74158e1..1b4a262005ec 100644 --- a/libbeat/testing/console_test.go +++ b/libbeat/testing/console_test.go @@ -63,7 +63,18 @@ func TestConsoleDriverRun(t *testing.T) { output.Flush() assert.True(t, called) - assert.Equal(t, buffer.String(), "test...\nOK\n") + assert.Equal(t, buffer.String(), "test...OK\n") +} + +func TestConsoleDriverResult(t *testing.T) { + buffer, output, driver := createDriver(nil) + + driver.Run("test", func(d Driver) { + d.Result("This is a multiline\nresult") + }) + + output.Flush() + assert.Equal(t, buffer.String(), "test...OK\n result: \n This is a multiline\n result\n\n") } func TestConsoleDriverRunWithReports(t *testing.T) { diff --git a/libbeat/testing/null.go b/libbeat/testing/null.go index 4a5b64123d71..c20945bbfd1a 100644 --- a/libbeat/testing/null.go +++ b/libbeat/testing/null.go @@ -16,3 +16,5 @@ func (d *nullDriver) Warn(field, reason string) {} func (d *nullDriver) Error(field string, err error) {} func (d *nullDriver) Fatal(field string, err error) {} + +func (d *nullDriver) Result(data string) {} diff --git a/libbeat/testing/testing.go b/libbeat/testing/testing.go index 89c8cd874e0b..7300c3eb3a3c 100644 --- a/libbeat/testing/testing.go +++ b/libbeat/testing/testing.go @@ -16,6 +16,9 @@ type Driver interface { // Fatal behaves like error but stops current goroutine on error Fatal(field string, err error) + + // Shows given result to the user + Result(data string) } // Testable is optionally implemented by clients that support self testing. diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index ba0810fa48f2..4f97d249e4bb 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -2,15 +2,16 @@ package beater import ( "sync" + "time" "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/module" "github.com/joeshaw/multierror" - "github.com/elastic/beats/libbeat/cfgfile" "github.com/pkg/errors" // Add metricbeat specific processors @@ -137,3 +138,39 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { func (bt *Metricbeat) Stop() { close(bt.done) } + +// Modules return a list of all configured modules, including anyone present +// under dynamic config settings +func (bt *Metricbeat) Modules() ([]*module.Wrapper, error) { + var modules []*module.Wrapper + for _, m := range bt.modules { + modules = append(modules, m.module) + } + + // Add dynamic modules + if bt.config.ConfigModules.Enabled() { + config := cfgfile.DefaultDynamicConfig + bt.config.ConfigModules.Unpack(&config) + + modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") + if err != nil { + return nil, errors.Wrap(err, "initialization error") + } + + for _, file := range modulesManager.ListEnabled() { + confs, err := cfgfile.LoadList(file.Path) + if err != nil { + return nil, errors.Wrap(err, "error loading config files") + } + for _, conf := range confs { + m, err := module.NewWrapper(time.Duration(0), conf, mb.Registry) + if err != nil { + return nil, errors.Wrap(err, "module initialization error") + } + modules = append(modules, m) + } + } + } + + return modules, nil +} diff --git a/metricbeat/cmd/root.go b/metricbeat/cmd/root.go index 0d81f744bc73..38b84d8dcace 100644 --- a/metricbeat/cmd/root.go +++ b/metricbeat/cmd/root.go @@ -10,6 +10,7 @@ import ( cmd "github.com/elastic/beats/libbeat/cmd" "github.com/elastic/beats/metricbeat/beater" + "github.com/elastic/beats/metricbeat/cmd/test" ) // Name of this beat @@ -24,4 +25,5 @@ func init() { RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags) RootCmd.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager)) + RootCmd.TestCmd.AddCommand(test.GenTestModulesCmd(Name, "")) } diff --git a/metricbeat/cmd/test/modules.go b/metricbeat/cmd/test/modules.go new file mode 100644 index 000000000000..f7375752fe6e --- /dev/null +++ b/metricbeat/cmd/test/modules.go @@ -0,0 +1,74 @@ +package test + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/testing" + "github.com/elastic/beats/metricbeat/beater" +) + +func GenTestModulesCmd(name, beatVersion string) *cobra.Command { + return &cobra.Command{ + Use: "modules [module] [metricset]", + Short: "Test modules settings", + Run: func(cmd *cobra.Command, args []string) { + var filter_module, filter_metricset string + if len(args) > 0 { + filter_module = args[0] + } + + if len(args) > 1 { + filter_metricset = args[1] + } + + b, err := beat.New(name, beatVersion) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) + os.Exit(1) + } + + err = b.Init() + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) + os.Exit(1) + } + + config, err := b.BeatConfig() + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing beat: %s\n", err) + os.Exit(1) + } + + mb, err := beater.New(b, config) + if err != nil { + fmt.Fprintf(os.Stderr, "Error initializing metricbeat: %s\n", err) + os.Exit(1) + } + + modules, err := mb.(*beater.Metricbeat).Modules() + if err != nil { + fmt.Fprintf(os.Stderr, "Error getting metricbeat modules: %s\n", err) + os.Exit(1) + } + + driver := testing.NewConsoleDriver(os.Stdout) + for _, module := range modules { + if filter_module != "" && module.Name() != filter_module { + continue + } + driver.Run(module.Name(), func(driver testing.Driver) { + for _, set := range module.MetricSets() { + if filter_metricset != "" && set.Name() != filter_metricset { + continue + } + set.Test(driver) + } + }) + } + }, + } +} diff --git a/metricbeat/mb/module/testing.go b/metricbeat/mb/module/testing.go new file mode 100644 index 000000000000..581440fa934b --- /dev/null +++ b/metricbeat/mb/module/testing.go @@ -0,0 +1,46 @@ +package module + +import ( + "encoding/json" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/testing" +) + +// testingReporter offers reported interface and send results to testing.Driver +type testingReporter struct { + driver testing.Driver + done <-chan struct{} +} + +func (r *testingReporter) Done() <-chan struct{} { + return r.done +} + +func (r *testingReporter) Event(event common.MapStr) bool { + return r.ErrorWith(nil, event) +} + +func (r *testingReporter) Error(err error) bool { + return r.ErrorWith(err, nil) +} + +func (r *testingReporter) ErrorWith(err error, event common.MapStr) bool { + if err != nil { + r.driver.Error("error", err) + } + + if event != nil { + d, err := json.MarshalIndent(&event, "", " ") + if err != nil { + r.driver.Error("convert event", err) + return true + } + + r.driver.Result(string(d)) + } + + return true +} + +func (r testingReporter) StartFetchTimer() {} diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 542fb2cd2f15..d4fe9aa169f5 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/libbeat/publisher/beat" + "github.com/elastic/beats/libbeat/testing" "github.com/elastic/beats/metricbeat/mb" "github.com/mitchellh/hashstructure" @@ -146,6 +147,11 @@ func (mw *Wrapper) Hash() uint64 { return mw.configHash } +// MetricSets return the list of metricsets of the module +func (mw *Wrapper) MetricSets() []*metricSetWrapper { + return mw.metricSets +} + // metricSetWrapper methods func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { @@ -188,7 +194,7 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { // startPeriodicFetching performs an immediate fetch for the MetricSet then it // begins a continuous timer scheduled loop to fetch data. To stop the loop the // done channel should be closed. -func (msw *metricSetWrapper) startPeriodicFetching(reporter *eventReporter) { +func (msw *metricSetWrapper) startPeriodicFetching(reporter reporter) { // Fetch immediately. msw.fetch(reporter) @@ -197,7 +203,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter *eventReporter) { defer t.Stop() for { select { - case <-reporter.done: + case <-reporter.Done(): return case <-t.C: msw.fetch(reporter) @@ -208,7 +214,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(reporter *eventReporter) { // fetch invokes the appropriate Fetch method for the MetricSet and publishes // the result using the publisher client. This method will recover from panics // and log a stack track if one occurs. -func (msw *metricSetWrapper) fetch(reporter *eventReporter) { +func (msw *metricSetWrapper) fetch(reporter reporter) { switch fetcher := msw.MetricSet.(type) { case mb.EventFetcher: msw.singleEventFetch(fetcher, reporter) @@ -221,14 +227,14 @@ func (msw *metricSetWrapper) fetch(reporter *eventReporter) { } } -func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter *eventReporter) { - reporter.startFetchTimer() +func (msw *metricSetWrapper) singleEventFetch(fetcher mb.EventFetcher, reporter reporter) { + reporter.StartFetchTimer() event, err := fetcher.Fetch() reporter.ErrorWith(err, event) } -func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter *eventReporter) { - reporter.startFetchTimer() +func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter reporter) { + reporter.StartFetchTimer() events, err := fetcher.Fetch() if len(events) == 0 { reporter.ErrorWith(err, nil) @@ -239,8 +245,8 @@ func (msw *metricSetWrapper) multiEventFetch(fetcher mb.EventsFetcher, reporter } } -func (msw *metricSetWrapper) reportingFetch(fetcher mb.ReportingMetricSet, reporter *eventReporter) { - reporter.startFetchTimer() +func (msw *metricSetWrapper) reportingFetch(fetcher mb.ReportingMetricSet, reporter reporter) { + reporter.StartFetchTimer() fetcher.Fetch(reporter) } @@ -259,8 +265,30 @@ func (msw *metricSetWrapper) String() string { msw.module.Name(), msw.Name(), msw.Host()) } +func (msw *metricSetWrapper) Test(d testing.Driver) { + done := make(chan struct{}) + d.Run(msw.Name(), func(d testing.Driver) { + // ReportingMetricSet would hang out forever, perhaps we can add a timeout based test in the future + if _, ok := msw.MetricSet.(mb.ReportingMetricSet); ok { + d.Warn("test", "metricset doesn't support testing") + return + } + + reporter := &testingReporter{ + driver: d, + done: done, + } + msw.fetch(reporter) + }) +} + // Reporter implementation +type reporter interface { + mb.PushReporter + StartFetchTimer() +} + // eventReporter implements the Reporter interface which is a callback interface // used by MetricSet implementations to report an event(s), an error, or an error // with some additional metadata. @@ -273,7 +301,7 @@ type eventReporter struct { // startFetchTimer demarcates the start of a new fetch. The elapsed time of a // fetch is computed based on the time of this call. -func (r *eventReporter) startFetchTimer() { +func (r *eventReporter) StartFetchTimer() { r.start = time.Now() } diff --git a/metricbeat/tests/system/test_cmd.py b/metricbeat/tests/system/test_cmd.py index a31df610d4e3..44c54e6f7565 100644 --- a/metricbeat/tests/system/test_cmd.py +++ b/metricbeat/tests/system/test_cmd.py @@ -1,4 +1,5 @@ import os +import shutil import metricbeat @@ -109,5 +110,77 @@ def test_modules_disable(self): assert not os.path.exists(self.working_dir + "/modules.d/enabled2.yml") assert not os.path.exists(self.working_dir + "/modules.d/enabled3.yml") + def test_modules_test(self): + """ + Test test modules command + """ + self.write_system_yml() + + exit_code = self.run_beat( + extra_args=["test", "modules"]) + + assert exit_code == 0 + assert self.log_contains("cpu...OK") + assert self.log_contains("memory...OK") + + def test_modules_test_error(self): + """ + Test test modules command with an error result + """ + self.write_system_yml() + self.write_nginx_yml() + + exit_code = self.run_beat( + extra_args=["test", "modules"]) + + assert exit_code == 0 + assert self.log_contains("ERROR error making http request") + assert self.log_contains("cpu...OK") + assert self.log_contains("memory...OK") + + def test_modules_test_filter_no_result(self): + """ + Test test modules command filter by module (no result) + """ + self.write_system_yml() + + exit_code = self.run_beat( + extra_args=["test", "modules", "apache"]) + + assert exit_code == 0 + assert not self.log_contains("OK") + + def test_modules_test_filter(self): + """ + Test test modules command filter by metricset + """ + self.write_system_yml() + self.write_nginx_yml() + + exit_code = self.run_beat( + extra_args=["test", "modules", "system", "cpu"]) + + assert exit_code == 0 + assert self.log_contains("cpu...OK") + assert not self.log_contains("memory...OK") + def touch(self, path): open(path, 'a').close() + + def write_system_yml(self): + with open(self.working_dir + "/modules.d/system.yml", "wb") as f: + f.write(""" +- module: system + period: 10s + metricsets: + - cpu + - memory""") + + def write_nginx_yml(self): + with open(self.working_dir + "/modules.d/nginx.yml", "wb") as f: + f.write(""" +- module: nginx + period: 10s + hosts: ["errorhost:80"] + metricsets: + - stubstatus""")