Skip to content

Commit

Permalink
Refactor filebeat connector (#12997)
Browse files Browse the repository at this point in the history
The filebeat channel package wraps the publisher pipeline,
providing support for preparing custom user settings. The drawback is
that inputs cannot configure custom fields, processors, and ACK
handlers. The latter is required for implementing end-to-end ACK.

This refactoring modifies the channel.Factory and channel.Connector
types, such that input authors are able to make use of all the
capabilities of the publisher pipeline.

Inputs are free to configure the PublishMode. If not set, then filebeat
defaults to guaranteed sends.

The channel.Outleter should become more similar to beat.Client in the
future, and will eventually be removed. A side effect of moving closer
to beat.Client is the removal of the filebeat/util package.
The complex shutdown handling in filebeat still prevents us from
removing channel.Outlet. The complex shutdown handling is still required
for handling the interactions between the logs input and the registrar
on shutdown.
  • Loading branch information
Steffen Siering authored Jul 26, 2019
1 parent d5b8171 commit c8128cb
Show file tree
Hide file tree
Showing 26 changed files with 326 additions and 386 deletions.
119 changes: 119 additions & 0 deletions filebeat/channel/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

// ConnectorFunc is an adapter for using ordinary functions as Connector.
// The function receives the input configuration and the pipeline client
// settings, and returns the Outleter to publish events to.
type ConnectorFunc func(*common.Config, beat.ClientConfig) (Outleter, error)

// pipelineConnector creates outlets by connecting to a beat.Pipeline on
// behalf of an OutletFactory.
type pipelineConnector struct {
	// parent supplies the wgEvents counter and the done signal that are
	// attached to every outlet created by ConnectWith.
	parent *OutletFactory
	// pipeline is the publisher pipeline new clients are connected to.
	pipeline beat.Pipeline
}

// Connect calls the wrapped function with cfg and a zero-valued
// beat.ClientConfig, i.e. without caller-supplied pipeline settings.
func (fn ConnectorFunc) Connect(cfg *common.Config) (Outleter, error) {
	var defaults beat.ClientConfig
	return fn(cfg, defaults)
}

// ConnectWith forwards both the input configuration and the pipeline
// connection settings to the wrapped function and returns its results
// unchanged.
func (fn ConnectorFunc) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
	outlet, err := fn(cfg, clientCfg)
	return outlet, err
}

// Connect creates an outlet from cfg alone. It is equivalent to calling
// ConnectWith with a zero-valued beat.ClientConfig.
func (c *pipelineConnector) Connect(cfg *common.Config) (Outleter, error) {
	var defaults beat.ClientConfig
	return c.ConnectWith(cfg, defaults)
}

// ConnectWith creates an outlet by connecting to the pipeline with clientCfg,
// after enriching clientCfg with the settings unpacked from cfg:
//
//   - processors configured in cfg are combined with the processor list
//     already present in clientCfg (the latter is listed first),
//   - the "pipeline" meta entry and the fileset.name, service.type,
//     input.type and event.module/event.dataset fields are added when the
//     corresponding setting is non-empty,
//   - the publish mode is set to beat.GuaranteedSend if clientCfg leaves it
//     at beat.DefaultGuarantees.
//
// It returns an error if cfg cannot be unpacked, the processors cannot be
// created, or the pipeline refuses the connection.
func (c *pipelineConnector) ConnectWith(cfg *common.Config, clientCfg beat.ClientConfig) (Outleter, error) {
	settings := inputOutletConfig{}
	if err := cfg.Unpack(&settings); err != nil {
		return nil, err
	}

	var (
		procs beat.ProcessorList
		err   error
	)
	if procs, err = processors.New(settings.Processors); err != nil {
		return nil, err
	}

	// Merge with the processors handed in by the caller. If only one side
	// has processors, that side is used as is; otherwise the caller's list
	// is placed ahead of the ones configured in cfg.
	if inputProcs := clientCfg.Processing.Processor; inputProcs != nil {
		switch {
		case len(procs.All()) == 0:
			procs = inputProcs
		case len(inputProcs.All()) > 0:
			combined := processors.NewList(nil)
			combined.List = append(combined.List, inputProcs, procs)
			procs = combined
		}
	}

	// putIfSet stores value under key, skipping empty values.
	putIfSet := func(dst common.MapStr, key string, value string) {
		if value == "" {
			return
		}
		dst.Put(key, value)
	}

	// Work on clones of the meta and fields passed in via clientCfg.
	meta := clientCfg.Processing.Meta.Clone()
	fields := clientCfg.Processing.Fields.Clone()

	// service.type falls back to the module name.
	serviceType := settings.ServiceType
	if serviceType == "" {
		serviceType = settings.Module
	}

	putIfSet(meta, "pipeline", settings.Pipeline)
	putIfSet(fields, "fileset.name", settings.Fileset)
	putIfSet(fields, "service.type", serviceType)
	putIfSet(fields, "input.type", settings.Type)
	if module := settings.Module; module != "" {
		event := common.MapStr{"module": module}
		if fileset := settings.Fileset; fileset != "" {
			event["dataset"] = module + "." + fileset
		}
		// Replaces any "event" entry already present in fields.
		fields["event"] = event
	}

	// Default to guaranteed sends unless the caller picked a mode.
	if clientCfg.PublishMode == beat.DefaultGuarantees {
		clientCfg.PublishMode = beat.GuaranteedSend
	}

	// NOTE(review): clientCfg.Events is forwarded exactly as given by the
	// caller; confirm whether the factory's eventer should be installed here.
	clientCfg.Processing.EventMetadata = settings.EventMetadata
	clientCfg.Processing.Meta = meta
	clientCfg.Processing.Fields = fields
	clientCfg.Processing.Processor = procs

	client, err := c.pipeline.ConnectWith(clientCfg)
	if err != nil {
		return nil, err
	}

	out := newOutlet(client, c.parent.wgEvents)
	if c.parent.done == nil {
		return out, nil
	}
	return CloseOnSignal(out, c.parent.done), nil
}
77 changes: 7 additions & 70 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,75 +82,12 @@ func NewOutletFactory(
// Inputs and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates
func (f *OutletFactory) Create(p beat.Pipeline, cfg *common.Config, dynFields *common.MapStrPointer) (Outleter, error) {
config := inputOutletConfig{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}

setMeta := func(to common.MapStr, key, value string) {
if value != "" {
to[key] = value
}
}

meta := common.MapStr{}
setMeta(meta, "pipeline", config.Pipeline)

fields := common.MapStr{}
setMeta(fields, "module", config.Module)
if config.Module != "" && config.Fileset != "" {
setMeta(fields, "dataset", config.Module+"."+config.Fileset)
}
if len(fields) > 0 {
fields = common.MapStr{
"event": fields,
}
}
if config.Fileset != "" {
fields.Put("fileset.name", config.Fileset)
}
if config.ServiceType != "" {
fields.Put("service.type", config.ServiceType)
} else if config.Module != "" {
fields.Put("service.type", config.Module)
}
if config.Type != "" {
fields.Put("input.type", config.Type)
}

client, err := p.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
Processing: beat.ProcessingConfig{
EventMetadata: config.EventMetadata,
DynamicFields: dynFields,
Meta: meta,
Fields: fields,
Processor: processors,
},
Events: f.eventer,
})
if err != nil {
return nil, err
}

outlet := newOutlet(client, f.wgEvents)
if f.done != nil {
return CloseOnSignal(outlet, f.done), nil
}
return outlet, nil
func (f *OutletFactory) Create(p beat.Pipeline) Connector {
return &pipelineConnector{parent: f, pipeline: p}
}

func (*clientEventer) Closing() {}
func (*clientEventer) Closed() {}
func (*clientEventer) Published() {}

func (c *clientEventer) FilteredOut(_ beat.Event) {}
func (c *clientEventer) DroppedOnPublish(_ beat.Event) {
c.wgEvents.Done()
}
func (e *clientEventer) Closing() {}
func (e *clientEventer) Closed() {}
func (e *clientEventer) Published() {}
func (e *clientEventer) FilteredOut(evt beat.Event) {}
func (e *clientEventer) DroppedOnPublish(evt beat.Event) { e.wgEvents.Done() }
11 changes: 7 additions & 4 deletions filebeat/channel/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@
package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// Factory is used to create a new Outlet instance
type Factory func(beat.Pipeline, *common.Config, *common.MapStrPointer) (Outleter, error)
type Factory func(beat.Pipeline) Connector

// Connector creates an Outlet connecting the event publishing with some internal pipeline.
type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
// type Connector func(*common.Config, *common.MapStrPointer) (Outleter, error)
type Connector interface {
Connect(*common.Config) (Outleter, error)
ConnectWith(*common.Config, beat.ClientConfig) (Outleter, error)
}

// Outleter is the outlet for an input
type Outleter interface {
Close() error
Done() <-chan struct{}
OnEvent(data *util.Data) bool
OnEvent(beat.Event) bool
}
8 changes: 1 addition & 7 deletions filebeat/channel/outlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package channel

import (
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
)
Expand Down Expand Up @@ -53,16 +52,11 @@ func (o *outlet) Done() <-chan struct{} {
return o.done
}

func (o *outlet) OnEvent(d *util.Data) bool {
func (o *outlet) OnEvent(event beat.Event) bool {
if !o.isOpen.Load() {
return false
}

event := d.GetEvent()
if d.HasState() {
event.Private = d.GetState()
}

if o.wg != nil {
o.wg.Add(1)
}
Expand Down
17 changes: 4 additions & 13 deletions filebeat/channel/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,23 @@ package channel
import (
"sync"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

type subOutlet struct {
done chan struct{}
ch chan *util.Data
ch chan beat.Event
res chan bool
mutex sync.Mutex
closeOnce sync.Once
}

// ConnectTo creates a new Connector, combining a beat.Pipeline with an outlet Factory.
func ConnectTo(pipeline beat.Pipeline, factory Factory) Connector {
return func(cfg *common.Config, m *common.MapStrPointer) (Outleter, error) {
return factory(pipeline, cfg, m)
}
}

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
s := &subOutlet{
done: make(chan struct{}),
ch: make(chan *util.Data),
ch: make(chan beat.Event),
res: make(chan bool, 1),
}

Expand Down Expand Up @@ -75,7 +66,7 @@ func (o *subOutlet) Done() <-chan struct{} {
return o.done
}

func (o *subOutlet) OnEvent(d *util.Data) bool {
func (o *subOutlet) OnEvent(event beat.Event) bool {

o.mutex.Lock()
defer o.mutex.Unlock()
Expand All @@ -89,7 +80,7 @@ func (o *subOutlet) OnEvent(d *util.Data) bool {
case <-o.done:
return false

case o.ch <- d:
case o.ch <- event:
select {
case <-o.done:

Expand Down
4 changes: 2 additions & 2 deletions filebeat/channel/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/tests/resources"
)

Expand All @@ -31,7 +31,7 @@ type dummyOutletter struct {
c chan struct{}
}

func (o *dummyOutletter) OnEvent(event *util.Data) bool {
func (o *dummyOutletter) OnEvent(event beat.Event) bool {
return true
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (c *Crawler) startInput(
return nil
}

connector := channel.ConnectTo(pipeline, c.out)
connector := c.out(pipeline)
p, err := input.New(config, connector, c.beatDone, states, nil)
if err != nil {
return fmt.Errorf("Error while initializing input: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.Pipeline, c *common.Config, meta *common.MapStrP
}

inputs := make([]*input.Runner, len(pConfigs))
connector := channel.ConnectTo(p, f.outlet)
connector := f.outlet(p)
for i, pConfig := range pConfigs {
inputs[i], err = input.New(pConfig, connector, f.beatDone, f.registrar.GetStates(), meta)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions filebeat/harvester/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ package harvester
import (
"errors"

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
)

// Outlet interface is used for forwarding events
type Outlet interface {
OnEvent(data *util.Data) bool
OnEvent(data beat.Event) bool
}

// Forwarder contains shared options between all harvesters needed to forward events
Expand All @@ -46,8 +46,8 @@ func NewForwarder(outlet Outlet) *Forwarder {

// Send updates the input state and sends the event to the spooler
// All state updates done by the input itself are synchronous to make sure no states are overwritten
func (f *Forwarder) Send(data *util.Data) error {
ok := f.Outlet.OnEvent(data)
func (f *Forwarder) Send(event beat.Event) error {
ok := f.Outlet.OnEvent(event)
if !ok {
logp.Info("Input outlet closed")
return errors.New("input outlet closed")
Expand Down
Loading

0 comments on commit c8128cb

Please sign in to comment.