Skip to content

Commit

Permalink
Reduce libbeat state being hold in beat for dynamic loading modules (e…
Browse files Browse the repository at this point in the history
…lastic#7018)

Reduce libbeat state being hold in beat for dynamic loading modules

Metricbeat/Filebeat modules and inputs require to create a connection to
the shared libbeat per go-routine. This requires the beat to keep some
libbeat state, in order to connect a module to the pipeline when loading
a module dynamically (config reloading/autodiscovery).

With this change, the pipeline is passed to the RunnerFactory, This way
the libbeat-internal state required for connecting a dynamically loaded
module to libbeat is managed by libbeat, not the beat itself.
  • Loading branch information
Steffen Siering authored and stevea78 committed May 20, 2018
1 parent bc4b07c commit 1d0f5c0
Show file tree
Hide file tree
Showing 25 changed files with 109 additions and 68 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-developer.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,7 @@ The list below covers the major changes between 6.3.0 and master only.

==== Breaking changes

- The beat.Pipeline is now passed to cfgfile.RunnerFactory. Beats using libbeat for module reloading or autodiscovery need to be adapted. {pull}7018[7017]


==== Added
7 changes: 4 additions & 3 deletions filebeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package autodiscover
import (
"errors"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/bus"
Expand Down Expand Up @@ -38,11 +39,11 @@ func (m *AutodiscoverAdapter) CheckConfig(c *common.Config) error {
}

// Create a module or input from the given config
func (m *AutodiscoverAdapter) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (m *AutodiscoverAdapter) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
if c.HasField("module") {
return m.moduleFactory.Create(c, meta)
return m.moduleFactory.Create(p, c, meta)
}
return m.inputFactory.Create(c, meta)
return m.inputFactory.Create(p, c, meta)
}

// EventFilter returns the bus filter to retrieve runner start/stop triggering events
Expand Down
6 changes: 3 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
channel.NewOutletFactory(outDone, wgEvents).Create,
config.Inputs,
b.Info.Version,
fb.done,
Expand Down Expand Up @@ -358,7 +358,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines)
err = crawler.Start(b.Publisher, registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines)
if err != nil {
crawler.Stop()
return err
Expand All @@ -377,7 +377,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := fbautodiscover.NewAutodiscoverAdapter(crawler.InputsFactory, crawler.ModulesFactory)
adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
adiscover, err = autodiscover.NewAutodiscover("filebeat", b.Publisher, adapter, config.Autodiscover)
if err != nil {
return err
}
Expand Down
9 changes: 3 additions & 6 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import (
)

type OutletFactory struct {
done <-chan struct{}
pipeline beat.Pipeline
done <-chan struct{}

eventer beat.ClientEventer
wgEvents eventCounter
Expand Down Expand Up @@ -47,12 +46,10 @@ type inputOutletConfig struct {
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline beat.Pipeline,
wgEvents eventCounter,
) *OutletFactory {
o := &OutletFactory{
done: done,
pipeline: pipeline,
wgEvents: wgEvents,
}

Expand All @@ -67,7 +64,7 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand Down Expand Up @@ -104,7 +101,7 @@ func (f *OutletFactory) Create(cfg *common.Config, dynFields *common.MapStrPoint
}
}

client, err := f.pipeline.ConnectWith(beat.ClientConfig{
client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Expand Down
6 changes: 5 additions & 1 deletion filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// Factory is used to create a new Outlet instance
type Factory func(*common.Config, *common.MapStrPointer) (Outleter, error)
type Factory func(beat.Pipeline, *common.Config, *common.MapStrPointer) (Outleter, error)

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)

// Outleter is the outlet for an input
type Outleter interface {
Expand Down
9 changes: 9 additions & 0 deletions filebeat/channel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/atomic"
)

Expand All @@ -12,6 +14,13 @@ type subOutlet struct {
res chan bool
}

// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
return factory(pipeline, cfg, m)
}
}

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
Expand Down
27 changes: 20 additions & 7 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/filebeat/input/file"
input "github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -42,22 +43,28 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string,
}

// Start starts the crawler with all inputs
func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error {
func (c *Crawler) Start(
pipeline beat.Pipeline,
r *registrar.Registrar,
configInputs *common.Config,
configModules *common.Config,
pipelineLoaderFactory fileset.PipelineLoaderFactory,
overwritePipelines bool,
) error {

logp.Info("Loading Inputs: %v", len(c.inputConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(inputConfig, r.GetStates())
err := c.startInput(pipeline, inputConfig, r.GetStates())
if err != nil {
return err
}
}

c.InputsFactory = input.NewRunnerFactory(c.out, r, c.beatDone)
if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(configInputs)
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.InputsFactory); err != nil {
return err
}
Expand All @@ -69,7 +76,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,

c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone)
if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(configModules)
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.ModulesFactory); err != nil {
return err
}
Expand All @@ -84,11 +91,17 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
return nil
}

func (c *Crawler) startInput(config *common.Config, states []file.State) error {
func (c *Crawler) startInput(
pipeline beat.Pipeline,
config *common.Config,
states []file.State,
) error {
if !config.Enabled() {
return nil
}
p, err := input.New(config, c.out, c.beatDone, states, nil)

connector := channel.ConnectTo(pipeline, c.out)
p, err := input.New(config, connector, c.beatDone, states, nil)
if err != nil {
return fmt.Errorf("Error in initing input: %s", err)
}
Expand Down
6 changes: 4 additions & 2 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/elastic/beats/filebeat/channel"
input "github.com/elastic/beats/filebeat/prospector"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -45,7 +46,7 @@ func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVers
}

// Create creates a module based on a config
func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
// Start a registry of one module:
m, err := NewModuleRegistry([]*common.Config{c}, f.beatVersion, false)
if err != nil {
Expand All @@ -66,8 +67,9 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.
}

inputs := make([]*input.Runner, len(pConfigs))
connector := channel.ConnectTo(p, f.outlet)
for i, pConfig := range pConfigs {
inputs[i], err = input.New(pConfig, f.outlet, f.beatDone, f.registrar.GetStates(), meta)
inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta)
if err != nil {
logp.Err("Error creating input: %s", err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func init() {
// NewInput creates a new docker input
func NewInput(
cfg *common.Config,
outletFactory channel.Factory,
outletFactory channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Experimental("Docker input is enabled.")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Runner struct {
// New instantiates a new Runner
func New(
conf *common.Config,
outlet channel.Factory,
outlet channel.Connector,
beatDone chan struct{},
states []file.State,
dynFields *common.MapStrPointer,
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Input struct {
// NewInput instantiates a new Log
func NewInput(
cfg *common.Config,
outlet channel.Factory,
outlet channel.Connector,
context input.Context,
) (input.Input, error) {

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/redis/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Input struct {
}

// NewInput creates a new redis input
func NewInput(cfg *common.Config, outletFactory channel.Factory, context input.Context) (input.Input, error) {
func NewInput(cfg *common.Config, outletFactory channel.Connector, context input.Context) (input.Input, error) {
cfgwarn.Experimental("Redis slowlog input is enabled.")

config := defaultConfig
Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type Context struct {
DynamicFields *common.MapStrPointer
}

type Factory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error)
// Factory is used to register functions creating new Input instances.
type Factory = func(config *common.Config, connector channel.Connector, context Context) (Input, error)

var registry = make(map[string]Factory)

Expand Down
5 changes: 2 additions & 3 deletions filebeat/input/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/elastic/beats/libbeat/common"
)

var fakeFactory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error) {
var fakeFactory = func(_ *common.Config, _ channel.Connector, _ Context) (Input, error) {
return nil, nil
}

Expand All @@ -28,8 +28,7 @@ func TestAddNilFactory(t *testing.T) {
}

func TestAddFactoryTwice(t *testing.T) {
var err error
err = Register("name", fakeFactory)
err := Register("name", fakeFactory)
if err != nil {
t.Fatal(err)
}
Expand Down
10 changes: 8 additions & 2 deletions filebeat/input/runnerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package input
import (
"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
)
Expand All @@ -24,8 +25,13 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be
}

// Create creates a input based on a config
func (r *RunnerFactory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) {
p, err := New(c, r.outlet, r.beatDone, r.registrar.GetStates(), meta)
func (r *RunnerFactory) Create(
pipeline beat.Pipeline,
c *common.Config,
meta *common.MapStrPointer,
) (cfgfile.Runner, error) {
connector := channel.ConnectTo(pipeline, r.outlet)
p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta)
if err != nil {
// In case of error with loading state, input is still returned
return p, err
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/stdin/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Input struct {

// NewInput creates a new stdin input
// This input contains one harvester which is reading from stdin
func NewInput(cfg *common.Config, outlet channel.Factory, context input.Context) (input.Input, error) {
func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) {
out, err := outlet(cfg, context.DynamicFields)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/syslog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Input struct {
// NewInput creates a new syslog input
func NewInput(
cfg *common.Config,
outlet channel.Factory,
outlet channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Experimental("Syslog input type is used")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Input struct {
// NewInput creates a new TCP input
func NewInput(
cfg *common.Config,
outlet channel.Factory,
outlet channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Experimental("TCP input type is used")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Input struct {
// NewInput creates a new udp input
func NewInput(
cfg *common.Config,
outlet channel.Factory,
outlet channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Experimental("UDP input type is used")
Expand Down
Loading

0 comments on commit 1d0f5c0

Please sign in to comment.