Refactor filebeat connector #12997

Merged: 8 commits, Jul 26, 2019
119 changes: 119 additions & 0 deletions filebeat/channel/connector.go
@@ -0,0 +1,119 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

// ConnectorFunc is an adapter for using ordinary functions as Connector.
type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error)

type pipelineConnector struct {
parent *OutletFactory
pipeline beat.Pipeline
}

// Connect passes the cfg and the zero value of beat.ClientConfig to the underlying function.
func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
return fn(cfg, beat.ClientConfig{})
}

// ConnectWith passes the configuration and the pipeline connection setting to the underlying function.
func (fn ConnectorFunc) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
return fn(cfg, clientCfg)
}

func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) {
return c.ConnectWith(cfg, beat.ClientConfig{})
}

func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

var err error
var userProcessors beat.ProcessorList

userProcessors, err = processors.New(config.Processors)
if err != nil {
return nil, err
}

if lst := clientCfg.Processing.Processor; lst != nil {
if len(userProcessors.All()) == 0 {
userProcessors = lst
} else if orig := lst.All(); len(orig) > 0 {
newLst := processors.NewList(nil)
newLst.List = append(newLst.List, lst, userProcessors)
userProcessors = newLst
}
}

setOptional := func(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
}
}

meta := clientCfg.Processing.Meta.Clone()
fields := clientCfg.Processing.Fields.Clone()

serviceType := config.ServiceType
if serviceType == "" {
serviceType = config.Module
}

setOptional(meta, "pipeline", config.Pipeline)
setOptional(fields, "fileset.name", config.Fileset)
setOptional(fields, "service.type", serviceType)
setOptional(fields, "input.type", config.Type)
Contributor

NIT: What if we centralize this operation in a single method?

meta, fields, err := withOptional(clients.Processing)

Note that the err is not mandatory, but it could be used to signal whether the values should be changed.

Contributor

Maybe it's worth having a central method that changes the default values of the meta or fields in an event. I am just trying to put myself in the following mindset: where would I look for the default added fields or meta?

Author

I considered this, but not only for fields and meta. This function basically merges a default ClientConfig with the one provided by the input, so I was wondering how generic this merge should be. The merge order might be somewhat Filebeat-specific, though, so I decided to postpone it until there is a need.

Contributor

I am OK with postponing it; it might be premature to extract it.
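
A minimal sketch of the kind of helper discussed in this thread, written against the types already used in this file; the name withOptional comes from the suggestion above, and nothing here is part of the actual change:

// withOptional is a hypothetical helper (not part of this PR) that collects
// the optional meta and fields derived from the outlet configuration on top
// of the defaults carried by the client's ProcessingConfig.
func withOptional(proc beat.ProcessingConfig, config inputOutletConfig) (meta, fields common.MapStr) {
	meta = proc.Meta.Clone()
	fields = proc.Fields.Clone()

	set := func(to common.MapStr, key, value string) {
		if value != "" {
			to.Put(key, value)
		}
	}

	serviceType := config.ServiceType
	if serviceType == "" {
		serviceType = config.Module
	}

	set(meta, "pipeline", config.Pipeline)
	set(fields, "fileset.name", config.Fileset)
	set(fields, "service.type", serviceType)
	set(fields, "input.type", config.Type)
	return meta, fields
}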

if config.Module != "" {
event := common.MapStr{"module": config.Module}
if config.Fileset != "" {
event["dataset"] = config.Module + "." + config.Fileset
}
fields["event"] = event
}

mode := clientCfg.PublishMode
if mode == beat.DefaultGuarantees {
mode = beat.GuaranteedSend
}
Contributor
Worth logging it at debug level? It would be explicit, but it might scare people. :)

Author

I wouldn't log it here. If we log it, then we should also log the input type and maybe some config ID, but that information is lost at this point. So far GuaranteedSend is the default for everyone, and no input allows users to change it.
If an input supports multiple modes in the future, then it should log the mode being used.
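
As a side note, if an input did expose multiple publish modes one day, the logging the author mentions could look roughly like this inside that input's connect path; the logp import and the selector string are assumptions, and none of this is part of the PR:

// Hypothetical future input code (assumes "github.com/elastic/beats/libbeat/logp" is imported).
mode := clientCfg.PublishMode
if mode == beat.DefaultGuarantees {
	mode = beat.GuaranteedSend
}
logp.Debug("input", "Connecting pipeline client with publish mode: %v", mode)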


// connect with updated configuration
clientCfg.PublishMode = mode
clientCfg.Processing.EventMetadata = config.EventMetadata
clientCfg.Processing.Meta = meta
clientCfg.Processing.Fields = fields
clientCfg.Processing.Processor = userProcessors
client, err := c.pipeline.ConnectWith(clientCfg)
if err != nil {
return nil, err
}

outlet := newOutlet(client, c.parent.wgEvents)
if c.parent.done != nil {
return CloseOnSignal(outlet, c.parent.done), nil
}
return outlet, nil
}
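
A small illustration of why the ConnectorFunc adapter is handy: tests (or other callers) can satisfy the Connector interface with a plain function. The helper below is hypothetical and not part of this PR:

// newFakeConnector returns a Connector that ignores its configuration and
// always hands back the given Outleter (hypothetical test helper).
func newFakeConnector(out Outleter) Connector {
	return ConnectorFunc(func(_ *common.Config, _ beat.ClientConfig) (Outleter, error) {
		return out, nil
	})
}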
77 changes: 7 additions & 70 deletions filebeat/channel/factory.go
@@ -82,75 +82,12 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}

setMeta := func(to common.MapStr, key, value string) {
if value != "" {
to[key] = value
}
}

meta := common.MapStr{}
setMeta(meta, "pipeline", config.Pipeline)

fields := common.MapStr{}
setMeta(fields, "module", config.Module)
if config.Module != "" && config.Fileset != "" {
setMeta(fields, "dataset", config.Module+"."+config.Fileset)
}
if len(fields) > 0 {
fields = common.MapStr{
"event": fields,
}
}
if config.Fileset != "" {
fields.Put("fileset.name", config.Fileset)
}
if config.ServiceType != "" {
fields.Put("service.type", config.ServiceType)
} else if config.Module != "" {
fields.Put("service.type", config.Module)
}
if config.Type != "" {
fields.Put("input.type", config.Type)
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
}

outlet := newOutlet(client, f.wgEvents)
if f.done != nil {
return CloseOnSignal(outlet, f.done), nil
}
return outlet, nil
func (f *OutletFactory) Create(p beat.Pipeline) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

func (*clientEventer) Closing() {}
func (*clientEventer) Closed() {}
func (*clientEventer) Published() {}

func (c *clientEventer) FilteredOut(_ beat.Event) {}
func (c *clientEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}
func (e *clientEventer) Closing() {}
func (e *clientEventer) Closed() {}
func (e *clientEventer) Published() {}
func (e *clientEventer) FilteredOut(evt beat.Event) {}
func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() }
11 changes: 7 additions & 4 deletions filebeat/channel/interface.go
@@ -18,20 +18,23 @@
package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// Factory is used to create a new Outlet instance
type Factory func(beat.Pipeline, *common.Config, *common.MapStrPointer) (Outleter, error)
type Factory func(beat.Pipeline) Connector

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
type Connector interface {
Connect(*common.Config) (Outleter, error)
ConnectWith(*common.Config, beat.ClientConfig) (Outleter, error)
}

// Outleter is the outlet for an input
type Outleter interface {
Close() error
Done() <-chan struct{}
OnEvent(data *util.Data) bool
OnEvent(beat.Event) bool
}
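
For orientation, under the new interface an input-side caller first turns the Factory into a Connector and then opens an outlet with its own client settings; the variable names below (factory, pipeline, cfg, dynFields) are placeholders for illustration and not code from this PR:

// Hypothetical caller-side sketch of the new Factory/Connector flow.
connector := factory(pipeline) // Factory is now func(beat.Pipeline) Connector

outlet, err := connector.ConnectWith(cfg, beat.ClientConfig{
	Processing: beat.ProcessingConfig{
		DynamicFields: dynFields, // optional per-input dynamic fields
	},
})
if err != nil {
	return err
}
defer outlet.Close()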
8 changes: 1 addition & 7 deletions filebeat/channel/outlet.go
@@ -18,7 +18,6 @@
package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
)
@@ -53,16 +52,11 @@ func (o *outlet) Done() <-chan struct{} {
return o.done
}

func (o *outlet) OnEvent(d *util.Data) bool {
func (o *outlet) OnEvent(event beat.Event) bool {
if !o.isOpen.Load() {
return false
}

event := d.GetEvent()
if d.HasState() {
event.Private = d.GetState()
}

if o.wg != nil {
o.wg.Add(1)
}
17 changes: 4 additions & 13 deletions filebeat/channel/util.go
@@ -20,32 +20,23 @@ package channel
import (
"sync"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

type subOutlet struct {
done chan struct{}
ch chan *util.Data
ch chan beat.Event
res chan bool
mutex sync.Mutex
closeOnce sync.Once
}

// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
return factory(pipeline, cfg, m)
}
}

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
s := &subOutlet{
done: make(chan struct{}),
ch: make(chan *util.Data),
ch: make(chan beat.Event),
res: make(chan bool, 1),
}

@@ -75,7 +66,7 @@ func (o *subOutlet) Done() <-chan struct{} {
return o.done
}

func (o *subOutlet) OnEvent(d *util.Data) bool {
func (o *subOutlet) OnEvent(event beat.Event) bool {

o.mutex.Lock()
defer o.mutex.Unlock()
@@ -89,7 +80,7 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
case <-o.done:
return false

case o.ch <- d:
case o.ch <- event:
select {
case <-o.done:

4 changes: 2 additions & 2 deletions filebeat/channel/util_test.go
@@ -22,7 +22,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/tests/resources"
)

@@ -31,7 +31,7 @@ type dummyOutletter struct {
c chan struct{}
}

func (o *dummyOutletter) OnEvent(event *util.Data) bool {
func (o *dummyOutletter) OnEvent(event beat.Event) bool {
return true
}

2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
@@ -117,7 +117,7 @@ func (c *Crawler) startInput(
return nil
}

connector := channel.ConnectTo(pipeline, c.out)
connector := c.out(pipeline)
p, err := input.New(config, connector, c.beatDone, states, nil)
if err != nil {
return fmt.Errorf("Error while initializing input: %s", err)
2 changes: 1 addition & 1 deletion filebeat/fileset/factory.go
@@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}

inputs := make([]*input.Runner, len(pConfigs))
connector := channel.ConnectTo(p, f.outlet)
connector := f.outlet(p)
for i, pConfig := range pConfigs {
inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta)
if err != nil {
8 changes: 4 additions & 4 deletions filebeat/harvester/forwarder.go
@@ -20,13 +20,13 @@ package harvester
import (
"errors"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
)

// Outlet interface is used for forwarding events
type Outlet interface {
OnEvent(data *util.Data) bool
OnEvent(data beat.Event) bool
}

// Forwarder contains shared options between all harvesters needed to forward events
@@ -46,8 +46,8 @@ func NewForwarder(outlet Outlet) *Forwarder {

// Send updates the input state and sends the event to the spooler
// All state updates done by the input itself are synchronous to make sure no states are overwritten
func (f *Forwarder) Send(data *util.Data) error {
ok := f.Outlet.OnEvent(data)
func (f *Forwarder) Send(event beat.Event) error {
ok := f.Outlet.OnEvent(event)
if !ok {
logp.Info("Input outlet closed")
return errors.New("input outlet closed")
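
Because OnEvent and Forwarder.Send now take a beat.Event directly instead of *util.Data, the input attaches any registrar state itself via event.Private before sending. A minimal sketch of what that might look like on the harvester side; the line, state, and forwarder variables are assumed for illustration:

// Hypothetical harvester-side code: state now travels in event.Private,
// which the old outlet used to copy over from util.Data.
event := beat.Event{
	Timestamp: time.Now(),
	Fields:    common.MapStr{"message": line},
	Private:   state, // e.g. the file.State consumed by the registrar
}
if err := forwarder.Send(event); err != nil {
	return err
}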