Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor input.Event similar to outputs.Data #3823

Merged
merged 5 commits into from
Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- The `symlinks` and `harverster_limit` settings are now GA, instead of experimental. {pull}3525[3525]
- close_timeout is also applied when the output is blocking. {pull}3511[3511]
- Improve handling of different path variants on Windows. {pull}3781[3781]
- Restructure input.Event to be inline with outputs.Data {pull}3823[3823]

*Heartbeat*

Expand Down
14 changes: 7 additions & 7 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ type spoolerOutlet struct {

type publisherChannel struct {
done chan struct{}
ch chan []*input.Event
ch chan []*input.Data
}

type registrarLogger struct {
done chan struct{}
ch chan<- []*input.Event
ch chan<- []*input.Data
}

type finishedLogger struct {
Expand All @@ -44,7 +44,7 @@ func newSpoolerOutlet(
}
}

func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func (o *spoolerOutlet) OnEvent(event *input.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
Expand All @@ -69,12 +69,12 @@ func (o *spoolerOutlet) OnEvent(event *input.Event) bool {
func newPublisherChannel() *publisherChannel {
return &publisherChannel{
done: make(chan struct{}),
ch: make(chan []*input.Event, 1),
ch: make(chan []*input.Data, 1),
}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*input.Event) bool {
func (c *publisherChannel) Send(events []*input.Data) bool {
select {
case <-c.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -96,7 +96,7 @@ func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
}

func (l *registrarLogger) Close() { close(l.done) }
func (l *registrarLogger) Published(events []*input.Event) bool {
func (l *registrarLogger) Published(events []*input.Data) bool {
select {
case <-l.done:
// set ch to nil, so no more events will be send after channel close signal
Expand All @@ -114,7 +114,7 @@ func newFinishedLogger(wg *sync.WaitGroup) *finishedLogger {
return &finishedLogger{wg}
}

func (l *finishedLogger) Published(events []*input.Event) bool {
func (l *finishedLogger) Published(events []*input.Data) bool {
for range events {
l.wg.Done()
}
Expand Down
54 changes: 41 additions & 13 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,37 @@ import (

// Event is sent to the output and must contain all relevant information
type Event struct {
EventMeta
Text *string
JSONConfig *reader.JSONConfig
Data common.MapStr // Use in readers to add data to the event

}

type EventMeta struct {
common.EventMetadata
ReadTime time.Time
Pipeline string
Fileset string
Module string
InputType string
DocumentType string
ReadTime time.Time
Bytes int
Text *string
JSONConfig *reader.JSONConfig
State file.State
Data common.MapStr // Use in readers to add data to the event
Pipeline string
Fileset string
Module string
}

type Data struct {
Event common.MapStr
Metadata EventMeta
}

func NewEvent(state file.State) *Event {
return &Event{
State: state,
EventMeta: EventMeta{
State: state,
},
}

}

func (e *Event) ToMapStr() common.MapStr {
Expand Down Expand Up @@ -68,21 +81,36 @@ func (e *Event) ToMapStr() common.MapStr {
return event
}

func (e *Event) GetData() Data {
return Data{
Event: e.ToMapStr(),
Metadata: EventMeta{
Pipeline: e.Pipeline,
Bytes: e.Bytes,
State: e.State,
Fileset: e.Fileset,
Module: e.Module,
ReadTime: e.ReadTime,
EventMetadata: e.EventMetadata,
},
}
}

// Metadata creates a common.MapStr containing the metadata to
// be associated with the event.
func (e *Event) Metadata() common.MapStr {
if e.Pipeline != "" {
func (eh *Data) GetMetadata() common.MapStr {
if eh.Metadata.Pipeline != "" {
return common.MapStr{
"pipeline": e.Pipeline,
"pipeline": eh.Metadata.Pipeline,
}
}
return nil
}

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (e *Event) HasData() bool {
return e.Bytes > 0
func (eh *Data) HasData() bool {
return eh.Metadata.Bytes > 0
}

// mergeJSONFields writes the JSON fields in the event map,
Expand Down
108 changes: 65 additions & 43 deletions filebeat/input/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// by default, don't overwrite keys
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},

Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -44,10 +47,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// overwrite keys if asked
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test",
Expand All @@ -57,10 +62,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// without keys_under_root, put everything in a json key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hello"}},
JSONConfig: &reader.JSONConfig{},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -70,10 +77,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// when MessageKey is defined, the Text overwrites the value of that key
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "text": "hi"}},
JSONConfig: &reader.JSONConfig{MessageKey: "text"},
},
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
Expand All @@ -84,11 +93,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when @timestamp is in JSON and overwrite_keys is true, parse it
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
Expand All @@ -99,11 +110,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the parsing on @timestamp fails, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
Expand All @@ -115,11 +128,13 @@ func TestEventToMapStrJSON(t *testing.T) {
// when the @timestamp has the wrong type, leave the existing value and add an error key
// in a common.Time
Event: Event{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "test", "@timestamp": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
Expand All @@ -130,10 +145,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": 42}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -143,10 +160,12 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": ""}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand All @@ -156,10 +175,13 @@ func TestEventToMapStrJSON(t *testing.T) {
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Event: Event{
DocumentType: "test_type",
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
EventMeta: EventMeta{
DocumentType: "test_type",
ReadTime: now,
},
Text: &text,
Data: common.MapStr{"json": common.MapStr{"type": "_type"}},
JSONConfig: &reader.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
Expand Down
6 changes: 4 additions & 2 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Prospectorer interface {

// Outlet is the outlet for a prospector
type Outlet interface {
OnEvent(event *input.Event) bool
OnEvent(event *input.Data) bool
}

// NewProspector instantiates a new prospector
Expand Down Expand Up @@ -214,7 +214,9 @@ func (p *Prospector) updateState(event *input.Event) error {
event.Module = p.config.Module
event.Fileset = p.config.Fileset

ok := p.outlet.OnEvent(event)
eventHolder := event.GetData()
ok := p.outlet.OnEvent(&eventHolder)

if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,4 @@ func TestInit(t *testing.T) {
// TestOutlet is an empty outlet for testing
type TestOutlet struct{}

func (o TestOutlet) OnEvent(event *input.Event) bool { return true }
func (o TestOutlet) OnEvent(event *input.Data) bool { return true }
6 changes: 3 additions & 3 deletions filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type asyncLogPublisher struct {
pub publisher.Publisher
client publisher.Client
in chan []*input.Event
in chan []*input.Data
out SuccessLogger

// list of in-flight batches
Expand All @@ -29,7 +29,7 @@ type asyncLogPublisher struct {
type eventsBatch struct {
next *eventsBatch
flag int32
events []*input.Event
events []*input.Data
}

type batchList struct {
Expand All @@ -50,7 +50,7 @@ const (
)

func newAsyncLogPublisher(
in chan []*input.Event,
in chan []*input.Data,
out SuccessLogger,
pub publisher.Publisher,
) *asyncLogPublisher {
Expand Down
Loading