Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #10801 to 7.0: Beats event processing and default fields #11155

Merged
merged 1 commit into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ The list below covers the major changes between 7.0.0-beta1 and master only.
be used to create an index selector. {pull}10347[10347]
- Remove support for loading dashboards to Elasticsearch 5. {pull}10451[10451]
- Remove support for deprecated `GenRootCmd` methods. {pull}10721[10721]
- Remove SkipNormalization, SkipAgentMetadata, SkipAddHostName. {pull}10801[10801] {pull}10769[10769]

==== Bugfixes

- Align default index between elasticsearch and logstash and kafka output. {pull}10841[10841]
- Fix duplication check for `append_fields` option. {pull}10959[10959]

==== Added

- Introduce processing.Support to instance.Setting. This allows Beats to fully modify the event processing. {pull}10801[10801]
16 changes: 9 additions & 7 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,15 @@ func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *c
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
Events: f.eventer,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions heartbeat/monitors/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (t *configuredJob) Start() {
}

t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
Processing: beat.ProcessingConfig{
EventMetadata: t.config.EventMetadata,
Processor: t.processors,
Fields: fields,
},
})
if err != nil {
logp.Err("could not start monitor: %v", err)
Expand Down
10 changes: 6 additions & 4 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func New(
func (i *Input) Run() {
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: i.eventMeta,
Meta: nil,
Processor: i.processors,
},
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
},
Expand Down
50 changes: 26 additions & 24 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,7 @@ type Client interface {
type ClientConfig struct {
PublishMode PublishMode

// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList
Processing ProcessingConfig

// WaitClose sets the maximum duration to wait on ACK, if client still has events
// active non-acknowledged events in the publisher pipeline.
Expand All @@ -72,14 +57,6 @@ type ClientConfig struct {
// Events configures callbacks for common client callbacks
Events ClientEventer

// By default events are normalized within processor pipeline,
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for to long, to not block the internal buffers for
Expand All @@ -101,6 +78,31 @@ type ClientConfig struct {
ACKLastEvent func(interface{})
}

// ProcessingConfig provides additional event processing settings a client can
// pass to the publisher pipeline on Connect.
type ProcessingConfig struct {
// EventMetadata configures additional fields/tags to be added to published events.
EventMetadata common.EventMetadata

// Meta provides additional meta data to be added to the Meta field in the beat.Event
// structure.
Meta common.MapStr

// Fields provides additional 'global' fields to be added to every event
Fields common.MapStr

// DynamicFields provides additional fields to be added to every event, supporting live updates
DynamicFields *common.MapStrPointer

// Processors passes additional processor to the client, to be executed before
// the pipeline processors.
Processor ProcessorList

// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}
}

// ClientEventer provides access to internal client events.
type ClientEventer interface {
Closing() // Closing indicates the client is being shutdown next
Expand Down
14 changes: 14 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/version"
sysinfo "github.com/elastic/go-sysinfo"
Expand All @@ -78,6 +79,8 @@ type Beat struct {

keystore keystore.Keystore
index idxmgmt.Supporter

processing processing.Supporter
}

type beatConfig struct {
Expand Down Expand Up @@ -310,6 +313,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Logger: logp.L().Named("publisher"),
},
b.Config.Pipeline,
b.processing,
b.makeOutputFactory(b.Config.Output),
)

Expand Down Expand Up @@ -594,6 +598,16 @@ func (b *Beat) configure(settings Settings) error {
imFactory = idxmgmt.MakeDefaultSupport(settings.ILM)
}
b.index, err = imFactory(nil, b.Beat.Info, b.RawConfig)
if err != nil {
return err
}

processingFactory := settings.Processing
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

return err
}

Expand Down
3 changes: 3 additions & 0 deletions libbeat/cmd/instance/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/idxmgmt"
"github.com/elastic/beats/libbeat/idxmgmt/ilm"
"github.com/elastic/beats/libbeat/monitoring/report"
"github.com/elastic/beats/libbeat/publisher/processing"
)

// Settings contains basic settings for any beat to pass into GenRootCmd
Expand All @@ -40,4 +41,6 @@ type Settings struct {
// load custom index manager. The config object will be the Beats root configuration.
IndexManagement idxmgmt.SupportFactory
ILM ilm.SupportFactory

Processing processing.SupportFactory
}
9 changes: 8 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)
Expand Down Expand Up @@ -169,11 +170,16 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, common.NewConfig())
if err != nil {
return nil, err
}

pipeline, err := pipeline.New(
beat,
pipeline.Monitors{
Metrics: monitoring,
Logger: logp.NewLogger(selector),
Logger: log,
},
queueFactory,
outputs.Group{
Expand All @@ -184,6 +190,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *common.Config)
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
Processors: processing,
})
if err != nil {
return nil, err
Expand Down
20 changes: 2 additions & 18 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/processing"
"github.com/elastic/beats/libbeat/publisher/queue"
)

Expand Down Expand Up @@ -60,6 +60,7 @@ func Load(
beatInfo beat.Info,
monitors Monitors,
config Config,
processors processing.Supporter,
makeOutput func(outputs.Observer) (string, outputs.Group, error),
) (*Pipeline, error) {
log := monitors.Logger
Expand All @@ -71,28 +72,11 @@ func Load(
log.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

name := beatInfo.Name
settings := Settings{
WaitClose: 0,
WaitCloseMode: NoWaitOnClose,
Disabled: publishDisabled,
Processors: processors,
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"host": common.MapStr{
"name": name,
},
"ecs": common.MapStr{
"version": "1.0.0-beta2",
},
},
},
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
Expand Down
Loading