diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 97cd4ab4d70..0413976eb08 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Heartbeat* - Fix NPE on some monitor configuration errors. {pull}11910[11910] +- Fix NPEs / resource leaks when executing config checks. {pull}11165[11165] *Journalbeat* diff --git a/filebeat/autodiscover/autodiscover.go b/filebeat/autodiscover/autodiscover.go index decd3455724..faa0475163f 100644 --- a/filebeat/autodiscover/autodiscover.go +++ b/filebeat/autodiscover/autodiscover.go @@ -51,10 +51,19 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error // CheckConfig tests given config to check if it will work or not, returns errors in case it won't work func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { + var factory cfgfile.RunnerFactory + if c.HasField("module") { - return m.moduleFactory.CheckConfig(c) + factory = m.moduleFactory + } else { + factory = m.inputFactory + } + + if checker, ok := factory.(cfgfile.ConfigChecker); ok { + return checker.CheckConfig(c) } - return m.inputFactory.CheckConfig(c) + + return nil } // Create a module or input from the given config diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index bba19f2969d..8bf3443f6dd 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -117,12 +117,6 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP }, nil } -// CheckConfig checks if a config is valid or not -func (f *Factory) CheckConfig(config *common.Config) error { - // TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks. - return nil -} - func (p *inputsRunner) Start() { // Load pipelines if p.pipelineLoaderFactory != nil { diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index 19b2b246cb8..fb21ae330c1 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -56,9 +56,3 @@ func (r *RunnerFactory) Create( return p, nil } - -// CheckConfig checks if a config is valid or not -func (r *RunnerFactory) CheckConfig(config *common.Config) error { - // TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks. - return nil -} diff --git a/heartbeat/monitors/mocks_test.go b/heartbeat/monitors/mocks_test.go index 4effc437e4c..f59064be324 100644 --- a/heartbeat/monitors/mocks_test.go +++ b/heartbeat/monitors/mocks_test.go @@ -135,6 +135,14 @@ func mockPluginBuilder() pluginBuilder { reg := monitoring.NewRegistry() return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]jobs.Job, int, error) { + // Declare a real config block with a required attr so we can see what happens when it doesn't work + unpacked := struct { + URLs []string `config:"urls" validate:"required"` + }{} + err := config.Unpack(&unpacked) + if err != nil { + return nil, 0, err + } c := common.Config{} j, err := createMockJob("test", &c) return j, 1, err @@ -164,6 +172,27 @@ func mockPluginConf(t *testing.T, id string, schedule string, url string) *commo return conf } +// mockBadPluginConf returns a conf with an invalid plugin config. +// This should fail after the generic plugin checks fail since the HTTP plugin requires 'urls' to be set. +func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config { + confMap := map[string]interface{}{ + "type": "test", + "notanoption": []string{"foo"}, + "schedule": schedule, + } + + if id != "" { + confMap["id"] = id + } + + conf, err := common.NewConfigFrom(confMap) + require.NoError(t, err) + + return conf +} + +// mockInvalidPlugin conf returns a config that invalid at the basic level of +// what's expected in heartbeat, i.e. no type. func mockInvalidPluginConf(t *testing.T) *common.Config { confMap := map[string]interface{}{ "hoeutnheou": "oueanthoue", diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index b1aaf3c9660..c11fecca2c0 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -94,6 +94,7 @@ func (e ErrDuplicateMonitorID) Error() string { return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID) } +// newMonitor Creates a new monitor, without leaking resources in the event of an error. func newMonitor( config *common.Config, registrar *pluginsReg, @@ -101,6 +102,23 @@ func newMonitor( scheduler *scheduler.Scheduler, allowWatches bool, factoryMetadata *common.MapStrPointer, +) (*Monitor, error) { + m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches, factoryMetadata) + if m != nil && err != nil { + m.Stop() + } + return m, err +} + +// newMonitorUnsafe is the unsafe way of creating a new monitor because it may return a monitor instance along with an +// error without freeing monitor resources. m.Stop() must always be called on a non-nil monitor to free resources. +func newMonitorUnsafe( + config *common.Config, + registrar *pluginsReg, + pipelineConnector beat.PipelineConnector, + scheduler *scheduler.Scheduler, + allowWatches bool, + factoryMetadata *common.MapStrPointer, ) (*Monitor, error) { // Extract just the Id, Type, and Enabled fields from the config // We'll parse things more precisely later once we know what exact type of @@ -133,13 +151,13 @@ func newMonitor( if m.id != "" { // Ensure we don't have duplicate IDs if _, loaded := uniqueMonitorIDs.LoadOrStore(m.id, m); loaded { - return nil, ErrDuplicateMonitorID{m.id} + return m, ErrDuplicateMonitorID{m.id} } } else { // If there's no explicit ID generate one hash, err := m.configHash() if err != nil { - return nil, err + return m, err } m.id = fmt.Sprintf("auto-%s-%#X", m.typ, hash) } @@ -149,22 +167,22 @@ func newMonitor( m.endpoints = endpoints if err != nil { - return nil, fmt.Errorf("job err %v", err) + return m, fmt.Errorf("job err %v", err) } m.configuredJobs, err = m.makeTasks(config, wrappedJobs) if err != nil { - return nil, err + return m, err } err = m.makeWatchTasks(monitorPlugin) if err != nil { - return nil, err + return m, err } if len(m.watchPollTasks) > 0 { if !allowWatches { - return nil, ErrWatchesDisabled + return m, ErrWatchesDisabled } logp.Info(`Obsolete option 'watch.poll_file' declared. This will be removed in a future release. diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 4f224edc660..bf04d075e27 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -78,6 +78,7 @@ func TestMonitor(t *testing.T) { func TestDuplicateMonitorIDs(t *testing.T) { serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net") + badConf := mockBadPluginConf(t, "custom", "@every 1ms") reg := mockPluginsReg() pipelineConnector := &MockPipelineConnector{} @@ -90,14 +91,19 @@ func TestDuplicateMonitorIDs(t *testing.T) { return newMonitor(serverMonConf, reg, pipelineConnector, sched, false, nil) } + // Ensure that an error is returned on a bad config + _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false, nil) + require.Error(t, m0Err) + + // Would fail if the previous newMonitor didn't free the monitor.id m1, m1Err := makeTestMon() - assert.NoError(t, m1Err) + require.NoError(t, m1Err) _, m2Err := makeTestMon() - assert.Error(t, m2Err) + require.Error(t, m2Err) m1.Stop() _, m3Err := makeTestMon() - assert.NoError(t, m3Err) + require.NoError(t, m3Err) } func TestCheckInvalidConfig(t *testing.T) { diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 81415e8513e..8fdef5dc5c2 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -46,7 +46,7 @@ type Adapter interface { CreateConfig(bus.Event) ([]*common.Config, error) // RunnerFactory provides runner creation by feeding valid configs - cfgfile.RunnerFactory + cfgfile.CheckableRunnerFactory // EventFilter returns the bus filter to retrieve runner start/stop triggering events EventFilter() []string diff --git a/libbeat/autodiscover/factoryadapter.go b/libbeat/autodiscover/factoryadapter.go index f2a3000b8d6..358addd6bb9 100644 --- a/libbeat/autodiscover/factoryadapter.go +++ b/libbeat/autodiscover/factoryadapter.go @@ -28,11 +28,11 @@ import ( // FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory. type FactoryAdapter struct { - factory cfgfile.RunnerFactory + factory cfgfile.CheckableRunnerFactory } // NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory. -func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter { +func NewFactoryAdapter(factory cfgfile.CheckableRunnerFactory) *FactoryAdapter { return &FactoryAdapter{ factory: factory, } diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index df613e8255c..778b9c10718 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -68,9 +68,20 @@ type Reload struct { // RunnerFactory is used for creating of new Runners type RunnerFactory interface { Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error) +} + +// ConfigChecker is usually combined with a RunnerFactory for implementations that can check a config +// without a pipeline and metadata. +type ConfigChecker interface { CheckConfig(config *common.Config) error } +// CheckableRunnerFactory is the union of RunnerFactory and ConfigChecker. +type CheckableRunnerFactory interface { + RunnerFactory + ConfigChecker +} + // Runner is a simple interface providing a simple way to // Start and Stop Reloader type Runner interface { @@ -142,7 +153,13 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { if !c.Config.Enabled() { continue } - _, err := runnerFactory.Create(rl.pipeline, c.Config, c.Meta) + + if checker, ok := runnerFactory.(ConfigChecker); ok { + err = checker.CheckConfig(c.Config) + } else { + _, err = runnerFactory.Create(rl.pipeline, c.Config, c.Meta) + } + if err != nil { return err }