Skip to content

Commit

Permalink
Renaming EventHolder to Data
Browse files Browse the repository at this point in the history
  • Loading branch information
vjsamuel committed Mar 29, 2017
1 parent c78db55 commit 3a620e6
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 89 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Add support for prospector level processors. {pull}3823[3823]
- Restructure input.Event to be inline with outputs.Data {pull}3823[3823]

*Heartbeat*

Expand Down
14 changes: 7 additions & 7 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.EventHolder
ch chan []*input.Data
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.EventHolder
ch chan<- []*input.Data
}

type finishedLogger struct {
Expand All @@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.EventHolder) bool {
func (o *spoolerOutlet) OnEvent(event *input.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
Expand All @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.EventHolder) bool {
func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.EventHolder, 1),
ch: make(chan []*input.Data, 1),
}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.EventHolder) bool {
func (c *publisherChannel) Send(events []*input.Data) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.EventHolder) bool {
func (l *registrarLogger) Published(events []*input.Data) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.EventHolder) bool {
func (l *finishedLogger) Published(events []*input.Data) bool {
for range events {
l.wg.Done()
}
Expand Down
18 changes: 9 additions & 9 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ type Event struct {
}

type EventMeta struct {
common.EventMetadata
Pipeline string
Fileset string
Module string
InputType string
DocumentType string
common.EventMetadata
ReadTime time.Time
Bytes int
State file.State
ReadTime time.Time
Bytes int
State file.State
}

type EventHolder struct {
type Data struct {
Event common.MapStr
Metadata EventMeta
}
Expand Down Expand Up @@ -81,8 +81,8 @@ func (e *Event) ToMapStr() common.MapStr {
return event
}

func (e *Event) GetEventHolder() EventHolder {
return EventHolder{
func (e *Event) GetEventHolder() Data {
return Data{
Event: e.ToMapStr(),
Metadata: EventMeta{
Pipeline: e.Pipeline,
Expand All @@ -98,7 +98,7 @@ func (e *Event) GetEventHolder() EventHolder {

// Metadata creates a common.MapStr containing the metadata to
// be associated with the event.
func (eh *EventHolder) GetMetadata() common.MapStr {
func (eh *Data) GetMetadata() common.MapStr {
if eh.Metadata.Pipeline != "" {
return common.MapStr{
"pipeline": eh.Metadata.Pipeline,
Expand All @@ -109,7 +109,7 @@ func (eh *EventHolder) GetMetadata() common.MapStr {

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (eh *EventHolder) HasData() bool {
func (eh *Data) HasData() bool {
return eh.Metadata.Bytes > 0
}

Expand Down
36 changes: 17 additions & 19 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -27,24 +26,23 @@ var (
)

type prospectorConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
Processors processors.PluginConfig `config:"processors"`
common.EventMetadata `config:",inline"` // Fields and tags to add to events.
Enabled bool `config:"enabled"`
DocumentType string `config:"document_type"`
ExcludeFiles []match.Matcher `config:"exclude_files"`
IgnoreOlder time.Duration `config:"ignore_older"`
Paths []string `config:"paths"`
ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"`
InputType string `config:"input_type"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
JSON *reader.JSONConfig `config:"json"`
Pipeline string `config:"pipeline"`
Module string `config:"_module_name"` // hidden option to set the module name
Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
}

func (config *prospectorConfig) Validate() error {
Expand Down
26 changes: 2 additions & 24 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/processors"
)

var (
Expand All @@ -41,7 +40,6 @@ type Prospector struct {
registry *harvesterRegistry
beatDone chan struct{}
eventCounter *sync.WaitGroup
processors *processors.Processors
}

// Prospectorer is the interface common to all prospectors
Expand All @@ -52,7 +50,7 @@ type Prospectorer interface {

// Outlet is the outlet for a prospector
type Outlet interface {
OnEvent(event *input.EventHolder) bool
OnEvent(event *input.Data) bool
}

// NewProspector instantiates a new prospector
Expand Down Expand Up @@ -86,13 +84,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (*
return nil, err
}

f, err := processors.New(prospector.config.Processors)
if err != nil {
return nil, err
}

prospector.processors = f

logp.Debug("prospector", "File Configs: %v", prospector.config.Paths)

return prospector, nil
Expand Down Expand Up @@ -224,20 +215,7 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Fileset = p.config.Fileset

eventHolder := event.GetEventHolder()
//run the filters before sending to
if event.Bytes > 0 {
eventHolder.Event = p.processors.Run(eventHolder.Event)
}

var ok bool
if eventHolder.Event != nil {
//processor might decide to drop the event
ok = p.outlet.OnEvent(&eventHolder)

} else {
eventHolder.Metadata.Bytes = 0
ok = p.outlet.OnEvent(&eventHolder)
}
ok := p.outlet.OnEvent(&eventHolder)

if !ok {
logp.Info("Prospector outlet closed")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ func TestInit(t *testing.T) {
// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *input.EventHolder) bool { return true }
func (o TestOutlet) OnEvent(event *input.Data) bool { return true }
6 changes: 3 additions & 3 deletions filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.EventHolder
in chan []*input.Data
out SuccessLogger

// list of in-flight batches
Expand All @@ -29,7 +29,7 @@ type asyncLogPublisher struct {
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.EventHolder
events []*input.Data
}

type batchList struct {
Expand All @@ -50,7 +50,7 @@ const (
)

func newAsyncLogPublisher(
in chan []*input.EventHolder,
in chan []*input.Data,
out SuccessLogger,
pub publisher.Publisher,
) *asyncLogPublisher {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ type LogPublisher interface {
type SuccessLogger interface {

// Published will be run after events have been acknowledged by the outputs.
Published(events []*input.EventHolder) bool
Published(events []*input.Data) bool
}

func New(
async bool,
in chan []*input.EventHolder,
in chan []*input.Data,
out SuccessLogger,
pub publisher.Publisher,
) LogPublisher {
Expand All @@ -46,7 +46,7 @@ var (

// getDataEvents returns all events which contain data (not only state updates)
// together with their associated metadata
func getDataEvents(events []*input.EventHolder) (dataEvents []common.MapStr, meta []common.MapStr) {
func getDataEvents(events []*input.Data) (dataEvents []common.MapStr, meta []common.MapStr) {
dataEvents = make([]common.MapStr, 0, len(events))
meta = make([]common.MapStr, 0, len(events))
for _, event := range events {
Expand Down
12 changes: 6 additions & 6 deletions filebeat/publisher/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@ import (

type collectLogger struct {
wg *sync.WaitGroup
events [][]*input.EventHolder
events [][]*input.Data
}

func (l *collectLogger) Published(events []*input.EventHolder) bool {
func (l *collectLogger) Published(events []*input.Data) bool {
l.wg.Done()
l.events = append(l.events, events)
return true
}

func makeEvents(name string, n int) []*input.EventHolder {
var events []*input.EventHolder
func makeEvents(name string, n int) []*input.Data {
var events []*input.Data
for i := 0; i < n; i++ {
event := &input.Event{
EventMeta: input.EventMeta{
Expand Down Expand Up @@ -59,15 +59,15 @@ func TestPublisherModes(t *testing.T) {

wg := sync.WaitGroup{}

pubChan := make(chan []*input.EventHolder, len(test.order)+1)
pubChan := make(chan []*input.Data, len(test.order)+1)
collector := &collectLogger{&wg, nil}
client := pubtest.NewChanClient(0)

pub := New(test.async, pubChan, collector,
pubtest.PublisherWithClient(client))
pub.Start()

var events [][]*input.EventHolder
var events [][]*input.Data
for i := range test.order {
tmp := makeEvents(fmt.Sprintf("msg: %v", i), 1)
wg.Add(1)
Expand Down
6 changes: 3 additions & 3 deletions filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import (
type syncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.EventHolder
in chan []*input.Data
out SuccessLogger

done chan struct{}
wg sync.WaitGroup
}

func newSyncLogPublisher(
in chan []*input.EventHolder,
in chan []*input.Data,
out SuccessLogger,
pub publisher.Publisher,
) *syncLogPublisher {
Expand Down Expand Up @@ -51,7 +51,7 @@ func (p *syncLogPublisher) Start() {
}

func (p *syncLogPublisher) Publish() error {
var events []*input.EventHolder
var events []*input.Data
select {
case <-p.done:
return sigPublisherStop
Expand Down
8 changes: 4 additions & 4 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type Registrar struct {
Channel chan []*input.EventHolder
Channel chan []*input.Data
out publisher.SuccessLogger
done chan struct{}
registryFile string // Path to the Registry File
Expand All @@ -38,7 +38,7 @@ func New(registryFile string, out publisher.SuccessLogger) (*Registrar, error) {
registryFile: registryFile,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []*input.EventHolder, 1),
Channel: make(chan []*input.Data, 1),
out: out,
wg: sync.WaitGroup{},
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (r *Registrar) Run() {
}()

for {
var events []*input.EventHolder
var events []*input.Data

select {
case <-r.done:
Expand Down Expand Up @@ -183,7 +183,7 @@ func (r *Registrar) Run() {
}

// processEventStates gets the states from the events and writes them to the registrar state
func (r *Registrar) processEventStates(events []*input.EventHolder) {
func (r *Registrar) processEventStates(events []*input.Data) {
logp.Debug("registrar", "Processing %d events", len(events))

// skip stdin
Expand Down
Loading

0 comments on commit 3a620e6

Please sign in to comment.