From 24aed8cc5aed185071a0aeb61ab3b12b2882b508 Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Thu, 27 Apr 2017 15:22:02 +0200 Subject: [PATCH] Refactor harvester to send events directly to the spooler (#4070) * Refactor harvester to send events directly to the spooler Previously all events were sent through the prospector to update the state and process the data. The state update in the prospector is now done through a mutex and processing happens directly in the harvester. This is a major change in the architecture on how filebeat works. It should simplify the shutting down of harvester and prospectors as fewer channels are involved in the communication. Some of the configs were moved around between the prospector and harvester package, but in the long run the idea is to merge the two into one. Further changes: * Add read/write lock to state handling to make it more efficient * Did some work on the communication channels. It's still a bit messy and needs cleanup in a follow up PR. * Processing happens now in the harvester directly. This should be part of the publisher in the future. * Prospector now only communicates with the spooler to update state, never events. * Move initial state update logic to harvester to simplify code This PR should not change any behavior in filebeat. * add stdin hack * Introduce HasState for source. 
This is a hack to circumvent https://github.com/elastic/beats/pull/3376#issuecomment-297674262 --- filebeat/beater/filebeat.go | 3 +- filebeat/channel/outlet.go | 13 ++- filebeat/harvester/config.go | 43 +++++---- filebeat/harvester/harvester.go | 62 ++++++++++++- filebeat/harvester/log.go | 28 ++++-- filebeat/harvester/source/file.go | 1 + filebeat/harvester/source/pipe.go | 1 + filebeat/harvester/source/source.go | 1 + filebeat/input/file/state.go | 15 ++-- filebeat/prospector/config.go | 35 +++----- filebeat/prospector/prospector.go | 89 ++++--------------- filebeat/prospector/prospector_log.go | 9 +- .../prospector/prospector_log_other_test.go | 5 +- filebeat/prospector/registry.go | 6 ++ 14 files changed, 168 insertions(+), 143 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 48d422c2dc7..9f82276f55c 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" "github.com/elastic/beats/filebeat/fileset" @@ -148,7 +149,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once) + crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) return err diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index 4695b302899..aee6524671f 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/prospector" ) // Outlet struct is used to be passed to an object which needs an outlet 
@@ -19,13 +20,13 @@ type Outlet struct { wg *sync.WaitGroup // Use for counting active events done <-chan struct{} signal <-chan struct{} - channel chan *input.Event + channel chan *input.Data isOpen int32 // atomic indicator } func NewOutlet( done <-chan struct{}, - c chan *input.Event, + c chan *input.Data, wg *sync.WaitGroup, ) *Outlet { return &Outlet{ @@ -42,7 +43,7 @@ func (o *Outlet) SetSignal(signal <-chan struct{}) { o.signal = signal } -func (o *Outlet) OnEvent(event *input.Event) bool { +func (o *Outlet) OnEvent(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -67,7 +68,7 @@ func (o *Outlet) OnEvent(event *input.Event) bool { // OnEventSignal can be stopped by the signal that is set with SetSignal // This does not close the outlet. Only OnEvent does close the outlet. // If OnEventSignal is used, it must be ensured that only one producer is used. -func (o *Outlet) OnEventSignal(event *input.Event) bool { +func (o *Outlet) OnEventSignal(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -88,3 +89,7 @@ func (o *Outlet) OnEventSignal(event *input.Event) bool { return true } } + +func (o *Outlet) Copy() prospector.Outlet { + return NewOutlet(o.done, o.channel, o.wg) +} diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index 60b4b5de2f7..05b653f403a 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -8,7 +8,9 @@ import ( "github.com/elastic/beats/filebeat/harvester/reader" "github.com/dustin/go-humanize" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" + "github.com/elastic/beats/libbeat/processors" ) var ( @@ -24,26 +26,35 @@ var ( CloseRenamed: false, CloseEOF: false, CloseTimeout: 0, + DocumentType: "log", + CleanInactive: 0, } ) type harvesterConfig struct { - BufferSize int `config:"harvester_buffer_size"` - Encoding string `config:"encoding"` - InputType string 
`config:"input_type"` - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - BackoffFactor int `config:"backoff_factor" validate:"min=1"` - MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` - CloseInactive time.Duration `config:"close_inactive"` - CloseRemoved bool `config:"close_removed"` - CloseRenamed bool `config:"close_renamed"` - CloseEOF bool `config:"close_eof"` - CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"` - ExcludeLines []match.Matcher `config:"exclude_lines"` - IncludeLines []match.Matcher `config:"include_lines"` - MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` - Multiline *reader.MultilineConfig `config:"multiline"` - JSON *reader.JSONConfig `config:"json"` + common.EventMetadata `config:",inline"` // Fields and tags to add to events. + BufferSize int `config:"harvester_buffer_size"` + Encoding string `config:"encoding"` + InputType string `config:"input_type"` + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + BackoffFactor int `config:"backoff_factor" validate:"min=1"` + MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + CloseInactive time.Duration `config:"close_inactive"` + CloseRemoved bool `config:"close_removed"` + CloseRenamed bool `config:"close_renamed"` + CloseEOF bool `config:"close_eof"` + CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"` + ExcludeLines []match.Matcher `config:"exclude_lines"` + IncludeLines []match.Matcher `config:"include_lines"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *reader.MultilineConfig `config:"multiline"` + JSON *reader.JSONConfig `config:"json"` + DocumentType string `config:"document_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + Pipeline string `config:"pipeline"` + Module string `config:"_module_name"` // hidden option to set the module name + Fileset string `config:"_fileset_name"` // hidden option to set 
the fileset name + Processors processors.PluginConfig `config:"processors"` } func (config *harvesterConfig) Validate() error { diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 731eb17ace0..fcc64ec1538 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -18,13 +18,14 @@ import ( "github.com/satori/go.uuid" - "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/harvester/source" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) var ( @@ -35,9 +36,16 @@ var ( ErrClosed = errors.New("reader closed") ) +type Outlet interface { + SetSignal(signal <-chan struct{}) + OnEventSignal(event *input.Data) bool + OnEvent(event *input.Data) bool +} + type Harvester struct { config harvesterConfig state file.State + states *file.States prospectorChan chan *input.Event file source.FileSource /* the file being watched */ fileReader *LogFile @@ -46,19 +54,22 @@ type Harvester struct { done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup - outlet *channel.Outlet + outlet Outlet ID uuid.UUID + processors *processors.Processors } func NewHarvester( cfg *common.Config, state file.State, - outlet *channel.Outlet, + states *file.States, + outlet Outlet, ) (*Harvester, error) { h := &Harvester{ config: defaultConfig, state: state, + states: states, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, outlet: outlet, @@ -75,6 +86,18 @@ func NewHarvester( } h.encodingFactory = encodingFactory + f, err := processors.New(h.config.Processors) + if err != nil { + return nil, err + } + + h.processors = f + + // Add ttl if clean_inactive is set + if h.config.CleanInactive > 0 { + h.state.TTL = h.config.CleanInactive + } + // Add 
outlet signal so harvester can also stop itself h.outlet.SetSignal(h.done) @@ -93,3 +116,36 @@ func (h *Harvester) open() error { return fmt.Errorf("Invalid input type") } } + +// updateState updates the prospector state and forwards the event to the spooler +// All state updates done by the prospector itself are synchronous to make sure not states are overwritten +func (h *Harvester) forwardEvent(event *input.Event) error { + + // Add additional prospector meta data to the event + event.EventMetadata = h.config.EventMetadata + event.InputType = h.config.InputType + event.DocumentType = h.config.DocumentType + event.JSONConfig = h.config.JSON + event.Pipeline = h.config.Pipeline + event.Module = h.config.Module + event.Fileset = h.config.Fileset + + eventHolder := event.GetData() + //run the filters before sending to spooler + if event.Bytes > 0 { + eventHolder.Event = h.processors.Run(eventHolder.Event) + } + + if eventHolder.Event == nil { + eventHolder.Metadata.Bytes = 0 + } + + ok := h.outlet.OnEventSignal(&eventHolder) + + if !ok { + logp.Info("Prospector outlet closed") + return errors.New("prospector outlet closed") + } + + return nil +} diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index f9e92584276..d4bfe145498 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -3,14 +3,11 @@ package harvester import ( "bytes" "errors" + "fmt" "io" "os" "time" - "golang.org/x/text/transform" - - "fmt" - "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/harvester/source" @@ -18,6 +15,8 @@ import ( "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" + + "golang.org/x/text/transform" ) var ( @@ -171,7 +170,11 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent func (h *Harvester) sendEvent(event *input.Event) 
bool { - return h.outlet.OnEventSignal(event) + if h.file.HasState() { + h.states.Update(event.State) + } + err := h.forwardEvent(event) + return err == nil } // sendStateUpdate send an empty event with the current state to update the registry @@ -179,10 +182,19 @@ func (h *Harvester) sendEvent(event *input.Event) bool { // case the output is blocked the harvester will stay open to make sure no new harvester // is started. As soon as the output becomes available again, the finished state is written // and processing can continue. -func (h *Harvester) sendStateUpdate() { +func (h *Harvester) SendStateUpdate() { + + if !h.file.HasState() { + return + } + logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + event := input.NewEvent(h.state) - h.outlet.OnEvent(event) + h.states.Update(event.State) + + data := event.GetData() + h.outlet.OnEvent(&data) } // shouldExportLine decides if the line is exported or not based on @@ -317,7 +329,7 @@ func (h *Harvester) close() { // On completion, push offset so we can continue where we left off if we relaunch on the same file // Only send offset if file object was created successfully - h.sendStateUpdate() + h.SendStateUpdate() } else { logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source) } diff --git a/filebeat/harvester/source/file.go b/filebeat/harvester/source/file.go index d55fe2cfb72..752324fc7d0 100644 --- a/filebeat/harvester/source/file.go +++ b/filebeat/harvester/source/file.go @@ -7,3 +7,4 @@ type File struct { } func (File) Continuable() bool { return true } +func (File) HasState() bool { return true } diff --git a/filebeat/harvester/source/pipe.go b/filebeat/harvester/source/pipe.go index 3b81b5a54ea..413233e081c 100644 --- a/filebeat/harvester/source/pipe.go +++ b/filebeat/harvester/source/pipe.go @@ -13,3 +13,4 @@ func (p Pipe) Close() error { return p.File.Close() } func (p Pipe) Name() string { return p.File.Name() } func (p Pipe) 
Stat() (os.FileInfo, error) { return p.File.Stat() } func (p Pipe) Continuable() bool { return false } +func (p Pipe) HasState() bool { return false } diff --git a/filebeat/harvester/source/source.go b/filebeat/harvester/source/source.go index c913908d868..733f113f187 100644 --- a/filebeat/harvester/source/source.go +++ b/filebeat/harvester/source/source.go @@ -14,4 +14,5 @@ type FileSource interface { LogSource Stat() (os.FileInfo, error) Continuable() bool // can we continue processing after EOF? + HasState() bool // does this source have a state? } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index d227b25175d..7626678eb93 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -39,7 +39,7 @@ func (s *State) IsEmpty() bool { // States handles list of FileState type States struct { states []State - sync.Mutex + sync.RWMutex } func NewStates() *States { @@ -66,9 +66,8 @@ func (s *States) Update(newState State) { } func (s *States) FindPrevious(newState State) State { - // TODO: This currently blocks writing updates every time state is fetched. 
Should be improved for performance - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() _, state := s.findPrevious(newState) return state } @@ -122,16 +121,16 @@ func (s *States) Cleanup() int { // Count returns number of states func (s *States) Count() int { - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() return len(s.states) } // Returns a copy of the file states func (s *States) GetStates() []State { - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() newStates := make([]State, len(s.states)) copy(newStates, s.states) diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 1cef49f67b4..190a5a56951 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -5,16 +5,12 @@ import ( "time" cfg "github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/filebeat/harvester/reader" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" - "github.com/elastic/beats/libbeat/processors" ) var ( defaultConfig = prospectorConfig{ Enabled: true, - DocumentType: "log", IgnoreOlder: 0, ScanFrequency: 10 * time.Second, InputType: cfg.DefaultInputType, @@ -27,25 +23,18 @@ var ( ) type prospectorConfig struct { - common.EventMetadata `config:",inline"` // Fields and tags to add to events. 
- Enabled bool `config:"enabled"` - DocumentType string `config:"document_type"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - InputType string `config:"input_type"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - JSON *reader.JSONConfig `config:"json"` - Pipeline string `config:"pipeline"` - Module string `config:"_module_name"` // hidden option to set the module name - Fileset string `config:"_fileset_name"` // hidden option to set the fileset name - Processors processors.PluginConfig `config:"processors"` - recursiveGlob bool `config:"recursive_glob.enabled"` + Enabled bool `config:"enabled"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + InputType string `config:"input_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + recursiveGlob bool `config:"recursive_glob.enabled"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 604c5cdddd1..3a9c27273af 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -8,7 +8,6 @@ import ( "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" 
"github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" @@ -16,7 +15,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/processors" ) var ( @@ -35,13 +33,11 @@ type Prospector struct { runWg *sync.WaitGroup states *file.States wg *sync.WaitGroup - channelWg *sync.WaitGroup // Separate waitgroup for channels as not stopped on completion id uint64 Once bool registry *harvesterRegistry beatDone chan struct{} eventCounter *sync.WaitGroup - processors *processors.Processors } // Prospectorer is the interface common to all prospectors @@ -52,7 +48,10 @@ type Prospectorer interface { // Outlet is the outlet for a prospector type Outlet interface { + SetSignal(signal <-chan struct{}) + OnEventSignal(event *input.Data) bool OnEvent(event *input.Data) bool + Copy() Outlet } // NewProspector instantiates a new prospector @@ -67,7 +66,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* runDone: make(chan struct{}), runWg: &sync.WaitGroup{}, states: &file.States{}, - channelWg: &sync.WaitGroup{}, Once: false, registry: newHarvesterRegistry(), beatDone: beatDone, @@ -86,13 +84,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* return nil, err } - f, err := processors.New(prospector.config.Processors) - if err != nil { - return nil, err - } - - prospector.processors = f - logp.Debug("prospector", "File Configs: %v", prospector.config.Paths) return prospector, nil @@ -137,28 +128,6 @@ func (p *Prospector) Start() { p.wg.Add(1) logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID()) - // Open channel to receive events from harvester and forward them to spooler - // Here potential filtering can happen - p.channelWg.Add(1) - go func() { - defer p.channelWg.Done() - for { - select { - case <-p.channelDone: - logp.Info("Prospector 
channel stopped") - return - case <-p.beatDone: - logp.Info("Prospector channel stopped because beat is stopping.") - return - case event := <-p.harvesterChan: - // No stopping on error, because on error it is expected that beatDone is closed - // in the next run. If not, this will further drain the channel. - p.updateState(event) - p.eventCounter.Done() - } - } - }() - if p.Once { // Makes sure prospectors can complete first scan before stopped defer p.runWg.Wait() @@ -207,31 +176,19 @@ func (p *Prospector) ID() uint64 { // updateState updates the prospector state and forwards the event to the spooler // All state updates done by the prospector itself are synchronous to make sure not states are overwritten -func (p *Prospector) updateState(event *input.Event) error { +func (p *Prospector) updateState(state file.State) error { // Add ttl if cleanOlder is enabled and TTL is not already 0 - if p.config.CleanInactive > 0 && event.State.TTL != 0 { - event.State.TTL = p.config.CleanInactive + if p.config.CleanInactive > 0 && state.TTL != 0 { + state.TTL = p.config.CleanInactive } - // Add additional prospector meta data to the event - event.EventMetadata = p.config.EventMetadata - event.InputType = p.config.InputType - event.DocumentType = p.config.DocumentType - event.JSONConfig = p.config.JSON - event.Pipeline = p.config.Pipeline - event.Module = p.config.Module - event.Fileset = p.config.Fileset - - eventHolder := event.GetData() - //run the filters before sending to spooler - if event.Bytes > 0 { - eventHolder.Event = p.processors.Run(eventHolder.Event) - } + // Update first internal state + p.states.Update(state) - if eventHolder.Event == nil { - eventHolder.Metadata.Bytes = 0 - } + eventHolder := input.NewEvent(state).GetData() + // Set to 0 as these are state updates only + eventHolder.Metadata.Bytes = 0 ok := p.outlet.OnEvent(&eventHolder) @@ -240,7 +197,6 @@ func (p *Prospector) updateState(event *input.Event) error { return errors.New("prospector outlet 
closed") } - p.states.Update(event.State) return nil } @@ -297,17 +253,17 @@ func (p *Prospector) waitEvents() { close(p.channelDone) case <-p.beatDone: } - // Waits until channel go-routine properly stopped - p.channelWg.Wait() } // createHarvester creates a new harvester instance from the given state func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) { - outlet := channel.NewOutlet(p.beatDone, p.harvesterChan, p.eventCounter) + // Each harvester gets its own copy of the outlet + outlet := p.outlet.Copy() h, err := harvester.NewHarvester( p.cfg, state, + p.states, outlet, ) @@ -323,9 +279,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return fmt.Errorf("Harvester limit reached") } - state.Offset = offset // Set state to "not" finished to indicate that a harvester is running state.Finished = false + state.Offset = offset // Create harvester with state h, err := p.createHarvester(state) @@ -333,23 +289,8 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return err } - // State is directly updated and not through channel to make state update synchronous - err = p.updateState(input.NewEvent(state)) - if err != nil { - return err - } - reader, err := h.Setup() if err != nil { - // Set state to finished True again in case of setup failure to make sure - // file can be picked up again by a future harvester - state.Finished = true - - updateErr := p.updateState(input.NewEvent(state)) - // This should only happen in the case that filebeat is stopped - if updateErr != nil { - logp.Err("Error updating state: %v", updateErr) - } return fmt.Errorf("Error setting up harvester: %s", err) } diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 01ad464263d..b471fb4ea1a 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -7,7 +7,6 @@ import ( "time" "github.com/elastic/beats/filebeat/harvester" - 
"github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -60,7 +59,7 @@ func (l *Log) LoadStates(states []file.State) error { } // Update prospector states and send new states to registry - err := l.Prospector.updateState(input.NewEvent(state)) + err := l.Prospector.updateState(state) if err != nil { logp.Err("Problem putting initial state: %+v", err) return err @@ -131,7 +130,7 @@ func (l *Log) removeState(state file.State) { } state.TTL = 0 - err := l.Prospector.updateState(input.NewEvent(state)) + err := l.Prospector.updateState(state) if err != nil { logp.Err("File cleanup state update error: %s", err) } @@ -323,7 +322,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { logp.Debug("prospector", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source - err := l.Prospector.updateState(input.NewEvent(oldState)) + err := l.Prospector.updateState(oldState) if err != nil { logp.Err("File rotation state update error: %s", err) } @@ -367,7 +366,7 @@ func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { // Write state for ignore_older file as none exists yet newState.Finished = true - err := l.Prospector.updateState(input.NewEvent(newState)) + err := l.Prospector.updateState(newState) if err != nil { return err } diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index da0afe50a5b..543399c561c 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -155,4 +155,7 @@ func TestInit(t *testing.T) { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *input.Data) bool { return true } +func (o 
TestOutlet) OnEvent(event *input.Data) bool { return true } +func (o TestOutlet) OnEventSignal(event *input.Data) bool { return true } +func (o TestOutlet) SetSignal(signal <-chan struct{}) {} +func (o TestOutlet) Copy() Outlet { return o } diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go index 86c8aa1161d..0f600f5d9f8 100644 --- a/filebeat/prospector/registry.go +++ b/filebeat/prospector/registry.go @@ -53,6 +53,12 @@ func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) { hr.wg.Add(1) hr.add(h) + + // Update state before starting harvester + // This makes sure the state is set to Finished: false + // This is a synchronous state update as part of the scan + h.SendStateUpdate() + go func() { defer func() { hr.remove(h)