diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index 6019b71dc0f..cce92d9e36a 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -19,5 +19,7 @@ The list below covers the major changes between 6.3.0 and master only. ==== Breaking changes +- The beat.Pipeline is now passed to cfgfile.RunnerFactory. Beats using libbeat for module reloading or autodiscovery need to be adapted. {pull}7018[7017] + ==== Added diff --git a/filebeat/autodiscover/autodiscover.go b/filebeat/autodiscover/autodiscover.go index 08e8c1b4d35..343fc126ba4 100644 --- a/filebeat/autodiscover/autodiscover.go +++ b/filebeat/autodiscover/autodiscover.go @@ -3,6 +3,7 @@ package autodiscover import ( "errors" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -38,11 +39,11 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { } // Create a module or input from the given config -func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { if c.HasField("module") { - return m.moduleFactory.Create(c, meta) + return m.moduleFactory.Create(p, c, meta) } - return m.inputFactory.Create(c, meta) + return m.inputFactory.Create(p, c, meta) } // EventFilter returns the bus filter to retrieve runner start/stop triggering events diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7976ba90ecc..b9a64d612a4 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -312,7 +312,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { outDone := make(chan struct{}) // outDone closes down all active pipeline connections crawler, err := crawler.New( - channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create, + channel.NewOutletFactory(outDone, wgEvents).Create, config.Inputs, b.Info.Version, fb.done, @@ -358,7 +358,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { logp.Debug("modules", "Existing Ingest pipelines will be updated") } - err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines) + err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines) if err != nil { crawler.Stop() return err @@ -377,7 +377,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { adapter := fbautodiscover.NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory) - adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover) + adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover) if err != nil { return err } diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index cf3b2de30dd..2f3e2f695fc 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -7,8 +7,7 @@ import ( ) type OutletFactory struct { - done <-chan struct{} - pipeline beat.Pipeline + done <-chan struct{} eventer beat.ClientEventer wgEvents eventCounter @@ -47,12 +46,10 @@ type inputOutletConfig struct { // connecting an input to the publisher pipeline. func NewOutletFactory( done <-chan struct{}, - pipeline beat.Pipeline, wgEvents eventCounter, ) *OutletFactory { o := &OutletFactory{ done: done, - pipeline: pipeline, wgEvents: wgEvents, } @@ -67,7 +64,7 @@ func NewOutletFactory( // Inputs and all harvesters use the same pipeline client instance. // This guarantees ordering between events as required by the registrar for // file.State updates -func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) { +func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) { config := inputOutletConfig{} if err := cfg.Unpack(&config); err != nil { return nil, err @@ -104,7 +101,7 @@ func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPoint } } - client, err := f.pipeline.ConnectWith(beat.ClientConfig{ + client, err := p.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: config.EventMetadata, DynamicFields: dynFields, diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 06bbcd43277..6c66e8c59ce 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -2,11 +2,15 @@ package channel import ( "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" ) // Factory is used to create a new Outlet instance -type Factory func(*common.Config, *common.MapStrPointer) (Outleter, error) +type Factory func(beat.Pipeline, *common.Config, *common.MapStrPointer) (Outleter, error) + +// Connector creates an Outlet connecting the event publishing with some internal pipeline. +type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) // Outleter is the outlet for an input type Outleter interface { diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index d0806011722..e9c03fba1c2 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -2,6 +2,8 @@ package channel import ( "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" ) @@ -12,6 +14,13 @@ type subOutlet struct { res chan bool } +// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory. +func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector { + return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) { + return factory(pipeline, cfg, m) + } +} + // SubOutlet create a sub-outlet, which can be closed individually, without closing the // underlying outlet. func SubOutlet(out Outleter) Outleter { diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index a1f92005625..f916bae02ca 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/filebeat/input/file" input "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -42,14 +43,20 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string, } // Start starts the crawler with all inputs -func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, - configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error { +func (c *Crawler) Start( + pipeline beat.Pipeline, + r *registrar.Registrar, + configInputs *common.Config, + configModules *common.Config, + pipelineLoaderFactory fileset.PipelineLoaderFactory, + overwritePipelines bool, +) error { logp.Info("Loading Inputs: %v", len(c.inputConfigs)) // Prospect the globs/paths given on the command line and launch harvesters for _, inputConfig := range c.inputConfigs { - err := c.startInput(inputConfig, r.GetStates()) + err := c.startInput(pipeline, inputConfig, r.GetStates()) if err != nil { return err } @@ -57,7 +64,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, c.InputsFactory = input.NewRunnerFactory(c.out, r, c.beatDone) if configInputs.Enabled() { - c.inputReloader = cfgfile.NewReloader(configInputs) + c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) if err := c.inputReloader.Check(c.InputsFactory); err != nil { return err } @@ -69,7 +76,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone) if configModules.Enabled() { - c.modulesReloader = cfgfile.NewReloader(configModules) + c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) if err := c.modulesReloader.Check(c.ModulesFactory); err != nil { return err } @@ -84,11 +91,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config, return nil } -func (c *Crawler) startInput(config *common.Config, states []file.State) error { +func (c *Crawler) startInput( + pipeline beat.Pipeline, + config *common.Config, + states []file.State, +) error { if !config.Enabled() { return nil } - p, err := input.New(config, c.out, c.beatDone, states, nil) + + connector := channel.ConnectTo(pipeline, c.out) + p, err := input.New(config, connector, c.beatDone, states, nil) if err != nil { return fmt.Errorf("Error in initing input: %s", err) } diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 473877fbf4c..a006482d5d9 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -6,6 +6,7 @@ import ( "github.com/elastic/beats/filebeat/channel" input "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/registrar" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -45,7 +46,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers } // Create creates a module based on a config -func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { // Start a registry of one module: m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion, false) if err != nil { @@ -66,8 +67,9 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile. } inputs := make([]*input.Runner, len(pConfigs)) + connector := channel.ConnectTo(p, f.outlet) for i, pConfig := range pConfigs { - inputs[i], err = input.New(pConfig, f.outlet, f.beatDone, f.registrar.GetStates(), meta) + inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta) if err != nil { logp.Err("Error creating input: %s", err) return nil, err diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go index 4b3fdf557ae..2dd4044dda3 100644 --- a/filebeat/input/docker/input.go +++ b/filebeat/input/docker/input.go @@ -23,7 +23,7 @@ func init() { // NewInput creates a new docker input func NewInput( cfg *common.Config, - outletFactory channel.Factory, + outletFactory channel.Connector, context input.Context, ) (input.Input, error) { cfgwarn.Experimental("Docker input is enabled.") diff --git a/filebeat/input/input.go b/filebeat/input/input.go index 89c882a8244..c00fd3f2c86 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -34,7 +34,7 @@ type Runner struct { // New instantiates a new Runner func New( conf *common.Config, - outlet channel.Factory, + outlet channel.Connector, beatDone chan struct{}, states []file.State, dynFields *common.MapStrPointer, diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 88be68fde1c..63bb77dafe8 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -52,7 +52,7 @@ type Input struct { // NewInput instantiates a new Log func NewInput( cfg *common.Config, - outlet channel.Factory, + outlet channel.Connector, context input.Context, ) (input.Input, error) { diff --git a/filebeat/input/redis/input.go b/filebeat/input/redis/input.go index dad011feb82..a6a061602b7 100644 --- a/filebeat/input/redis/input.go +++ b/filebeat/input/redis/input.go @@ -31,7 +31,7 @@ type Input struct { } // NewInput creates a new redis input -func NewInput(cfg *common.Config, outletFactory channel.Factory, context input.Context) (input.Input, error) { +func NewInput(cfg *common.Config, outletFactory channel.Connector, context input.Context) (input.Input, error) { cfgwarn.Experimental("Redis slowlog input is enabled.") config := defaultConfig diff --git a/filebeat/input/registry.go b/filebeat/input/registry.go index a94391489fb..e4c3711c7ba 100644 --- a/filebeat/input/registry.go +++ b/filebeat/input/registry.go @@ -16,7 +16,8 @@ type Context struct { DynamicFields *common.MapStrPointer } -type Factory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error) +// Factory is used to register functions creating new Input instances. +type Factory = func(config *common.Config, connector channel.Connector, context Context) (Input, error) var registry = make(map[string]Factory) diff --git a/filebeat/input/registry_test.go b/filebeat/input/registry_test.go index a1247f72b23..69526a703cc 100644 --- a/filebeat/input/registry_test.go +++ b/filebeat/input/registry_test.go @@ -9,7 +9,7 @@ import ( "github.com/elastic/beats/libbeat/common" ) -var fakeFactory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error) { +var fakeFactory = func(_ *common.Config, _ channel.Connector, _ Context) (Input, error) { return nil, nil } @@ -28,8 +28,7 @@ func TestAddNilFactory(t *testing.T) { } func TestAddFactoryTwice(t *testing.T) { - var err error - err = Register("name", fakeFactory) + err := Register("name", fakeFactory) if err != nil { t.Fatal(err) } diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index ae557f8ecb0..fabeb1467ef 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -3,6 +3,7 @@ package input import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/registrar" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" ) @@ -24,8 +25,13 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be } // Create creates a input based on a config -func (r *RunnerFactory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - p, err := New(c, r.outlet, r.beatDone, r.registrar.GetStates(), meta) +func (r *RunnerFactory) Create( + pipeline beat.Pipeline, + c *common.Config, + meta *common.MapStrPointer, +) (cfgfile.Runner, error) { + connector := channel.ConnectTo(pipeline, r.outlet) + p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta) if err != nil { // In case of error with loading state, input is still returned return p, err diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 025eb064d81..b11ef4ec922 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -30,7 +30,7 @@ type Input struct { // NewInput creates a new stdin input // This input contains one harvester which is reading from stdin -func NewInput(cfg *common.Config, outlet channel.Factory, context input.Context) (input.Input, error) { +func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) { out, err := outlet(cfg, context.DynamicFields) if err != nil { return nil, err diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go index 7208c0f2cad..6a9fbfcd7c1 100644 --- a/filebeat/input/syslog/input.go +++ b/filebeat/input/syslog/input.go @@ -88,7 +88,7 @@ type Input struct { // NewInput creates a new syslog input func NewInput( cfg *common.Config, - outlet channel.Factory, + outlet channel.Connector, context input.Context, ) (input.Input, error) { cfgwarn.Experimental("Syslog input type is used") diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 96f66ed4780..e65bc28221b 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -36,7 +36,7 @@ type Input struct { // NewInput creates a new TCP input func NewInput( cfg *common.Config, - outlet channel.Factory, + outlet channel.Connector, context input.Context, ) (input.Input, error) { cfgwarn.Experimental("TCP input type is used") diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 0f6ff183c88..751b97a75d0 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -34,7 +34,7 @@ type Input struct { // NewInput creates a new udp input func NewInput( cfg *common.Config, - outlet channel.Factory, + outlet channel.Connector, context input.Context, ) (input.Input, error) { cfgwarn.Experimental("UDP input type is used") diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index cd7b557168d..37f70160fb4 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -2,6 +2,7 @@ package autodiscover import ( "github.com/elastic/beats/libbeat/autodiscover/meta" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -33,17 +34,18 @@ type Adapter interface { // Autodiscover process, it takes a beat adapter and user config and runs autodiscover process, spawning // new modules when any configured providers does a match type Autodiscover struct { - bus bus.Bus - adapter Adapter - providers []Provider - runners *cfgfile.Registry - meta *meta.Map + bus bus.Bus + defaultPipeline beat.Pipeline + adapter Adapter + providers []Provider + runners *cfgfile.Registry + meta *meta.Map listener bus.Listener } // NewAutodiscover instantiates and returns a new Autodiscover manager -func NewAutodiscover(name string, adapter Adapter, config *Config) (*Autodiscover, error) { +func NewAutodiscover(name string, pipeline beat.Pipeline, adapter Adapter, config *Config) (*Autodiscover, error) { // Init Event bus bus := bus.New(name) @@ -59,11 +61,12 @@ func NewAutodiscover(name string, adapter Adapter, config *Config) (*Autodiscove } return &Autodiscover{ - bus: bus, - adapter: adapter, - runners: cfgfile.NewRegistry(), - providers: providers, - meta: meta.NewMap(), + bus: bus, + defaultPipeline: pipeline, + adapter: adapter, + runners: cfgfile.NewRegistry(), + providers: providers, + meta: meta.NewMap(), }, nil } @@ -135,7 +138,7 @@ func (a *Autodiscover) handleStart(event bus.Event) { continue } - runner, err := a.adapter.Create(config, &dynFields) + runner, err := a.adapter.Create(a.defaultPipeline, config, &dynFields) if err != nil { logp.Debug(debugK, "Failed to create runner with config %v: %v", config, err) continue diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 449fedfe0e7..a771d98d47f 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -56,7 +57,7 @@ func (m *mockAdapter) CheckConfig(*common.Config) error { return nil } -func (m *mockAdapter) Create(config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *mockAdapter) Create(_ beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { runner := &mockRunner{ config: config, meta: meta, @@ -127,7 +128,7 @@ func TestAutodiscover(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) if err != nil { t.Fatal(err) } @@ -233,7 +234,7 @@ func TestAutodiscoverHash(t *testing.T) { } // Create autodiscover manager - autodiscover, err := NewAutodiscover("test", &adapter, &config) + autodiscover, err := NewAutodiscover("test", nil, &adapter, &config) if err != nil { t.Fatal(err) } diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 116f5b25f48..9b549e402bd 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -9,6 +9,7 @@ import ( "github.com/mitchellh/hashstructure" "github.com/pkg/errors" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -45,7 +46,7 @@ type Reload struct { } type RunnerFactory interface { - Create(config *common.Config, meta *common.MapStrPointer) (Runner, error) + Create(p beat.Pipeline, config *common.Config, meta *common.MapStrPointer) (Runner, error) } type Runner interface { @@ -55,6 +56,7 @@ type Runner interface { // Reloader is used to register and reload modules type Reloader struct { + pipeline beat.Pipeline registry *Registry config DynamicConfig path string @@ -63,7 +65,7 @@ type Reloader struct { } // NewReloader creates new Reloader instance for the given config -func NewReloader(cfg *common.Config) *Reloader { +func NewReloader(pipeline beat.Pipeline, cfg *common.Config) *Reloader { config := DefaultDynamicConfig cfg.Unpack(&config) @@ -73,6 +75,7 @@ func NewReloader(cfg *common.Config) *Reloader { } return &Reloader{ + pipeline: pipeline, registry: NewRegistry(), config: config, path: path, @@ -109,7 +112,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { if !c.Enabled() { continue } - _, err := runnerFactory.Create(c, nil) + _, err := runnerFactory.Create(rl.pipeline, c, nil) if err != nil { return err } @@ -196,7 +199,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { // As module already exist, it must be removed from the stop list and not started if !rl.registry.Has(hash) { debugf("Add module to startlist: %v", hash) - runner, err := runnerFactory.Create(c, nil) + runner, err := runnerFactory.Create(rl.pipeline, c, nil) if err != nil { logp.Err("Unable to create runner due to error: %v", err) continue diff --git a/metricbeat/autodiscover/autodiscover.go b/metricbeat/autodiscover/autodiscover.go index b9ceffd9890..80fbcb35571 100644 --- a/metricbeat/autodiscover/autodiscover.go +++ b/metricbeat/autodiscover/autodiscover.go @@ -3,6 +3,7 @@ package autodiscover import ( "errors" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/bus" @@ -36,8 +37,8 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error { } // Create a module or prospector from the given config -func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - return m.factory.Create(c, meta) +func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { + return m.factory.Create(p, c, meta) } // EventFilter returns the bus filter to retrieve runner start/stop triggering events diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 2217ae4ac9a..f0d26263a93 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -146,9 +146,9 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe if config.Autodiscover != nil { var err error - factory := module.NewFactory(b.Publisher, metricbeat.moduleOptions...) + factory := module.NewFactory(metricbeat.moduleOptions...) adapter := mbautodiscover.NewAutodiscoverAdapter(factory) - metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", adapter, config.Autodiscover) + metricbeat.autodiscover, err = autodiscover.NewAutodiscover("metricbeat", b.Publisher, adapter, config.Autodiscover) if err != nil { return nil, err } @@ -182,8 +182,8 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { } if bt.config.ConfigModules.Enabled() { - moduleReloader := cfgfile.NewReloader(bt.config.ConfigModules) - factory := module.NewFactory(b.Publisher, bt.moduleOptions...) + moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) + factory := module.NewFactory(bt.moduleOptions...) if err := moduleReloader.Check(factory); err != nil { return err diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index e6c38d0a87c..b580c1d2556 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -13,26 +13,25 @@ import ( // Factory creates new Runner instances from configuration objects. // It is used to register and reload modules. type Factory struct { - pipeline beat.Pipeline - options []Option + options []Option } // NewFactory creates new Reloader instance for the given config -func NewFactory(p beat.Pipeline, options ...Option) *Factory { +func NewFactory(options ...Option) *Factory { return &Factory{ - pipeline: p, - options: options, + options: options, } } -func (r *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +// Create creates a new metricbeat module runner reporting events to the passed pipeline. +func (r *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { var errs multierror.Errors err := cfgwarn.CheckRemoved5xSettings(c, "filters") if err != nil { errs = append(errs, err) } - connector, err := NewConnector(r.pipeline, c, meta) + connector, err := NewConnector(p, c, meta) if err != nil { errs = append(errs, err) }