diff --git a/filebeat/channel/connector.go b/filebeat/channel/connector.go new file mode 100644 index 00000000000..af1cf33c06b --- /dev/null +++ b/filebeat/channel/connector.go @@ -0,0 +1,119 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" +) + +// ConnectorFunc is an adapter for using ordinary functions as Connector. +type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error) + +type pipelineConnector struct { + parent *OutletFactory + pipeline beat.Pipeline +} + +// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function. +func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) { + return fn(cfg, beat.ClientConfig{}) +} + +// ConnectWith passes the configuration and the pipeline connection setting to the underlying function. +func (fn ConnectorFunc) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) { + return fn(cfg, clientCfg) +} + +func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) { + return c.ConnectWith(cfg, beat.ClientConfig{}) +} + +func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) { + config := inputOutletConfig{} + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + + var err error + var userProcessors beat.ProcessorList + + userProcessors, err = processors.New(config.Processors) + if err != nil { + return nil, err + } + + if lst := clientCfg.Processing.Processor; lst != nil { + if len(userProcessors.All()) == 0 { + userProcessors = lst + } else if orig := lst.All(); len(orig) > 0 { + newLst := processors.NewList(nil) + newLst.List = append(newLst.List, lst, userProcessors) + userProcessors = newLst + } + } + + setOptional := func(to common.MapStr, key string, value string) { + if value != "" { + to.Put(key, value) + } + } + + meta := clientCfg.Processing.Meta.Clone() + fields := clientCfg.Processing.Fields.Clone() + + serviceType := config.ServiceType + if serviceType == "" { + serviceType = config.Module + } + + setOptional(meta, "pipeline", config.Pipeline) + setOptional(fields, "fileset.name", config.Fileset) + setOptional(fields, "service.type", serviceType) + setOptional(fields, "input.type", config.Type) + if config.Module != "" { + event := common.MapStr{"module": config.Module} + if config.Fileset != "" { + event["dataset"] = config.Module + "." + config.Fileset + } + fields["event"] = event + } + + mode := clientCfg.PublishMode + if mode == beat.DefaultGuarantees { + mode = beat.GuaranteedSend + } + + // connect with updated configuration + clientCfg.PublishMode = mode + clientCfg.Processing.EventMetadata = config.EventMetadata + clientCfg.Processing.Meta = meta + clientCfg.Processing.Fields = fields + clientCfg.Processing.Processor = userProcessors + client, err := c.pipeline.ConnectWith(clientCfg) + if err != nil { + return nil, err + } + + outlet := newOutlet(client, c.parent.wgEvents) + if c.parent.done != nil { + return CloseOnSignal(outlet, c.parent.done), nil + } + return outlet, nil +} diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index c295d1a4056..e31c3f5240d 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -82,75 +82,12 @@ func NewOutletFactory( // Inputs and all harvesters use the same pipeline client instance. // This guarantees ordering between events as required by the registrar for // file.State updates -func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) { - config := inputOutletConfig{} - if err := cfg.Unpack(&config); err != nil { - return nil, err - } - - processors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - - setMeta := func(to common.MapStr, key, value string) { - if value != "" { - to[key] = value - } - } - - meta := common.MapStr{} - setMeta(meta, "pipeline", config.Pipeline) - - fields := common.MapStr{} - setMeta(fields, "module", config.Module) - if config.Module != "" && config.Fileset != "" { - setMeta(fields, "dataset", config.Module+"."+config.Fileset) - } - if len(fields) > 0 { - fields = common.MapStr{ - "event": fields, - } - } - if config.Fileset != "" { - fields.Put("fileset.name", config.Fileset) - } - if config.ServiceType != "" { - fields.Put("service.type", config.ServiceType) - } else if config.Module != "" { - fields.Put("service.type", config.Module) - } - if config.Type != "" { - fields.Put("input.type", config.Type) - } - - client, err := p.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - Processing: beat.ProcessingConfig{ - EventMetadata: config.EventMetadata, - DynamicFields: dynFields, - Meta: meta, - Fields: fields, - Processor: processors, - }, - Events: f.eventer, - }) - if err != nil { - return nil, err - } - - outlet := newOutlet(client, f.wgEvents) - if f.done != nil { - return CloseOnSignal(outlet, f.done), nil - } - return outlet, nil +func (f *OutletFactory) Create(p beat.Pipeline) Connector { + return &pipelineConnector{parent: f, pipeline: p} } -func (*clientEventer) Closing() {} -func (*clientEventer) Closed() {} -func (*clientEventer) Published() {} - -func (c *clientEventer) FilteredOut(_ beat.Event) {} -func (c *clientEventer) DroppedOnPublish(_ beat.Event) { - c.wgEvents.Done() -} +func (e *clientEventer) Closing() {} +func (e *clientEventer) Closed() {} +func (e *clientEventer) Published() {} +func (e *clientEventer) FilteredOut(evt beat.Event) {} +func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() } diff --git a/filebeat/channel/interface.go b/filebeat/channel/interface.go index 877b818870a..30b41ac0461 100644 --- a/filebeat/channel/interface.go +++ b/filebeat/channel/interface.go @@ -18,20 +18,23 @@ package channel import ( - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" ) // Factory is used to create a new Outlet instance -type Factory func(beat.Pipeline, *common.Config, *common.MapStrPointer) (Outleter, error) +type Factory func(beat.Pipeline) Connector // Connector creates an Outlet connecting the event publishing with some internal pipeline. -type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) +// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error) +type Connector interface { + Connect(*common.Config) (Outleter, error) + ConnectWith(*common.Config, beat.ClientConfig) (Outleter, error) +} // Outleter is the outlet for an input type Outleter interface { Close() error Done() <-chan struct{} - OnEvent(data *util.Data) bool + OnEvent(beat.Event) bool } diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index c0fe2b0c9e3..ce8e5de7996 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -18,7 +18,6 @@ package channel import ( - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common/atomic" ) @@ -53,16 +52,11 @@ func (o *outlet) Done() <-chan struct{} { return o.done } -func (o *outlet) OnEvent(d *util.Data) bool { +func (o *outlet) OnEvent(event beat.Event) bool { if !o.isOpen.Load() { return false } - event := d.GetEvent() - if d.HasState() { - event.Private = d.GetState() - } - if o.wg != nil { o.wg.Add(1) } diff --git a/filebeat/channel/util.go b/filebeat/channel/util.go index aec2132fa20..f7579b34117 100644 --- a/filebeat/channel/util.go +++ b/filebeat/channel/util.go @@ -20,32 +20,23 @@ package channel import ( "sync" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" ) type subOutlet struct { done chan struct{} - ch chan *util.Data + ch chan beat.Event res chan bool mutex sync.Mutex closeOnce sync.Once } -// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory. -func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector { - return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) { - return factory(pipeline, cfg, m) - } -} - // SubOutlet create a sub-outlet, which can be closed individually, without closing the // underlying outlet. func SubOutlet(out Outleter) Outleter { s := &subOutlet{ done: make(chan struct{}), - ch: make(chan *util.Data), + ch: make(chan beat.Event), res: make(chan bool, 1), } @@ -75,7 +66,7 @@ func (o *subOutlet) Done() <-chan struct{} { return o.done } -func (o *subOutlet) OnEvent(d *util.Data) bool { +func (o *subOutlet) OnEvent(event beat.Event) bool { o.mutex.Lock() defer o.mutex.Unlock() @@ -89,7 +80,7 @@ func (o *subOutlet) OnEvent(d *util.Data) bool { case <-o.done: return false - case o.ch <- d: + case o.ch <- event: select { case <-o.done: diff --git a/filebeat/channel/util_test.go b/filebeat/channel/util_test.go index 8d3d3e818ca..c3a534f52bd 100644 --- a/filebeat/channel/util_test.go +++ b/filebeat/channel/util_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/tests/resources" ) @@ -31,7 +31,7 @@ type dummyOutletter struct { c chan struct{} } -func (o *dummyOutletter) OnEvent(event *util.Data) bool { +func (o *dummyOutletter) OnEvent(event beat.Event) bool { return true } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 288f102a2c3..d53511b90f0 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -117,7 +117,7 @@ func (c *Crawler) startInput( return nil } - connector := channel.ConnectTo(pipeline, c.out) + connector := c.out(pipeline) p, err := input.New(config, connector, c.beatDone, states, nil) if err != nil { return fmt.Errorf("Error while initializing input: %s", err) diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 8bf3443f6dd..2d534e04573 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP } inputs := make([]*input.Runner, len(pConfigs)) - connector := channel.ConnectTo(p, f.outlet) + connector := f.outlet(p) for i, pConfig := range pConfigs { inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta) if err != nil { diff --git a/filebeat/harvester/forwarder.go b/filebeat/harvester/forwarder.go index da485d85305..935555f777f 100644 --- a/filebeat/harvester/forwarder.go +++ b/filebeat/harvester/forwarder.go @@ -20,13 +20,13 @@ package harvester import ( "errors" - "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/logp" ) // Outlet interface is used for forwarding events type Outlet interface { - OnEvent(data *util.Data) bool + OnEvent(data beat.Event) bool } // Forwarder contains shared options between all harvesters needed to forward events @@ -46,8 +46,8 @@ func NewForwarder(outlet Outlet) *Forwarder { // Send updates the input state and sends the event to the spooler // All state updates done by the input itself are synchronous to make sure no states are overwritten -func (f *Forwarder) Send(data *util.Data) error { - ok := f.Outlet.OnEvent(data) +func (f *Forwarder) Send(event beat.Event) error { + ok := f.Outlet.OnEvent(event) if !ok { logp.Info("Input outlet closed") return errors.New("input outlet closed") diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index d56ed0f6796..8abe9fcf559 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -48,7 +48,6 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/reader" "github.com/elastic/beats/libbeat/reader/debug" "github.com/elastic/beats/libbeat/reader/multiline" @@ -99,7 +98,7 @@ type Harvester struct { // event/state publishing outletFactory OutletFactory - publishState func(*util.Data) bool + publishState func(file.State) bool onTerminate func() } @@ -109,7 +108,7 @@ func NewHarvester( config *common.Config, state file.State, states *file.States, - publishState func(*util.Data) bool, + publishState func(file.State) bool, outletFactory OutletFactory, ) (*Harvester, error) { @@ -231,7 +230,6 @@ func (h *Harvester) Run() error { // Closes reader after timeout or when done channel is closed // This routine is also responsible to properly stop the reader go func(source string) { - closeTimeout := make(<-chan time.Time) // starts close_timeout timer if h.config.CloseTimeout > 0 { @@ -293,56 +291,8 @@ func (h *Harvester) Run() error { startingOffset := state.Offset state.Offset += int64(message.Bytes) - // Create state event - data := util.NewData() - if h.source.HasState() { - data.SetState(state) - } - - text := string(message.Content) - - // Check if data should be added to event. Only export non empty events. - if !message.IsEmpty() && h.shouldExportLine(text) { - fields := common.MapStr{ - "log": common.MapStr{ - "offset": startingOffset, // Offset here is the offset before the starting char. - "file": common.MapStr{ - "path": state.Source, - }, - }, - } - fields.DeepUpdate(message.Fields) - - // Check if json fields exist - var jsonFields common.MapStr - if f, ok := fields["json"]; ok { - jsonFields = f.(common.MapStr) - } - - data.Event = beat.Event{ - Timestamp: message.Ts, - } - - if h.config.JSON != nil && len(jsonFields) > 0 { - ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) - if !ts.IsZero() { - // there was a `@timestamp` key in the event, so overwrite - // the resulting timestamp - data.Event.Timestamp = ts - } - } else if &text != nil { - if fields == nil { - fields = common.MapStr{} - } - fields["message"] = text - } - - data.Event.Fields = fields - } - - // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(data, forwarder) { + if !h.onMessage(forwarder, state, message, startingOffset) { return nil } @@ -367,14 +317,70 @@ func (h *Harvester) Stop() { h.stopLock.Unlock() } -// sendEvent sends event to the spooler channel -// Return false if event was not sent -func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool { +// onMessage processes a new message read from the reader. +// This results in a state update and possibly an event would be send. +// A state update first updates the in memory state held by the prospector, +// and finally sends the file.State indirectly to the registrar. +// The events Private field is used to forward the file state update. +// +// onMessage returns 'false' if it was interrupted in the process of sending the event. +// This normally signals a harvester shutdown. +func (h *Harvester) onMessage( + forwarder *harvester.Forwarder, + state file.State, + message reader.Message, + messageOffset int64, +) bool { if h.source.HasState() { - h.states.Update(data.GetState()) + h.states.Update(state) + } + + text := string(message.Content) + if message.IsEmpty() || !h.shouldExportLine(text) { + // No data or event is filtered out -> send empty event with state update + // only. The call can fail on filebeat shutdown. + // The event will be filtered out, but forwarded to the registry as is. + err := forwarder.Send(beat.Event{Private: state}) + return err == nil + } + + fields := common.MapStr{ + "log": common.MapStr{ + "offset": messageOffset, // Offset here is the offset before the starting char. + "file": common.MapStr{ + "path": state.Source, + }, + }, + } + fields.DeepUpdate(message.Fields) + + // Check if json fields exist + var jsonFields common.MapStr + if f, ok := fields["json"]; ok { + jsonFields = f.(common.MapStr) } - err := forwarder.Send(data) + timestamp := message.Ts + + if h.config.JSON != nil && len(jsonFields) > 0 { + ts := readjson.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON) + if !ts.IsZero() { + // there was a `@timestamp` key in the event, so overwrite + // the resulting timestamp + timestamp = ts + } + } else if &text != nil { + if fields == nil { + fields = common.MapStr{} + } + fields["message"] = text + } + + err := forwarder.Send(beat.Event{ + Timestamp: timestamp, + Fields: fields, + Private: state, + }) return err == nil } @@ -388,9 +394,7 @@ func (h *Harvester) SendStateUpdate() { return } - d := util.NewData() - d.SetState(h.state) - h.publishState(d) + h.publishState(h.state) logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) h.states.Update(h.state) diff --git a/filebeat/input/log/harvester_test.go b/filebeat/input/log/harvester_test.go index a538f8f92ba..2ee03824efb 100644 --- a/filebeat/input/log/harvester_test.go +++ b/filebeat/input/log/harvester_test.go @@ -20,7 +20,6 @@ package log import ( - "fmt" "math/rand" "os" "path/filepath" @@ -102,21 +101,21 @@ func TestReadLine(t *testing.T) { // Read third line _, text, bytesread, _, err := readLine(r) - fmt.Printf("received line: '%s'\n", text) + t.Logf("received line: '%s'\n", text) assert.Nil(t, err) assert.Equal(t, text, firstLineString[0:len(firstLineString)-1]) assert.Equal(t, bytesread, len(firstLineString)) // read second line _, text, bytesread, _, err = readLine(r) - fmt.Printf("received line: '%s'\n", text) + t.Logf("received line: '%s'\n", text) assert.Equal(t, text, secondLineString[0:len(secondLineString)-1]) assert.Equal(t, bytesread, len(secondLineString)) assert.Nil(t, err) // Read third line, which doesn't exist _, text, bytesread, _, err = readLine(r) - fmt.Printf("received line: '%s'\n", text) + t.Logf("received line: '%s'\n", text) assert.Equal(t, "", text) assert.Equal(t, bytesread, 0) assert.Equal(t, err, ErrInactive) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 6c19762ede9..24cbdf33abc 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" @@ -91,7 +91,11 @@ func NewInput( // The outlet generated here is the underlying outlet, only closed // once all workers have been shut down. // For state updates and events, separate sub-outlets will be used. - out, err := outlet(cfg, context.DynamicFields) + out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -650,8 +654,8 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste p.cfg, state, p.states, - func(d *util.Data) bool { - return p.stateOutlet.OnEvent(d) + func(state file.State) bool { + return p.stateOutlet.OnEvent(beat.Event{Private: state}) }, subOutletWrap(p.outlet), ) @@ -711,10 +715,9 @@ func (p *Input) updateState(state file.State) error { // Update first internal state p.states.Update(state) - - data := util.NewData() - data.SetState(state) - ok := p.outlet.OnEvent(data) + ok := p.outlet.OnEvent(beat.Event{ + Private: state, + }) if !ok { logp.Info("input outlet closed") return errors.New("input outlet closed") diff --git a/filebeat/input/log/input_test.go b/filebeat/input/log/input_test.go index d91bdabd964..eaf080d39d6 100644 --- a/filebeat/input/log/input_test.go +++ b/filebeat/input/log/input_test.go @@ -32,7 +32,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/tests/resources" @@ -147,13 +147,13 @@ func testInputLifecycle(t *testing.T, context input.Context, closer func(input.C "close_eof": true, }) - events := make(chan *util.Data, 100) + events := make(chan beat.Event, 100) defer close(events) capturer := NewEventCapturer(events) defer capturer.Close() - connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { return channel.SubOutlet(capturer), nil - } + }) input, err := NewInput(config, connector, context) if err != nil { @@ -169,7 +169,7 @@ func testInputLifecycle(t *testing.T, context input.Context, closer func(input.C for { select { case event := <-events: - if state := event.GetState(); state.Finished { + if state, ok := event.Private.(file.State); ok && state.Finished { assert.Equal(t, len(logs), int(state.Offset), "file has not been fully read") go func() { closer(context, input.(*Input)) @@ -192,9 +192,9 @@ func TestNewInputDone(t *testing.T) { "paths": path.Join(os.TempDir(), "logs", "*.log"), }) - connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { return TestOutlet{}, nil - } + }) context := input.Context{ Done: make(chan struct{}), @@ -212,9 +212,9 @@ func TestNewInputError(t *testing.T) { config := common.NewConfig() - connector := func(*common.Config, *common.MapStrPointer) (channel.Outleter, error) { + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { return TestOutlet{}, nil - } + }) context := input.Context{} @@ -292,17 +292,17 @@ type eventCapturer struct { closed bool c chan struct{} closeOnce sync.Once - events chan *util.Data + events chan beat.Event } -func NewEventCapturer(events chan *util.Data) channel.Outleter { +func NewEventCapturer(events chan beat.Event) channel.Outleter { return &eventCapturer{ c: make(chan struct{}), events: events, } } -func (o *eventCapturer) OnEvent(event *util.Data) bool { +func (o *eventCapturer) OnEvent(event beat.Event) bool { o.events <- event return true } @@ -322,6 +322,6 @@ func (o *eventCapturer) Done() <-chan struct{} { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *util.Data) bool { return true } +func (o TestOutlet) OnEvent(event beat.Event) bool { return true } func (o TestOutlet) Close() error { return nil } func (o TestOutlet) Done() <-chan struct{} { return nil } diff --git a/filebeat/input/redis/harvester.go b/filebeat/input/redis/harvester.go index 437bc2da75b..73b9111a872 100644 --- a/filebeat/input/redis/harvester.go +++ b/filebeat/input/redis/harvester.go @@ -30,7 +30,6 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/filebeat/harvester" - "github.com/elastic/beats/filebeat/util" ) // Harvester contains all redis harvester data @@ -133,8 +132,7 @@ func (h *Harvester) Run() error { log.args = args[2:] } - data := util.NewData() - subEvent := common.MapStr{ + slowlogEntry := common.MapStr{ "id": log.id, "cmd": log.cmd, "key": log.key, @@ -144,24 +142,21 @@ func (h *Harvester) Run() error { } if log.args != nil { - subEvent["args"] = log.args - + slowlogEntry["args"] = log.args } - data.Event = beat.Event{ + h.forwarder.Send(beat.Event{ Timestamp: time.Unix(log.timestamp, 0).UTC(), Fields: common.MapStr{ "message": strings.Join(args, " "), "redis": common.MapStr{ - "slowlog": subEvent, + "slowlog": slowlogEntry, }, "event": common.MapStr{ "created": time.Now(), }, }, - } - - h.forwarder.Send(data) + }) } return nil } diff --git a/filebeat/input/redis/input.go b/filebeat/input/redis/input.go index 583a8f748b4..a1a6babca47 100644 --- a/filebeat/input/redis/input.go +++ b/filebeat/input/redis/input.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" @@ -48,7 +49,7 @@ type Input struct { } // NewInput creates a new redis input -func NewInput(cfg *common.Config, outletFactory channel.Connector, context input.Context) (input.Input, error) { +func NewInput(cfg *common.Config, connector channel.Connector, context input.Context) (input.Input, error) { cfgwarn.Experimental("Redis slowlog input is enabled.") config := defaultConfig @@ -58,14 +59,18 @@ func NewInput(cfg *common.Config, outletFactory channel.Connector, context input return nil, err } - outlet, err := outletFactory(cfg, context.DynamicFields) + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } p := &Input{ started: false, - outlet: outlet, + outlet: out, config: config, cfg: cfg, registry: harvester.NewRegistry(), diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index fb21ae330c1..c4bb0818031 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -47,7 +47,7 @@ func (r *RunnerFactory) Create( c *common.Config, meta *common.MapStrPointer, ) (cfgfile.Runner, error) { - connector := channel.ConnectTo(pipeline, r.outlet) + connector := r.outlet(pipeline) p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta) if err != nil { // In case of error with loading state, input is still returned diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 00fb59d8735..f59c0da521e 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/filebeat/input/log" + "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) @@ -48,7 +49,11 @@ type Input struct { // NewInput creates a new stdin input // This input contains one harvester which is reading from stdin func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) { - out, err := outlet(cfg, context.DynamicFields) + out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go index e7bcdb3cc0c..10365c6d3f9 100644 --- a/filebeat/input/syslog/input.go +++ b/filebeat/input/syslog/input.go @@ -28,7 +28,6 @@ import ( "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/inputsource" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" @@ -113,7 +112,11 @@ func NewInput( log := logp.NewLogger("syslog") - out, err := outlet(cfg, context.DynamicFields) + out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -127,28 +130,22 @@ func NewInput( cb := func(data []byte, metadata inputsource.NetworkMetadata) { ev := newEvent() Parse(data, ev) - var d *util.Data if !ev.IsValid() { log.Errorw("can't parse event as syslog rfc3164", "message", string(data)) // On error revert to the raw bytes content, we need a better way to communicate this kind of // error upstream this should be a global effort. - d = &util.Data{ - Event: beat.Event{ - Timestamp: time.Now(), - Meta: common.MapStr{ - "truncated": metadata.Truncated, - }, - Fields: common.MapStr{ - "message": string(data), - }, + forwarder.Send(beat.Event{ + Timestamp: time.Now(), + Meta: common.MapStr{ + "truncated": metadata.Truncated, + }, + Fields: common.MapStr{ + "message": string(data), }, - } + }) } else { - event := createEvent(ev, metadata, time.Local, log) - d = &util.Data{Event: *event} + forwarder.Send(createEvent(ev, metadata, time.Local, log)) } - - forwarder.Send(d) } server, err := factory(cb, config.Protocol) @@ -201,7 +198,7 @@ func (p *Input) Wait() { p.Stop() } -func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) *beat.Event { +func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) beat.Event { f := common.MapStr{ "message": strings.TrimRight(ev.Message(), "\n"), "log": common.MapStr{ @@ -257,7 +254,7 @@ func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time f["event.sequence"] = ev.Sequence() } - return &beat.Event{ + return beat.Event{ Timestamp: ev.Timestamp(timezone), Meta: common.MapStr{ "truncated": metadata.Truncated, diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 3ebf90657ae..77a7162b039 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -27,7 +27,6 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/filebeat/inputsource/tcp" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -53,11 +52,15 @@ type Input struct { // NewInput creates a new TCP input func NewInput( cfg *common.Config, - outlet channel.Connector, + connector channel.Connector, context input.Context, ) (input.Input, error) { - out, err := outlet(cfg, context.DynamicFields) + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -127,9 +130,8 @@ func (p *Input) Wait() { p.Stop() } -func createEvent(raw []byte, metadata inputsource.NetworkMetadata) *util.Data { - data := util.NewData() - data.Event = beat.Event{ +func createEvent(raw []byte, metadata inputsource.NetworkMetadata) beat.Event { + return beat.Event{ Timestamp: time.Now(), Fields: common.MapStr{ "message": string(raw), @@ -140,5 +142,4 @@ func createEvent(raw []byte, metadata inputsource.NetworkMetadata) *util.Data { }, }, } - return data } diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go index 17463ebcea5..a7ca64ba59d 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/tcp/input_test.go @@ -35,8 +35,7 @@ func TestCreateEvent(t *testing.T) { message := []byte(hello) mt := inputsource.NetworkMetadata{RemoteAddr: addr} - data := createEvent(message, mt) - event := data.GetEvent() + event := createEvent(message, mt) m, err := event.GetValue("message") assert.NoError(t, err) diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 5b4432f172a..30bb818d2a5 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/filebeat/inputsource/udp" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -54,7 +53,11 @@ func NewInput( context input.Context, ) (input.Input, error) { - out, err := outlet(cfg, context.DynamicFields) + out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -66,8 +69,7 @@ func NewInput( forwarder := harvester.NewForwarder(out) callback := func(data []byte, metadata inputsource.NetworkMetadata) { - e := util.NewData() - e.Event = beat.Event{ + forwarder.Send(beat.Event{ Timestamp: time.Now(), Meta: common.MapStr{ "truncated": metadata.Truncated, @@ -80,8 +82,7 @@ func NewInput( }, }, }, - } - forwarder.Send(e) + }) } udp := udp.New(&config.Config, callback) diff --git a/filebeat/util/data.go b/filebeat/util/data.go deleted file mode 100644 index 132209e73f7..00000000000 --- a/filebeat/util/data.go +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package util - -import ( - "github.com/elastic/beats/libbeat/beat" - "github.com/elastic/beats/libbeat/common" - - "github.com/elastic/beats/filebeat/input/file" -) - -type Data struct { - Event beat.Event - state file.State -} - -func NewData() *Data { - return &Data{} -} - -// SetState sets the state -func (d *Data) SetState(state file.State) { - d.state = state -} - -// GetState returns the current state -func (d *Data) GetState() file.State { - return d.state -} - -// HasState returns true if the data object contains state data -func (d *Data) HasState() bool { - return !d.state.IsEmpty() -} - -// GetEvent returns the event in the data object -// In case meta data contains module and fileset data, the event is enriched with it -func (d *Data) GetEvent() beat.Event { - return d.Event -} - -// GetMetadata creates a common.MapStr containing the metadata to -// be associated with the event. -func (d *Data) GetMetadata() common.MapStr { - return d.Event.Meta -} - -// HasEvent returns true if the data object contains event data -func (d *Data) HasEvent() bool { - return d.Event.Fields != nil -} diff --git a/filebeat/util/data_test.go b/filebeat/util/data_test.go deleted file mode 100644 index 10b5b67a197..00000000000 --- a/filebeat/util/data_test.go +++ /dev/null @@ -1,51 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/libbeat/common" -) - -func TestNewData(t *testing.T) { - data := NewData() - - assert.False(t, data.HasEvent()) - assert.False(t, data.HasState()) - - data.SetState(file.State{Source: "-"}) - - assert.False(t, data.HasEvent()) - assert.True(t, data.HasState()) - - data.Event.Fields = common.MapStr{} - - assert.True(t, data.HasEvent()) - assert.True(t, data.HasState()) -} - -func TestGetEvent(t *testing.T) { - data := NewData() - data.Event.Fields = common.MapStr{"hello": "world"} - out := common.MapStr{"hello": "world"} - assert.Equal(t, out, data.GetEvent().Fields) -} diff --git a/x-pack/filebeat/input/googlepubsub/input.go b/x-pack/filebeat/input/googlepubsub/input.go index 17eac3d9f81..831a5628f79 100644 --- a/x-pack/filebeat/input/googlepubsub/input.go +++ b/x-pack/filebeat/input/googlepubsub/input.go @@ -19,7 +19,6 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" @@ -57,7 +56,7 @@ type pubsubInput struct { // a topic subscription. func NewInput( cfg *common.Config, - outlet channel.Connector, + connector channel.Connector, inputContext input.Context, ) (input.Input, error) { // Extract and validate the input's configuration. @@ -67,7 +66,11 @@ func NewInput( } // Build outlet for events. - out, err := outlet(cfg, inputContext.DynamicFields) + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: inputContext.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -191,27 +194,27 @@ func makeTopicID(project, topic string) string { return prefix[:10] } -func makeEvent(topicID string, msg *pubsub.Message) *util.Data { +func makeEvent(topicID string, msg *pubsub.Message) beat.Event { id := topicID + "-" + msg.ID - event := beat.Event{ - Timestamp: msg.PublishTime.UTC(), - Meta: common.MapStr{ - "id": id, - }, - Fields: common.MapStr{ - "event": common.MapStr{ - "id": id, - "created": time.Now().UTC(), - }, - "message": string(msg.Data), + fields := common.MapStr{ + "event": common.MapStr{ + "id": id, + "created": time.Now().UTC(), }, + "message": string(msg.Data), } if len(msg.Attributes) > 0 { - event.Fields.Put("labels", msg.Attributes) + fields.Put("labels", msg.Attributes) } - return &util.Data{Event: event} + return beat.Event{ + Timestamp: msg.PublishTime.UTC(), + Meta: common.MapStr{ + "id": id, + }, + Fields: fields, + } } func (in *pubsubInput) getOrCreateSubscription(ctx context.Context, client *pubsub.Client) (*pubsub.Subscription, error) { diff --git a/x-pack/filebeat/input/googlepubsub/pubsub_test.go b/x-pack/filebeat/input/googlepubsub/pubsub_test.go index 20d07ee4de1..2f09c1ff8ab 100644 --- a/x-pack/filebeat/input/googlepubsub/pubsub_test.go +++ b/x-pack/filebeat/input/googlepubsub/pubsub_test.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/input" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -230,7 +229,11 @@ func runTest(t *testing.T, cfg *common.Config, run func(client *pubsub.Client, i eventOutlet := newStubOutlet() defer eventOutlet.Close() - in, err := NewInput(cfg, eventOutlet.outlet, inputCtx) + connector := channel.ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (channel.Outleter, error) { + return eventOutlet, nil + }) + + in, err := NewInput(cfg, connector, inputCtx) if err != nil { t.Fatal(err) } @@ -259,10 +262,6 @@ func newStubOutlet() *stubOutleter { return o } -func (o *stubOutleter) outlet(_ *common.Config, _ *common.MapStrPointer) (channel.Outleter, error) { - return o, nil -} - func (o *stubOutleter) waitForEvents(numEvents int) ([]beat.Event, bool) { o.Lock() defer o.Unlock() @@ -290,10 +289,10 @@ func (o *stubOutleter) Close() error { func (o *stubOutleter) Done() <-chan struct{} { return nil } -func (o *stubOutleter) OnEvent(data *util.Data) bool { +func (o *stubOutleter) OnEvent(event beat.Event) bool { o.Lock() defer o.Unlock() - o.Events = append(o.Events, data.Event) + o.Events = append(o.Events, event) o.cond.Broadcast() return !o.done } diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index f9483ea1bd1..47fbda63ee1 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -17,7 +17,6 @@ import ( "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/filebeat/inputsource/udp" - "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" @@ -67,13 +66,18 @@ func init() { // NewInput creates a new Netflow input func NewInput( cfg *common.Config, - outlet channel.Connector, + connector channel.Connector, context input.Context, ) (input.Input, error) { initLogger.Do(func() { logger = logp.NewLogger(inputName) }) - out, err := outlet(cfg, context.DynamicFields) + + out, err := connector.ConnectWith(cfg, beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + DynamicFields: context.DynamicFields, + }, + }) if err != nil { return nil, err } @@ -113,10 +117,8 @@ func NewInput( } func (p *netflowInput) Publish(events []beat.Event) error { - for _, ev := range events { - e := util.NewData() - e.Event = ev - p.forwarder.Send(e) + for _, evt := range events { + p.forwarder.Send(evt) } return nil }