Refactor harvester to send events directly to the spooler (#4070)
* Refactor harvester to send events directly to the spooler

Previously all events were sent through the prospector to update the state and process the data. The state update in the prospector is now done through a mutex, and processing happens directly in the harvester. This is a major change to the architecture of how filebeat works. It should simplify shutting down harvesters and prospectors, as fewer channels are involved in the communication. Some of the configs were moved between the prospector and harvester packages, but in the long run the idea is to merge the two into one.

Further changes:

* Add a read/write lock to state handling to make it more efficient
* Did some work on the communication channels. They are still a bit messy and need cleanup in a follow-up PR.
* Processing now happens directly in the harvester. This should become part of the publisher in the future.
* The prospector now only communicates with the spooler to update state, never to send events.
* Move the initial state update logic to the harvester to simplify the code

This PR should not change any behavior in filebeat.

* add stdin hack

* Introduce HasState for sources. This is a hack to circumvent #3376 (comment)
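To make the new flow concrete, below is a minimal, self-contained sketch of the path described above, using simplified stand-in types rather than the actual filebeat identifiers (`State`, `States`, `Event`, and `harvest` are illustrative only): the harvester serializes state updates behind a mutex and hands events straight to the spooler channel, with no prospector channel in between.

```go
package main

import (
	"fmt"
	"sync"
)

// State is a simplified stand-in for filebeat's file.State.
type State struct {
	Source string
	Offset int64
}

// States guards shared state with a mutex, as the refactor describes.
type States struct {
	mu     sync.RWMutex
	states map[string]State
}

// Update replaces the stored state for a source under the write lock.
func (s *States) Update(st State) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.states[st.Source] = st
}

// Event is a simplified stand-in for input.Event.
type Event struct {
	State State
	Line  string
}

// harvest sketches the new path: update state under the lock, then send
// the event directly to the spooler channel.
func harvest(states *States, spooler chan<- *Event, ev *Event) {
	states.Update(ev.State)
	spooler <- ev
}

func main() {
	states := &States{states: map[string]State{}}
	spooler := make(chan *Event, 1)
	harvest(states, spooler, &Event{State: State{Source: "/var/log/app.log", Offset: 128}, Line: "hello"})
	fmt.Println((<-spooler).Line)
}
```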
ruflin authored and Steffen Siering committed Apr 27, 2017
1 parent bee77ee commit 24aed8c
Showing 14 changed files with 168 additions and 143 deletions.
3 changes: 2 additions & 1 deletion filebeat/beater/filebeat.go
@@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"

"github.com/elastic/beats/filebeat/channel"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/fileset"
@@ -148,7 +149,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

- crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once)
+ crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
13 changes: 9 additions & 4 deletions filebeat/channel/outlet.go
@@ -5,6 +5,7 @@ import (
"sync/atomic"

"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/prospector"
)

// Outlet struct is used to be passed to an object which needs an outlet
@@ -19,13 +20,13 @@ type Outlet struct {
wg *sync.WaitGroup // Use for counting active events
done <-chan struct{}
signal <-chan struct{}
- channel chan *input.Event
+ channel chan *input.Data
isOpen int32 // atomic indicator
}

func NewOutlet(
done <-chan struct{},
- c chan *input.Event,
+ c chan *input.Data,
wg *sync.WaitGroup,
) *Outlet {
return &Outlet{
@@ -42,7 +43,7 @@ func (o *Outlet) SetSignal(signal <-chan struct{}) {
o.signal = signal
}

- func (o *Outlet) OnEvent(event *input.Event) bool {
+ func (o *Outlet) OnEvent(event *input.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -67,7 +68,7 @@ func (o *Outlet) OnEvent(event *input.Event) bool {
// OnEventSignal can be stopped by the signal that is set with SetSignal
// This does not close the outlet. Only OnEvent does close the outlet.
// If OnEventSignal is used, it must be ensured that only one producer is used.
- func (o *Outlet) OnEventSignal(event *input.Event) bool {
+ func (o *Outlet) OnEventSignal(event *input.Data) bool {
open := atomic.LoadInt32(&o.isOpen) == 1
if !open {
return false
@@ -88,3 +89,7 @@ func (o *Outlet) OnEventSignal(event *input.Event) bool {
return true
}
}

+ func (o *Outlet) Copy() prospector.Outlet {
+ return NewOutlet(o.done, o.channel, o.wg)
+ }
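Copy is what lets every producer keep its own open/closed flag while sharing the underlying done channel, spooler channel, and wait group. A stripped-down re-creation of that pattern (`outlet` here is a hypothetical type, not the real `channel.Outlet`):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type outlet struct {
	done   <-chan struct{}
	ch     chan string
	wg     *sync.WaitGroup
	isOpen int32 // atomic flag, one per copy
}

func newOutlet(done <-chan struct{}, ch chan string, wg *sync.WaitGroup) *outlet {
	return &outlet{done: done, ch: ch, wg: wg, isOpen: 1}
}

// copyOutlet mirrors Outlet.Copy above: shared done/channel/wg, fresh flag.
func (o *outlet) copyOutlet() *outlet { return newOutlet(o.done, o.ch, o.wg) }

func (o *outlet) onEvent(ev string) bool {
	if atomic.LoadInt32(&o.isOpen) != 1 {
		return false
	}
	select {
	case <-o.done:
		atomic.StoreInt32(&o.isOpen, 0) // closes this copy only; other copies keep running
		return false
	case o.ch <- ev:
		return true
	}
}

func main() {
	done := make(chan struct{})
	ch := make(chan string, 2)
	var wg sync.WaitGroup
	a := newOutlet(done, ch, &wg)
	b := a.copyOutlet()
	fmt.Println(a.onEvent("from a"), b.onEvent("from b")) // true true
}
```

Without the copy, one harvester observing a closed done channel would flip the shared flag and silence every other producer.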
43 changes: 27 additions & 16 deletions filebeat/harvester/config.go
@@ -8,7 +8,9 @@ import (
"github.com/elastic/beats/filebeat/harvester/reader"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/processors"
)

var (
@@ -24,26 +26,35 @@ var (
CloseRenamed: false,
CloseEOF: false,
CloseTimeout: 0,
+ DocumentType: "log",
+ CleanInactive: 0,
}
)

type harvesterConfig struct {
- BufferSize int `config:"harvester_buffer_size"`
- Encoding string `config:"encoding"`
- InputType string `config:"input_type"`
- Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
- BackoffFactor int `config:"backoff_factor" validate:"min=1"`
- MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
- CloseInactive time.Duration `config:"close_inactive"`
- CloseRemoved bool `config:"close_removed"`
- CloseRenamed bool `config:"close_renamed"`
- CloseEOF bool `config:"close_eof"`
- CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"`
- ExcludeLines []match.Matcher `config:"exclude_lines"`
- IncludeLines []match.Matcher `config:"include_lines"`
- MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
- Multiline *reader.MultilineConfig `config:"multiline"`
- JSON *reader.JSONConfig `config:"json"`
+ common.EventMetadata `config:",inline"` // Fields and tags to add to events.
+ BufferSize int `config:"harvester_buffer_size"`
+ Encoding string `config:"encoding"`
+ InputType string `config:"input_type"`
+ Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
+ BackoffFactor int `config:"backoff_factor" validate:"min=1"`
+ MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
+ CloseInactive time.Duration `config:"close_inactive"`
+ CloseRemoved bool `config:"close_removed"`
+ CloseRenamed bool `config:"close_renamed"`
+ CloseEOF bool `config:"close_eof"`
+ CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"`
+ ExcludeLines []match.Matcher `config:"exclude_lines"`
+ IncludeLines []match.Matcher `config:"include_lines"`
+ MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"`
+ Multiline *reader.MultilineConfig `config:"multiline"`
+ JSON *reader.JSONConfig `config:"json"`
+ DocumentType string `config:"document_type"`
+ CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
+ Pipeline string `config:"pipeline"`
+ Module string `config:"_module_name"` // hidden option to set the module name
+ Fileset string `config:"_fileset_name"` // hidden option to set the fileset name
+ Processors processors.PluginConfig `config:"processors"`
}

func (config *harvesterConfig) Validate() error {
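For orientation, a hedged sketch of how the options that moved into harvesterConfig could be unpacked through libbeat's config machinery; `miniConfig` and the literal values are invented for illustration, and the real struct carries many more fields:

```go
package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common"
)

// miniConfig is a hypothetical subset of harvesterConfig, reusing the
// same `config` struct tags that go-ucfg resolves during Unpack.
type miniConfig struct {
	DocumentType string `config:"document_type"`
	Pipeline     string `config:"pipeline"`
}

func main() {
	// Values a prospector section might carry; invented for this sketch.
	raw, err := common.NewConfigFrom(map[string]interface{}{
		"document_type": "nginx-access",
		"pipeline":      "my-ingest-pipeline",
	})
	if err != nil {
		panic(err)
	}

	c := miniConfig{DocumentType: "log"} // mirrors the "log" default above
	if err := raw.Unpack(&c); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```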
62 changes: 59 additions & 3 deletions filebeat/harvester/harvester.go
@@ -18,13 +18,14 @@ import (

"github.com/satori/go.uuid"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/encoding"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
)

var (
@@ -35,9 +36,16 @@ var (
ErrClosed = errors.New("reader closed")
)

+ type Outlet interface {
+ SetSignal(signal <-chan struct{})
+ OnEventSignal(event *input.Data) bool
+ OnEvent(event *input.Data) bool
+ }
+
type Harvester struct {
config harvesterConfig
state file.State
+ states *file.States
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
fileReader *LogFile
@@ -46,19 +54,22 @@
done chan struct{}
stopOnce sync.Once
stopWg *sync.WaitGroup
- outlet *channel.Outlet
+ outlet Outlet
ID uuid.UUID
+ processors *processors.Processors
}

func NewHarvester(
cfg *common.Config,
state file.State,
- outlet *channel.Outlet,
+ states *file.States,
+ outlet Outlet,
) (*Harvester, error) {

h := &Harvester{
config: defaultConfig,
state: state,
+ states: states,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
outlet: outlet,
@@ -75,6 +86,18 @@
}
h.encodingFactory = encodingFactory

+ f, err := processors.New(h.config.Processors)
+ if err != nil {
+ return nil, err
+ }
+
+ h.processors = f
+
+ // Add ttl if clean_inactive is set
+ if h.config.CleanInactive > 0 {
+ h.state.TTL = h.config.CleanInactive
+ }
+
// Add outlet signal so harvester can also stop itself
h.outlet.SetSignal(h.done)

@@ -93,3 +116,36 @@ func (h *Harvester) open() error {
return fmt.Errorf("Invalid input type")
}
}

+ // forwardEvent enriches the event with harvester metadata and forwards it to the spooler
+ // All state updates done by the prospector itself are synchronous to make sure no states are overwritten
+ func (h *Harvester) forwardEvent(event *input.Event) error {
+
+ // Add additional prospector meta data to the event
+ event.EventMetadata = h.config.EventMetadata
+ event.InputType = h.config.InputType
+ event.DocumentType = h.config.DocumentType
+ event.JSONConfig = h.config.JSON
+ event.Pipeline = h.config.Pipeline
+ event.Module = h.config.Module
+ event.Fileset = h.config.Fileset
+
+ eventHolder := event.GetData()
+ // run the filters before sending to the spooler
+ if event.Bytes > 0 {
+ eventHolder.Event = h.processors.Run(eventHolder.Event)
+ }
+
+ if eventHolder.Event == nil {
+ eventHolder.Metadata.Bytes = 0
+ }
+
+ ok := h.outlet.OnEventSignal(&eventHolder)
+
+ if !ok {
+ logp.Info("Prospector outlet closed")
+ return errors.New("prospector outlet closed")
+ }
+
+ return nil
+ }
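forwardEvent encodes a subtle rule worth calling out: an event dropped by the processors is still forwarded, just with its byte count zeroed, so the registry offset advances past filtered lines instead of re-reading them forever. A small stand-alone sketch of that rule, with `data` and `applyProcessors` as hypothetical stand-ins for the event holder and the processors call:

```go
package main

import "fmt"

// data is a hypothetical stand-in for the event holder used above.
type data struct {
	event map[string]interface{} // nil once processors drop the event
	bytes int                    // 0 tells downstream "state update only"
}

// applyProcessors runs processors only on events that carry content and
// zeroes the byte count when the event was dropped.
func applyProcessors(d *data, run func(map[string]interface{}) map[string]interface{}) {
	if d.bytes > 0 {
		d.event = run(d.event)
	}
	if d.event == nil {
		d.bytes = 0
	}
}

func main() {
	drop := func(map[string]interface{}) map[string]interface{} { return nil }
	d := &data{event: map[string]interface{}{"message": "debug noise"}, bytes: 11}
	applyProcessors(d, drop)
	fmt.Println(d.event == nil, d.bytes) // true 0: state-only event still flows on
}
```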
28 changes: 20 additions & 8 deletions filebeat/harvester/log.go
@@ -3,21 +3,20 @@ package harvester
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"time"

"golang.org/x/text/transform"

"fmt"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/harvester/source"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"

"golang.org/x/text/transform"
)

var (
@@ -171,18 +170,31 @@ func (h *Harvester) Stop() {
// sendEvent sends event to the spooler channel
// Return false if event was not sent
func (h *Harvester) sendEvent(event *input.Event) bool {
- return h.outlet.OnEventSignal(event)
+ if h.file.HasState() {
+ h.states.Update(event.State)
+ }
+ err := h.forwardEvent(event)
+ return err == nil
}

// SendStateUpdate sends an empty event with the current state to update the registry
// close_timeout does not apply here to make sure a harvester is closed properly. In
// case the output is blocked the harvester will stay open to make sure no new harvester
// is started. As soon as the output becomes available again, the finished state is written
// and processing can continue.
- func (h *Harvester) sendStateUpdate() {
+ func (h *Harvester) SendStateUpdate() {
+
+ if !h.file.HasState() {
+ return
+ }
+
logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)

event := input.NewEvent(h.state)
- h.outlet.OnEvent(event)
+ h.states.Update(event.State)
+
+ data := event.GetData()
+ h.outlet.OnEvent(&data)
}

// shouldExportLine decides if the line is exported or not based on
@@ -317,7 +329,7 @@ func (h *Harvester) close() {

// On completion, push offset so we can continue where we left off if we relaunch on the same file
// Only send offset if file object was created successfully
- h.sendStateUpdate()
+ h.SendStateUpdate()
} else {
logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source)
}
1 change: 1 addition & 0 deletions filebeat/harvester/source/file.go
@@ -7,3 +7,4 @@ type File struct {
}

func (File) Continuable() bool { return true }
+ func (File) HasState() bool { return true }
1 change: 1 addition & 0 deletions filebeat/harvester/source/pipe.go
@@ -13,3 +13,4 @@ func (p Pipe) Close() error { return p.File.Close() }
func (p Pipe) Name() string { return p.File.Name() }
func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() }
func (p Pipe) Continuable() bool { return false }
+ func (p Pipe) HasState() bool { return false }
1 change: 1 addition & 0 deletions filebeat/harvester/source/source.go
@@ -14,4 +14,5 @@ type FileSource interface {
LogSource
Stat() (os.FileInfo, error)
Continuable() bool // can we continue processing after EOF?
+ HasState() bool // does this source have a state?
}
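With HasState on the interface, a new source only has to answer one question to opt in or out of registry handling. A hypothetical stdin-like implementation in the spirit of Pipe above (`stdinSource` is not part of this commit):

```go
package main

import (
	"fmt"
	"os"
)

// stdinSource embeds *os.File, which supplies Name/Read/Close/Stat;
// the two flags tell the harvester there is no offset to resume from
// and no state worth persisting.
type stdinSource struct {
	*os.File
}

func (s stdinSource) Continuable() bool { return false }
func (s stdinSource) HasState() bool    { return false }

func main() {
	src := stdinSource{os.Stdin}
	fmt.Println(src.Name(), src.Continuable(), src.HasState())
}
```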
15 changes: 7 additions & 8 deletions filebeat/input/file/state.go
@@ -39,7 +39,7 @@ func (s *State) IsEmpty() bool {
// States handles list of FileState
type States struct {
states []State
- sync.Mutex
+ sync.RWMutex
}

func NewStates() *States {
@@ -66,9 +66,8 @@ func (s *States) Update(newState State) {
}

func (s *States) FindPrevious(newState State) State {
- // TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance
- s.Lock()
- defer s.Unlock()
+ s.RLock()
+ defer s.RUnlock()
_, state := s.findPrevious(newState)
return state
}
@@ -122,16 +121,16 @@ func (s *States) Cleanup() int {

// Count returns number of states
func (s *States) Count() int {
- s.Lock()
- defer s.Unlock()
+ s.RLock()
+ defer s.RUnlock()

return len(s.states)
}

// Returns a copy of the file states
func (s *States) GetStates() []State {
- s.Lock()
- defer s.Unlock()
+ s.RLock()
+ defer s.RUnlock()

newStates := make([]State, len(s.states))
copy(newStates, s.states)
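The switch from sync.Mutex to sync.RWMutex matters because every harvester now performs read-only lookups such as FindPrevious directly; read locks let those proceed in parallel and serialize only against writers. A minimal stand-alone illustration (the map-based `states` type is a simplification of the real slice-backed States):

```go
package main

import (
	"fmt"
	"sync"
)

type states struct {
	mu   sync.RWMutex
	data map[string]int64 // source path -> offset
}

// find takes the shared lock: concurrent readers do not block each other.
func (s *states) find(source string) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.data[source]
}

// update takes the exclusive lock: writers still serialize.
func (s *states) update(source string, off int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[source] = off
}

func main() {
	s := &states{data: map[string]int64{"/var/log/syslog": 42}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = s.find("/var/log/syslog") // many readers in parallel
		}()
	}
	wg.Wait()
	s.update("/var/log/syslog", 1024)
	fmt.Println(s.find("/var/log/syslog"))
}
```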