Skip to content

Commit

Permalink
Adding support for filebeat prospector level filters
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Mar 28, 2017
1 parent c7017db commit d63d93b
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 46 deletions.
16 changes: 8 additions & 8 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"sync"
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"
"github.com/elastic/beats/libbeat/common"
)

type spoolerOutlet struct {
Expand All @@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.Event
ch chan []*common.MapStr
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.Event
ch chan<- []*common.MapStr
}

type finishedLogger struct {
Expand All @@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func (o *spoolerOutlet) OnEvent(event *common.MapStr) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
Expand All @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.Event, 1),
ch: make(chan []*common.MapStr, 1),
}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.Event) bool {
func (c *publisherChannel) Send(events []*common.MapStr) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.Event) bool {
func (l *registrarLogger) Published(events []*common.MapStr) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.Event) bool {
func (l *finishedLogger) Published(events []*common.MapStr) bool {
for range events {
l.wg.Done()
}
Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (e *Event) ToMapStr() common.MapStr {
event["message"] = *e.Text
}

meta := e.Metadata(); if meta != nil {
event["meta"] = meta
}

return event
}

Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand Down Expand Up @@ -43,6 +44,7 @@ type prospectorConfig struct {
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Filters processors.PluginConfig `config:"filters"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
27 changes: 22 additions & 5 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -40,6 +41,7 @@ type Prospector struct {
registry *harvesterRegistry
beatDone chan struct{}
eventCounter *sync.WaitGroup
filters *processors.Processors
}

// Prospectorer is the interface common to all prospectors
Expand All @@ -50,7 +52,7 @@ type Prospectorer interface {

// Outlet is the outlet for a prospector
type Outlet interface {
OnEvent(event *input.Event) bool
OnEvent(event *common.MapStr) bool
}

// NewProspector instantiates a new prospector
Expand Down Expand Up @@ -84,6 +86,13 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*
return nil, err
}

f, err := processors.New(prospector.config.Filters)
if err != nil {
return nil, err
}

prospector.filters = f

logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)

return prospector, nil
Expand Down Expand Up @@ -214,12 +223,20 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Module = p.config.Module
event.Fileset = p.config.Fileset

ok := p.outlet.OnEvent(event)
if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
eventMap := event.ToMapStr()

//run the filters before sending to
eventMap = p.filters.Run(eventMap)
if eventMap != nil {
//processor might decide to drop the event
ok := p.outlet.OnEvent(&eventMap)
if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
}
}


p.states.Update(event.State)
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"sync/atomic"
"time"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/common"
)

type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.Event
in chan []*common.MapStr
out SuccessLogger

// list of in-flight batches
Expand All @@ -29,7 +29,7 @@ type asyncLogPublisher struct {
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.Event
events []*common.MapStr
}

type batchList struct {
Expand All @@ -50,7 +50,7 @@ const (
)

func newAsyncLogPublisher(
in chan []*input.Event,
in chan []*common.MapStr,
out SuccessLogger,
pub publisher.Publisher,
) *asyncLogPublisher {
Expand Down
17 changes: 10 additions & 7 deletions filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package publisher
import (
"errors"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -24,12 +23,12 @@ type LogPublisher interface {
type SuccessLogger interface {

// Published will be run after events have been acknowledged by the outputs.
Published(events []*input.Event) bool
Published(events []*common.MapStr) bool
}

func New(
async bool,
in chan []*input.Event,
in chan []*common.MapStr,
out SuccessLogger,
pub publisher.Publisher,
) LogPublisher {
Expand All @@ -46,14 +45,18 @@ var (

// getDataEvents returns all events which contain data (not only state updates)
// together with their associated metadata
func getDataEvents(events []*input.Event) (dataEvents []common.MapStr, meta []common.MapStr) {
func getDataEvents(events []*common.MapStr) (dataEvents []common.MapStr, meta []common.MapStr) {
dataEvents = make([]common.MapStr, 0, len(events))
meta = make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
dataEvents = append(dataEvents, event.ToMapStr())
meta = append(meta, event.Metadata())
if ok, _ := event.HasKey("meta"); ok {
mIface, err := event.GetValue("meta"); if err != nil {
meta = append(meta, mIface.(common.MapStr))
}
event.Delete("meta")
}
dataEvents = append(dataEvents, *event)

}
return dataEvents, meta
}
8 changes: 4 additions & 4 deletions filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,23 @@ package publisher
import (
"sync"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/common"
)

type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.Event
in chan []*common.MapStr
out SuccessLogger

done chan struct{}
wg sync.WaitGroup
}

func newSyncLogPublisher(
in chan []*input.Event,
in chan []*common.MapStr,
out SuccessLogger,
pub publisher.Publisher,
) *syncLogPublisher {
Expand Down Expand Up @@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() {
}

func (p *syncLogPublisher) Publish() error {
var events []*input.Event
var events []*common.MapStr
select {
case <-p.done:
return sigPublisherStop
Expand Down
28 changes: 19 additions & 9 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ import (
"sync"

cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/common"
)

type Registrar struct {
Channel chan []*input.Event
Channel chan []*common.MapStr
out publisher.SuccessLogger
done chan struct{}
registryFile string // Path to the Registry File
Expand All @@ -38,7 +38,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) {
registryFile: registryFile,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []*input.Event, 1),
Channel: make(chan []*common.MapStr, 1),
out: out,
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (r *Registrar) Run() {
}()

for {
var events []*input.Event
var events []*common.MapStr

select {
case <-r.done:
Expand Down Expand Up @@ -183,18 +183,28 @@ func (r *Registrar) Run() {
}

// processEventStates gets the states from the events and writes them to the registrar state
func (r *Registrar) processEventStates(events []*input.Event) {
func (r *Registrar) processEventStates(events []*common.MapStr) {
logp.Debug("registrar", "Processing %d events", len(events))

// Take the last event found for each file source
for _, event := range events {

// skip stdin
if event.InputType == cfg.StdinInputType {
continue
inputIface, err := event.GetValue("input_type"); if err == nil {
input_string, ok := inputIface.(string); if ok {
if input_string == cfg.StdinInputType {
continue
}
}
}
r.states.Update(event.State)
statesUpdate.Add(1)

stateIface, err := event.GetValue("state"); if err == nil {
state, ok := stateIface.(file.State); if ok {
r.states.Update(state)
statesUpdate.Add(1)
}
}

}
}

Expand Down
Loading

0 comments on commit d63d93b

Please sign in to comment.