Skip to content

Commit

Permalink
Clean up config checks / fix heartbeat NPE (#11165)
Browse files Browse the repository at this point in the history
This fixes a bug where heartbeat would encounter an NPE while attempting to free resources during a config check and cleans up some internal interfaces.

This change lets beats choose whether their factories implement a special config check function. If not, it is assumed their creation function will return an error and that this can be used for checking the config. It is further assumed that creation with the factory is fully GC-able, which is not the case for heartbeat, hence the need for a custom CheckConfig method.
  • Loading branch information
andrewvc authored May 6, 2019
1 parent 431a589 commit dcce078
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Heartbeat*

- Fix NPE on some monitor configuration errors. {pull}11910[11910]
- Fix NPEs / resource leaks when executing config checks. {pull}11165[11165]

*Journalbeat*

Expand Down
13 changes: 11 additions & 2 deletions filebeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,19 @@ func (m *AutodiscoverAdapter) CreateConfig(e bus.Event) ([]*common.Config, error

// CheckConfig tests given config to check if it will work or not, returns errors in case it won't work
func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
var factory cfgfile.RunnerFactory

if c.HasField("module") {
return m.moduleFactory.CheckConfig(c)
factory = m.moduleFactory
} else {
factory = m.inputFactory
}

if checker, ok := factory.(cfgfile.ConfigChecker); ok {
return checker.CheckConfig(c)
}
return m.inputFactory.CheckConfig(c)

return nil
}

// Create a module or input from the given config
Expand Down
6 changes: 0 additions & 6 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,6 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}, nil
}

// CheckConfig checks if a config is valid or not
func (f *Factory) CheckConfig(config *common.Config) error {
// TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks.
return nil
}

func (p *inputsRunner) Start() {
// Load pipelines
if p.pipelineLoaderFactory != nil {
Expand Down
6 changes: 0 additions & 6 deletions filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,3 @@ func (r *RunnerFactory) Create(

return p, nil
}

// CheckConfig checks if a config is valid or not
func (r *RunnerFactory) CheckConfig(config *common.Config) error {
// TODO: add code here once we know that spinning up a filebeat input to check for errors doesn't cause memory leaks.
return nil
}
29 changes: 29 additions & 0 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ func mockPluginBuilder() pluginBuilder {
reg := monitoring.NewRegistry()

return pluginBuilder{"test", ActiveMonitor, func(s string, config *common.Config) ([]jobs.Job, int, error) {
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
URLs []string `config:"urls" validate:"required"`
}{}
err := config.Unpack(&unpacked)
if err != nil {
return nil, 0, err
}
c := common.Config{}
j, err := createMockJob("test", &c)
return j, 1, err
Expand Down Expand Up @@ -164,6 +172,27 @@ func mockPluginConf(t *testing.T, id string, schedule string, url string) *commo
return conf
}

// mockBadPluginConf returns a conf with an invalid plugin config.
// This should fail after the generic plugin checks fail since the HTTP plugin requires 'urls' to be set.
func mockBadPluginConf(t *testing.T, id string, schedule string) *common.Config {
confMap := map[string]interface{}{
"type": "test",
"notanoption": []string{"foo"},
"schedule": schedule,
}

if id != "" {
confMap["id"] = id
}

conf, err := common.NewConfigFrom(confMap)
require.NoError(t, err)

return conf
}

// mockInvalidPlugin conf returns a config that invalid at the basic level of
// what's expected in heartbeat, i.e. no type.
func mockInvalidPluginConf(t *testing.T) *common.Config {
confMap := map[string]interface{}{
"hoeutnheou": "oueanthoue",
Expand Down
30 changes: 24 additions & 6 deletions heartbeat/monitors/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,31 @@ func (e ErrDuplicateMonitorID) Error() string {
return fmt.Sprintf("monitor ID %s is configured for multiple monitors! IDs must be unique values.", e.ID)
}

// newMonitor Creates a new monitor, without leaking resources in the event of an error.
func newMonitor(
config *common.Config,
registrar *pluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches, factoryMetadata)
if m != nil && err != nil {
m.Stop()
}
return m, err
}

// newMonitorUnsafe is the unsafe way of creating a new monitor because it may return a monitor instance along with an
// error without freeing monitor resources. m.Stop() must always be called on a non-nil monitor to free resources.
func newMonitorUnsafe(
config *common.Config,
registrar *pluginsReg,
pipelineConnector beat.PipelineConnector,
scheduler *scheduler.Scheduler,
allowWatches bool,
factoryMetadata *common.MapStrPointer,
) (*Monitor, error) {
// Extract just the Id, Type, and Enabled fields from the config
// We'll parse things more precisely later once we know what exact type of
Expand Down Expand Up @@ -133,13 +151,13 @@ func newMonitor(
if m.id != "" {
// Ensure we don't have duplicate IDs
if _, loaded := uniqueMonitorIDs.LoadOrStore(m.id, m); loaded {
return nil, ErrDuplicateMonitorID{m.id}
return m, ErrDuplicateMonitorID{m.id}
}
} else {
// If there's no explicit ID generate one
hash, err := m.configHash()
if err != nil {
return nil, err
return m, err
}
m.id = fmt.Sprintf("auto-%s-%#X", m.typ, hash)
}
Expand All @@ -149,22 +167,22 @@ func newMonitor(
m.endpoints = endpoints

if err != nil {
return nil, fmt.Errorf("job err %v", err)
return m, fmt.Errorf("job err %v", err)
}

m.configuredJobs, err = m.makeTasks(config, wrappedJobs)
if err != nil {
return nil, err
return m, err
}

err = m.makeWatchTasks(monitorPlugin)
if err != nil {
return nil, err
return m, err
}

if len(m.watchPollTasks) > 0 {
if !allowWatches {
return nil, ErrWatchesDisabled
return m, ErrWatchesDisabled
}

logp.Info(`Obsolete option 'watch.poll_file' declared. This will be removed in a future release.
Expand Down
12 changes: 9 additions & 3 deletions heartbeat/monitors/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestMonitor(t *testing.T) {

func TestDuplicateMonitorIDs(t *testing.T) {
serverMonConf := mockPluginConf(t, "custom", "@every 1ms", "http://example.net")
badConf := mockBadPluginConf(t, "custom", "@every 1ms")
reg := mockPluginsReg()
pipelineConnector := &MockPipelineConnector{}

Expand All @@ -90,14 +91,19 @@ func TestDuplicateMonitorIDs(t *testing.T) {
return newMonitor(serverMonConf, reg, pipelineConnector, sched, false, nil)
}

// Ensure that an error is returned on a bad config
_, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false, nil)
require.Error(t, m0Err)

// Would fail if the previous newMonitor didn't free the monitor.id
m1, m1Err := makeTestMon()
assert.NoError(t, m1Err)
require.NoError(t, m1Err)
_, m2Err := makeTestMon()
assert.Error(t, m2Err)
require.Error(t, m2Err)

m1.Stop()
_, m3Err := makeTestMon()
assert.NoError(t, m3Err)
require.NoError(t, m3Err)
}

func TestCheckInvalidConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Adapter interface {
CreateConfig(bus.Event) ([]*common.Config, error)

// RunnerFactory provides runner creation by feeding valid configs
cfgfile.RunnerFactory
cfgfile.CheckableRunnerFactory

// EventFilter returns the bus filter to retrieve runner start/stop triggering events
EventFilter() []string
Expand Down
4 changes: 2 additions & 2 deletions libbeat/autodiscover/factoryadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

// FactoryAdapter is an adapter that works with any cfgfile.RunnerFactory.
type FactoryAdapter struct {
factory cfgfile.RunnerFactory
factory cfgfile.CheckableRunnerFactory
}

// NewFactoryAdapter builds and returns an autodiscover adapter that works with any cfgfile.RunnerFactory.
func NewFactoryAdapter(factory cfgfile.RunnerFactory) *FactoryAdapter {
func NewFactoryAdapter(factory cfgfile.CheckableRunnerFactory) *FactoryAdapter {
return &FactoryAdapter{
factory: factory,
}
Expand Down
19 changes: 18 additions & 1 deletion libbeat/cfgfile/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,20 @@ type Reload struct {
// RunnerFactory is used for creating of new Runners
type RunnerFactory interface {
Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error)
}

// ConfigChecker is usually combined with a RunnerFactory for implementations that can check a config
// without a pipeline and metadata.
type ConfigChecker interface {
CheckConfig(config *common.Config) error
}

// CheckableRunnerFactory is the union of RunnerFactory and ConfigChecker.
type CheckableRunnerFactory interface {
RunnerFactory
ConfigChecker
}

// Runner is a simple interface providing a simple way to
// Start and Stop Reloader
type Runner interface {
Expand Down Expand Up @@ -142,7 +153,13 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error {
if !c.Config.Enabled() {
continue
}
_, err := runnerFactory.Create(rl.pipeline, c.Config, c.Meta)

if checker, ok := runnerFactory.(ConfigChecker); ok {
err = checker.CheckConfig(c.Config)
} else {
_, err = runnerFactory.Create(rl.pipeline, c.Config, c.Meta)
}

if err != nil {
return err
}
Expand Down

0 comments on commit dcce078

Please sign in to comment.