Skip to content

Commit

Permalink
Cherry-pick #17655 to 7.x: Start to split filebeat/channel up (#19654)
Browse files Browse the repository at this point in the history
(cherry picked from commit bc8123a)
  • Loading branch information
Steffen Siering authored Jul 7, 2020
1 parent 40f54bb commit 63fe814
Show file tree
Hide file tree
Showing 12 changed files with 426 additions and 188 deletions.
91 changes: 91 additions & 0 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (

"github.com/elastic/beats/v7/filebeat/input/file"
"github.com/elastic/beats/v7/filebeat/registrar"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
)

type registrarLogger struct {
Expand All @@ -41,6 +43,23 @@ type eventCounter struct {
wg sync.WaitGroup
}

// countingClient adds and substracts from a counter when events have been
// published, dropped or ACKed. The countingClient can be used to keep track of
// inflight events for a beat.Client instance. The counter is updated after the
// client has been disconnected from the publisher pipeline via 'Closed'.
type countingClient struct {
counter *eventCounter
client beat.Client
}

type countingEventer struct {
wgEvents *eventCounter
}

type combinedEventer struct {
a, b beat.ClientEventer
}

func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
return &registrarLogger{
done: make(chan struct{}),
Expand Down Expand Up @@ -87,3 +106,75 @@ func (c *eventCounter) Done() {
func (c *eventCounter) Wait() {
c.wg.Wait()
}

// withPipelineEventCounter adds a counter to the pipeline that keeps track of
// all events published, dropped and ACKed by any active client.
// The type accepted by counter is compatible with sync.WaitGroup.
func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCounter) beat.PipelineConnector {
counterListener := &countingEventer{counter}

pipeline = pipetool.WithClientConfigEdit(pipeline, func(config beat.ClientConfig) (beat.ClientConfig, error) {
if evts := config.Events; evts != nil {
config.Events = &combinedEventer{evts, counterListener}
} else {
config.Events = counterListener
}
return config, nil
})

pipeline = pipetool.WithClientWrapper(pipeline, func(client beat.Client) beat.Client {
return &countingClient{
counter: counter,
client: client,
}
})
return pipeline
}

func (c *countingClient) Publish(event beat.Event) {
c.counter.Add(1)
c.client.Publish(event)
}

func (c *countingClient) PublishAll(events []beat.Event) {
c.counter.Add(len(events))
c.client.PublishAll(events)
}

func (c *countingClient) Close() error {
return c.client.Close()
}

func (*countingEventer) Closing() {}
func (*countingEventer) Closed() {}
func (*countingEventer) Published() {}

func (c *countingEventer) FilteredOut(_ beat.Event) {}
func (c *countingEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}

func (c *combinedEventer) Closing() {
c.a.Closing()
c.b.Closing()
}

func (c *combinedEventer) Closed() {
c.a.Closed()
c.b.Closed()
}

func (c *combinedEventer) Published() {
c.a.Published()
c.b.Published()
}

func (c *combinedEventer) FilteredOut(event beat.Event) {
c.a.FilteredOut(event)
c.b.FilteredOut(event)
}

func (c *combinedEventer) DroppedOnPublish(event beat.Event) {
c.a.DroppedOnPublish(event)
c.b.DroppedOnPublish(event)
}
4 changes: 2 additions & 2 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func newCrawler(

// Start starts the crawler with all inputs
func (c *crawler) Start(
pipeline beat.Pipeline,
pipeline beat.PipelineConnector,
configInputs *common.Config,
configModules *common.Config,
) error {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (c *crawler) Start(
}

func (c *crawler) startInput(
pipeline beat.Pipeline,
pipeline beat.PipelineConnector,
config *common.Config,
) error {
if !config.Enabled() {
Expand Down
20 changes: 13 additions & 7 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

_ "github.com/elastic/beats/v7/filebeat/include"

Expand All @@ -68,6 +69,7 @@ type Filebeat struct {
config *cfg.Config
moduleRegistry *fileset.ModuleRegistry
done chan struct{}
pipeline beat.PipelineConnector
}

// New creates a new Filebeat pointer instance.
Expand Down Expand Up @@ -169,7 +171,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
pipelineLoaderFactory := newPipelineLoaderFactory(b.Config.Output.Config())
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory)
if fb.config.ConfigModules.Enabled() {
modulesLoader := cfgfile.NewReloader(b.Publisher, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

Expand Down Expand Up @@ -328,8 +330,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

fb.pipeline = pipetool.WithDefaultGuarantees(b.Publisher, beat.GuaranteedSend)
fb.pipeline = withPipelineEventCounter(fb.pipeline, wgEvents)

outDone := make(chan struct{}) // outDone closes down all active pipeline connections
pipelineConnector := channel.NewOutletFactory(outDone, wgEvents, b.Info).Create
pipelineConnector := channel.NewOutletFactory(outDone).Create

// Create a ES connection factory for dynamic modules pipeline loading
var pipelineLoaderFactory fileset.PipelineLoaderFactory
Expand All @@ -339,7 +344,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

inputLoader := input.NewRunnerFactory(pipelineConnector, registrar, fb.done)
inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info,
input.NewRunnerFactory(pipelineConnector, registrar, fb.done))
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
Expand Down Expand Up @@ -376,7 +382,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(b.Publisher, config.ConfigInput, config.ConfigModules)
err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
if err != nil {
crawler.Stop()
return fmt.Errorf("Failed to start crawler: %+v", err)
Expand All @@ -393,17 +399,17 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}

// Register reloadable list of inputs and modules
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, b.Publisher)
inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline)
reload.Register.MustRegisterList("filebeat.inputs", inputs)

modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, b.Publisher)
modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline)
reload.Register.MustRegisterList("filebeat.modules", modules)

var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adiscover, err = autodiscover.NewAutodiscover(
"filebeat",
b.Publisher,
fb.pipeline,
cfgfile.MultiplexedRunnerFactory(
cfgfile.MatchHasField("module", moduleLoader),
cfgfile.MatchDefault(inputLoader),
Expand Down
87 changes: 1 addition & 86 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ package channel
import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
)

// ConnectorFunc is an adapter for using ordinary functions as Connector.
Expand All @@ -48,97 +45,15 @@ func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) {
}

func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

procs, err := processorsForConfig(c.parent.beatInfo, config, clientCfg)
if err != nil {
return nil, err
}

setOptional := func(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
}
}

meta := clientCfg.Processing.Meta.Clone()
fields := clientCfg.Processing.Fields.Clone()

serviceType := config.ServiceType
if serviceType == "" {
serviceType = config.Module
}

setOptional(meta, "pipeline", config.Pipeline)
setOptional(fields, "fileset.name", config.Fileset)
setOptional(fields, "service.type", serviceType)
setOptional(fields, "input.type", config.Type)
if config.Module != "" {
event := common.MapStr{"module": config.Module}
if config.Fileset != "" {
event["dataset"] = config.Module + "." + config.Fileset
}
fields["event"] = event
}

mode := clientCfg.PublishMode
if mode == beat.DefaultGuarantees {
mode = beat.GuaranteedSend
}

// connect with updated configuration
clientCfg.PublishMode = mode
clientCfg.Processing.EventMetadata = config.EventMetadata
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = procs
clientCfg.Processing.KeepNull = config.KeepNull
clientCfg.Processing.DisableHost = config.PublisherPipeline.DisableHost
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
return nil, err
}

outlet := newOutlet(client, c.parent.wgEvents)
outlet := newOutlet(client)
if c.parent.done != nil {
return CloseOnSignal(outlet, c.parent.done), nil
}
return outlet, nil
}

// processorsForConfig assembles the Processors for a pipelineConnector.
func processorsForConfig(
beatInfo beat.Info, config inputOutletConfig, clientCfg beat.ClientConfig,
) (*processors.Processors, error) {
procs := processors.NewList(nil)

// Processor ordering is important:
// 1. Index configuration
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor := add_formatted_index.New(timestampFormat)
procs.AddProcessor(indexProcessor)
}

// 2. ClientConfig processors
if lst := clientCfg.Processing.Processor; lst != nil {
procs.AddProcessor(lst)
}

// 3. User processors
userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}
procs.AddProcessors(*userProcessors)

return procs, nil
}
62 changes: 2 additions & 60 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,17 @@ package channel

import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/processors"
)

type OutletFactory struct {
done <-chan struct{}

eventer beat.ClientEventer
wgEvents eventCounter
beatInfo beat.Info
}

type eventCounter interface {
Add(n int)
Done()
}

// clientEventer adjusts wgEvents if events are dropped during shutdown.
type clientEventer struct {
wgEvents eventCounter
}

// inputOutletConfig defines common input settings
// for the publisher pipeline.
type inputOutletConfig struct {
// event processing
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Processors processors.PluginConfig `config:"processors"`
KeepNull bool `config:"keep_null"`

PublisherPipeline struct {
DisableHost bool `config:"disable_host"` // Disable addition of host.name.
} `config:"publisher_pipeline"`

// implicit event fields
Type string `config:"type"` // input.type
ServiceType string `config:"service.type"` // service.type

// hidden filebeat modules settings
Module string `config:"_module_name"` // hidden setting
Fileset string `config:"_fileset_name"` // hidden setting

// Output meta data settings
Pipeline string `config:"pipeline"` // ES Ingest pipeline name
Index fmtstr.EventFormatString `config:"index"` // ES output index pattern
}

// NewOutletFactory creates a new outlet factory for
// connecting an input to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
wgEvents eventCounter,
beatInfo beat.Info,
) *OutletFactory {
func NewOutletFactory(done <-chan struct{}) *OutletFactory {
o := &OutletFactory{
done: done,
wgEvents: wgEvents,
beatInfo: beatInfo,
}

if wgEvents != nil {
o.eventer = &clientEventer{wgEvents}
done: done,
}

return o
Expand All @@ -94,9 +42,3 @@ func NewOutletFactory(
func (f *OutletFactory) Create(p beat.PipelineConnector) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

func (e *clientEventer) Closing() {}
func (e *clientEventer) Closed() {}
func (e *clientEventer) Published() {}
func (e *clientEventer) FilteredOut(evt beat.Event) {}
func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() }
Loading

0 comments on commit 63fe814

Please sign in to comment.