diff --git a/filebeat/beater/channels.go b/filebeat/beater/channels.go
index bf0f318d622d..8b0c5dfd29a6 100644
--- a/filebeat/beater/channels.go
+++ b/filebeat/beater/channels.go
@@ -4,9 +4,9 @@ import (
 	"sync"
 	"sync/atomic"
 
-	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/filebeat/registrar"
 	"github.com/elastic/beats/filebeat/spooler"
+	"github.com/elastic/beats/libbeat/common"
 )
 
 type spoolerOutlet struct {
@@ -19,12 +19,12 @@ type spoolerOutlet struct {
 
 type publisherChannel struct {
 	done chan struct{}
-	ch   chan []*input.Event
+	ch   chan []*common.MapStr
 }
 
 type registrarLogger struct {
 	done chan struct{}
-	ch   chan<- []*input.Event
+	ch   chan<- []*common.MapStr
 }
 
 type finishedLogger struct {
@@ -44,7 +44,7 @@ func newSpoolerOutlet(
 	}
 }
 
-func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
+func (o *spoolerOutlet) OnEvent(event *common.MapStr) bool {
 	open := atomic.LoadInt32(&o.isOpen) == 1
 	if !open {
 		return false
@@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
 func newPublisherChannel() *publisherChannel {
 	return &publisherChannel{
 		done: make(chan struct{}),
-		ch:   make(chan []*input.Event, 1),
+		ch:   make(chan []*common.MapStr, 1),
 	}
 }
 
 func (c *publisherChannel) Close() { close(c.done) }
-func (c *publisherChannel) Send(events []*input.Event) bool {
+func (c *publisherChannel) Send(events []*common.MapStr) bool {
 	select {
 	case <-c.done:
 		// set ch to nil, so no more events will be send after channel close signal
@@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
 }
 
 func (l *registrarLogger) Close() { close(l.done) }
-func (l *registrarLogger) Published(events []*input.Event) bool {
+func (l *registrarLogger) Published(events []*common.MapStr) bool {
 	select {
 	case <-l.done:
 		// set ch to nil, so no more events will be send after channel close signal
@@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
 	return &finishedLogger{wg}
 }
 
-func (l *finishedLogger) Published(events []*input.Event) bool {
+func (l *finishedLogger) Published(events []*common.MapStr) bool {
 	for range events {
 		l.wg.Done()
 	}
diff --git a/filebeat/input/event.go b/filebeat/input/event.go
index 51267dba7637..384a7f0cbabc 100644
--- a/filebeat/input/event.go
+++ b/filebeat/input/event.go
@@ -65,6 +65,10 @@ func (e *Event) ToMapStr() common.MapStr {
 		event["message"] = *e.Text
 	}
 
+	if meta := e.Metadata(); meta != nil {
+		event["meta"] = meta
+	}
+
 	return event
 }
diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go
index a5fd6b90b473..184b78ad0f4c 100644
--- a/filebeat/prospector/config.go
+++ b/filebeat/prospector/config.go
@@ -8,6 +8,7 @@ import (
 	"github.com/elastic/beats/filebeat/harvester/reader"
 	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/common/match"
+	"github.com/elastic/beats/libbeat/processors"
 )
 
 var (
@@ -43,6 +44,7 @@ type prospectorConfig struct {
 	Pipeline string `config:"pipeline"`
 	Module   string `config:"_module_name"` // hidden option to set the module name
 	Fileset  string `config:"_fileset_name"` // hidden option to set the fileset name
+	Filters  processors.PluginConfig `config:"filters"`
 }
 
 func (config *prospectorConfig) Validate() error {
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index 2bbdccc90c5e..56048780821a 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -16,6 +16,7 @@ import (
 	"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/processors" ) var ( @@ -40,6 +41,7 @@ type Prospector struct { registry *harvesterRegistry beatDone chan struct{} eventCounter *sync.WaitGroup + filters *processors.Processors } // Prospectorer is the interface common to all prospectors @@ -50,7 +52,7 @@ type Prospectorer interface { // Outlet is the outlet for a prospector type Outlet interface { - OnEvent(event *input.Event) bool + OnEvent(event *common.MapStr) bool } // NewProspector instantiates a new prospector @@ -84,6 +86,13 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* return nil, err } + f, err := processors.New(prospector.config.Filters) + if err != nil { + return nil, err + } + + prospector.filters = f + logp.Debug("prospector", "File Configs: %v", prospector.config.Paths) return prospector, nil @@ -214,12 +223,20 @@ func (p *Prospector) updateState(event *input.Event) error { event.Module = p.config.Module event.Fileset = p.config.Fileset - ok := p.outlet.OnEvent(event) - if !ok { - logp.Info("Prospector outlet closed") - return errors.New("prospector outlet closed") + eventMap := event.ToMapStr() + + //run the filters before sending to + eventMap = p.filters.Run(eventMap) + if eventMap != nil { + //processor might decide to drop the event + ok := p.outlet.OnEvent(&eventMap) + if !ok { + logp.Info("Prospector outlet closed") + return errors.New("prospector outlet closed") + } } + p.states.Update(event.State) return nil } diff --git a/filebeat/publisher/async.go b/filebeat/publisher/async.go index 38ef9af77baa..0fea824a4b8d 100644 --- a/filebeat/publisher/async.go +++ b/filebeat/publisher/async.go @@ -5,15 +5,15 @@ import ( "sync/atomic" "time" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" + "github.com/elastic/beats/libbeat/common" ) type asyncLogPublisher struct { pub publisher.Publisher client publisher.Client - in chan []*input.Event + in chan []*common.MapStr out SuccessLogger // list of in-flight batches @@ -29,7 +29,7 @@ type asyncLogPublisher struct { type eventsBatch struct { next *eventsBatch flag int32 - events []*input.Event + events []*common.MapStr } type batchList struct { @@ -50,7 +50,7 @@ const ( ) func newAsyncLogPublisher( - in chan []*input.Event, + in chan []*common.MapStr, out SuccessLogger, pub publisher.Publisher, ) *asyncLogPublisher { diff --git a/filebeat/publisher/publisher.go b/filebeat/publisher/publisher.go index 6f78fa48071c..bac60838ca33 100644 --- a/filebeat/publisher/publisher.go +++ b/filebeat/publisher/publisher.go @@ -3,7 +3,6 @@ package publisher import ( "errors" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -24,12 +23,12 @@ type LogPublisher interface { type SuccessLogger interface { // Published will be run after events have been acknowledged by the outputs. 
-	Published(events []*input.Event) bool
+	Published(events []*common.MapStr) bool
 }
 
 func New(
 	async bool,
-	in chan []*input.Event,
+	in chan []*common.MapStr,
 	out SuccessLogger,
 	pub publisher.Publisher,
 ) LogPublisher {
@@ -46,14 +45,18 @@ var (
 
 // getDataEvents returns all events which contain data (not only state updates)
 // together with their associated metadata
-func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) {
+func getDataEvents(events []*common.MapStr) (dataEvents []common.MapStr, meta []common.MapStr) {
 	dataEvents = make([]common.MapStr, 0, len(events))
 	meta = make([]common.MapStr, 0, len(events))
 	for _, event := range events {
-		if event.HasData() {
-			dataEvents = append(dataEvents, event.ToMapStr())
-			meta = append(meta, event.Metadata())
+		if ok, _ := event.HasKey("meta"); ok {
+			if mIface, err := event.GetValue("meta"); err == nil {
+				meta = append(meta, mIface.(common.MapStr))
+			}
+			event.Delete("meta")
 		}
+		dataEvents = append(dataEvents, *event)
+
 	}
 	return dataEvents, meta
 }
diff --git a/filebeat/publisher/sync.go b/filebeat/publisher/sync.go
index 5b4885959dc8..1d8b7953adfe 100644
--- a/filebeat/publisher/sync.go
+++ b/filebeat/publisher/sync.go
@@ -3,15 +3,15 @@ package publisher
 import (
 	"sync"
 
-	"github.com/elastic/beats/filebeat/input"
+	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/publisher"
 )
 
 type syncLogPublisher struct {
 	pub    publisher.Publisher
 	client publisher.Client
-	in     chan []*input.Event
+	in     chan []*common.MapStr
 	out    SuccessLogger
 	done   chan struct{}
 }
@@ -19,7 +19,7 @@ func newSyncLogPublisher(
-	in chan []*input.Event,
+	in chan []*common.MapStr,
 	out SuccessLogger,
 	pub publisher.Publisher,
 ) *syncLogPublisher {
@@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() {
 }
 
 func (p *syncLogPublisher) Publish() error {
-	var events []*input.Event
+	var events []*common.MapStr
 	select {
 	case <-p.done:
 		return sigPublisherStop
diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go
index 8e30ac7b61f7..5f106c4c92fe 100644
--- a/filebeat/registrar/registrar.go
+++ b/filebeat/registrar/registrar.go
@@ -8,16 +8,16 @@ import (
 	"sync"
 
 	cfg "github.com/elastic/beats/filebeat/config"
-	"github.com/elastic/beats/filebeat/input"
 	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/filebeat/publisher"
+	"github.com/elastic/beats/libbeat/common"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/monitoring"
 	"github.com/elastic/beats/libbeat/paths"
 )
 
 type Registrar struct {
-	Channel      chan []*input.Event
+	Channel      chan []*common.MapStr
 	out          publisher.SuccessLogger
 	done         chan struct{}
 	registryFile string // Path to the Registry File
@@ -38,7 +38,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) {
 		registryFile: registryFile,
 		done:         make(chan struct{}),
 		states:       file.NewStates(),
-		Channel:      make(chan []*input.Event, 1),
+		Channel:      make(chan []*common.MapStr, 1),
 		out:          out,
 		wg:           sync.WaitGroup{},
 	}
@@ -153,7 +153,7 @@ func (r *Registrar) Run() {
 	}()
 
 	for {
-		var events []*input.Event
+		var events []*common.MapStr
 		select {
 		case <-r.done:
@@ -183,18 +183,28 @@
 }
 
 // processEventStates gets the states from the events and writes them to the registrar state
-func (r *Registrar) processEventStates(events []*input.Event) {
+func (r *Registrar) processEventStates(events []*common.MapStr) {
 	logp.Debug("registrar", "Processing %d events", len(events))
events", len(events)) // Take the last event found for each file source for _, event := range events { // skip stdin - if event.InputType == cfg.StdinInputType { - continue + inputIface, err := event.GetValue("input_type"); if err == nil { + input_string, ok := inputIface.(string); if ok { + if input_string == cfg.StdinInputType { + continue + } + } } - r.states.Update(event.State) - statesUpdate.Add(1) + + stateIface, err := event.GetValue("state"); if err == nil { + state, ok := stateIface.(file.State); if ok { + r.states.Update(state) + statesUpdate.Add(1) + } + } + } } diff --git a/filebeat/spooler/spooler.go b/filebeat/spooler/spooler.go index a5d3c4e3ceae..2f38c71c0d63 100644 --- a/filebeat/spooler/spooler.go +++ b/filebeat/spooler/spooler.go @@ -5,8 +5,8 @@ import ( "time" cfg "github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/common" ) var debugf = logp.MakeDebug("spooler") @@ -16,16 +16,16 @@ const channelSize = 16 // Spooler aggregates the events and sends the aggregated data to the publisher. type Spooler struct { - Channel chan *input.Event // Channel is the input to the Spooler. + Channel chan *common.MapStr // Channel is the input to the Spooler. config spoolerConfig output Output // batch event output on flush - spool []*input.Event // Events being held by the Spooler. + spool []*common.MapStr // Events being held by the Spooler. wg sync.WaitGroup // WaitGroup used to control the shutdown. } // Output spooler sends event to through Send method type Output interface { - Send(events []*input.Event) bool + Send(events []*common.MapStr) bool } type spoolerConfig struct { @@ -40,13 +40,13 @@ func New( out Output, ) (*Spooler, error) { return &Spooler{ - Channel: make(chan *input.Event, channelSize), + Channel: make(chan *common.MapStr, channelSize), config: spoolerConfig{ idleTimeout: config.IdleTimeout, spoolSize: config.SpoolSize, }, output: out, - spool: make([]*input.Event, 0, config.SpoolSize), + spool: make([]*common.MapStr, 0, config.SpoolSize), }, nil } @@ -74,7 +74,6 @@ func (s *Spooler) run() { if !ok { return } - if event != nil { flushed := s.queue(event) if flushed { @@ -112,7 +111,7 @@ func (s *Spooler) Stop() { // queue queues a single event to be spooled. If the queue reaches spoolSize // while calling this method then all events in the queue will be flushed to // the publisher. -func (s *Spooler) queue(event *input.Event) bool { +func (s *Spooler) queue(event *common.MapStr) bool { flushed := false s.spool = append(s.spool, event) if len(s.spool) == cap(s.spool) { @@ -132,7 +131,7 @@ func (s *Spooler) flush() int { } // copy buffer - tmpCopy := make([]*input.Event, count) + tmpCopy := make([]*common.MapStr, count) copy(tmpCopy, s.spool) // clear buffer