diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 48d422c2dc74..9f82276f55c8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/elasticsearch" + "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/crawler" "github.com/elastic/beats/filebeat/fileset" @@ -148,7 +149,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return err } - crawler, err := crawler.New(newSpoolerOutlet(fb.done, spooler, wgEvents), config.Prospectors, fb.done, *once) + crawler, err := crawler.New(channel.NewOutlet(fb.done, spooler.Channel, wgEvents), config.Prospectors, fb.done, *once) if err != nil { logp.Err("Could not init crawler: %v", err) return err diff --git a/filebeat/channel/outlet.go b/filebeat/channel/outlet.go index 4695b302899a..aee6524671f2 100644 --- a/filebeat/channel/outlet.go +++ b/filebeat/channel/outlet.go @@ -5,6 +5,7 @@ import ( "sync/atomic" "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/prospector" ) // Outlet struct is used to be passed to an object which needs an outlet @@ -19,13 +20,13 @@ type Outlet struct { wg *sync.WaitGroup // Use for counting active events done <-chan struct{} signal <-chan struct{} - channel chan *input.Event + channel chan *input.Data isOpen int32 // atomic indicator } func NewOutlet( done <-chan struct{}, - c chan *input.Event, + c chan *input.Data, wg *sync.WaitGroup, ) *Outlet { return &Outlet{ @@ -42,7 +43,7 @@ func (o *Outlet) SetSignal(signal <-chan struct{}) { o.signal = signal } -func (o *Outlet) OnEvent(event *input.Event) bool { +func (o *Outlet) OnEvent(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -67,7 +68,7 @@ func (o *Outlet) OnEvent(event *input.Event) bool { // OnEventSignal can be stopped by the signal that is set with 
SetSignal // This does not close the outlet. Only OnEvent does close the outlet. // If OnEventSignal is used, it must be ensured that only one producer is used. -func (o *Outlet) OnEventSignal(event *input.Event) bool { +func (o *Outlet) OnEventSignal(event *input.Data) bool { open := atomic.LoadInt32(&o.isOpen) == 1 if !open { return false @@ -88,3 +89,7 @@ func (o *Outlet) OnEventSignal(event *input.Event) bool { return true } } + +func (o *Outlet) Copy() prospector.Outlet { + return NewOutlet(o.done, o.channel, o.wg) +} diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go index 60b4b5de2f7d..05b653f403a2 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/harvester/config.go @@ -8,7 +8,9 @@ import ( "github.com/elastic/beats/filebeat/harvester/reader" "github.com/dustin/go-humanize" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" + "github.com/elastic/beats/libbeat/processors" ) var ( @@ -24,26 +26,35 @@ var ( CloseRenamed: false, CloseEOF: false, CloseTimeout: 0, + DocumentType: "log", + CleanInactive: 0, } ) type harvesterConfig struct { - BufferSize int `config:"harvester_buffer_size"` - Encoding string `config:"encoding"` - InputType string `config:"input_type"` - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - BackoffFactor int `config:"backoff_factor" validate:"min=1"` - MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` - CloseInactive time.Duration `config:"close_inactive"` - CloseRemoved bool `config:"close_removed"` - CloseRenamed bool `config:"close_renamed"` - CloseEOF bool `config:"close_eof"` - CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"` - ExcludeLines []match.Matcher `config:"exclude_lines"` - IncludeLines []match.Matcher `config:"include_lines"` - MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` - Multiline *reader.MultilineConfig `config:"multiline"` - JSON *reader.JSONConfig `config:"json"` + 
common.EventMetadata `config:",inline"` // Fields and tags to add to events. + BufferSize int `config:"harvester_buffer_size"` + Encoding string `config:"encoding"` + InputType string `config:"input_type"` + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + BackoffFactor int `config:"backoff_factor" validate:"min=1"` + MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + CloseInactive time.Duration `config:"close_inactive"` + CloseRemoved bool `config:"close_removed"` + CloseRenamed bool `config:"close_renamed"` + CloseEOF bool `config:"close_eof"` + CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"` + ExcludeLines []match.Matcher `config:"exclude_lines"` + IncludeLines []match.Matcher `config:"include_lines"` + MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` + Multiline *reader.MultilineConfig `config:"multiline"` + JSON *reader.JSONConfig `config:"json"` + DocumentType string `config:"document_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + Pipeline string `config:"pipeline"` + Module string `config:"_module_name"` // hidden option to set the module name + Fileset string `config:"_fileset_name"` // hidden option to set the fileset name + Processors processors.PluginConfig `config:"processors"` } func (config *harvesterConfig) Validate() error { diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go index 731eb17ace01..fcc64ec1538c 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/harvester/harvester.go @@ -18,13 +18,14 @@ import ( "github.com/satori/go.uuid" - "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/harvester/source" "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + 
"github.com/elastic/beats/libbeat/processors" ) var ( @@ -35,9 +36,16 @@ var ( ErrClosed = errors.New("reader closed") ) +type Outlet interface { + SetSignal(signal <-chan struct{}) + OnEventSignal(event *input.Data) bool + OnEvent(event *input.Data) bool +} + type Harvester struct { config harvesterConfig state file.State + states *file.States prospectorChan chan *input.Event file source.FileSource /* the file being watched */ fileReader *LogFile @@ -46,19 +54,22 @@ type Harvester struct { done chan struct{} stopOnce sync.Once stopWg *sync.WaitGroup - outlet *channel.Outlet + outlet Outlet ID uuid.UUID + processors *processors.Processors } func NewHarvester( cfg *common.Config, state file.State, - outlet *channel.Outlet, + states *file.States, + outlet Outlet, ) (*Harvester, error) { h := &Harvester{ config: defaultConfig, state: state, + states: states, done: make(chan struct{}), stopWg: &sync.WaitGroup{}, outlet: outlet, @@ -75,6 +86,18 @@ func NewHarvester( } h.encodingFactory = encodingFactory + f, err := processors.New(h.config.Processors) + if err != nil { + return nil, err + } + + h.processors = f + + // Add ttl if clean_inactive is set + if h.config.CleanInactive > 0 { + h.state.TTL = h.config.CleanInactive + } + // Add outlet signal so harvester can also stop itself h.outlet.SetSignal(h.done) @@ -93,3 +116,36 @@ func (h *Harvester) open() error { return fmt.Errorf("Invalid input type") } } + +// updateState updates the prospector state and forwards the event to the spooler +// All state updates done by the prospector itself are synchronous to make sure not states are overwritten +func (h *Harvester) forwardEvent(event *input.Event) error { + + // Add additional prospector meta data to the event + event.EventMetadata = h.config.EventMetadata + event.InputType = h.config.InputType + event.DocumentType = h.config.DocumentType + event.JSONConfig = h.config.JSON + event.Pipeline = h.config.Pipeline + event.Module = h.config.Module + event.Fileset = 
h.config.Fileset + + eventHolder := event.GetData() + //run the filters before sending to spooler + if event.Bytes > 0 { + eventHolder.Event = h.processors.Run(eventHolder.Event) + } + + if eventHolder.Event == nil { + eventHolder.Metadata.Bytes = 0 + } + + ok := h.outlet.OnEventSignal(&eventHolder) + + if !ok { + logp.Info("Prospector outlet closed") + return errors.New("prospector outlet closed") + } + + return nil +} diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go index f9e925842767..d4bfe145498f 100644 --- a/filebeat/harvester/log.go +++ b/filebeat/harvester/log.go @@ -3,14 +3,11 @@ package harvester import ( "bytes" "errors" + "fmt" "io" "os" "time" - "golang.org/x/text/transform" - - "fmt" - "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/harvester/source" @@ -18,6 +15,8 @@ import ( "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" + + "golang.org/x/text/transform" ) var ( @@ -171,7 +170,11 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent func (h *Harvester) sendEvent(event *input.Event) bool { - return h.outlet.OnEventSignal(event) + if h.file.HasState() { + h.states.Update(event.State) + } + err := h.forwardEvent(event) + return err == nil } // sendStateUpdate send an empty event with the current state to update the registry @@ -179,10 +182,19 @@ func (h *Harvester) sendEvent(event *input.Event) bool { // case the output is blocked the harvester will stay open to make sure no new harvester // is started. As soon as the output becomes available again, the finished state is written // and processing can continue. 
-func (h *Harvester) sendStateUpdate() { +func (h *Harvester) SendStateUpdate() { + + if !h.file.HasState() { + return + } + logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + event := input.NewEvent(h.state) - h.outlet.OnEvent(event) + h.states.Update(event.State) + + data := event.GetData() + h.outlet.OnEvent(&data) } // shouldExportLine decides if the line is exported or not based on @@ -317,7 +329,7 @@ func (h *Harvester) close() { // On completion, push offset so we can continue where we left off if we relaunch on the same file // Only send offset if file object was created successfully - h.sendStateUpdate() + h.SendStateUpdate() } else { logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source) } diff --git a/filebeat/harvester/source/file.go b/filebeat/harvester/source/file.go index d55fe2cfb723..752324fc7d02 100644 --- a/filebeat/harvester/source/file.go +++ b/filebeat/harvester/source/file.go @@ -7,3 +7,4 @@ type File struct { } func (File) Continuable() bool { return true } +func (File) HasState() bool { return true } diff --git a/filebeat/harvester/source/pipe.go b/filebeat/harvester/source/pipe.go index 3b81b5a54ea0..413233e081c1 100644 --- a/filebeat/harvester/source/pipe.go +++ b/filebeat/harvester/source/pipe.go @@ -13,3 +13,4 @@ func (p Pipe) Close() error { return p.File.Close() } func (p Pipe) Name() string { return p.File.Name() } func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() } func (p Pipe) Continuable() bool { return false } +func (p Pipe) HasState() bool { return false } diff --git a/filebeat/harvester/source/source.go b/filebeat/harvester/source/source.go index c913908d8685..733f113f187c 100644 --- a/filebeat/harvester/source/source.go +++ b/filebeat/harvester/source/source.go @@ -14,4 +14,5 @@ type FileSource interface { LogSource Stat() (os.FileInfo, error) Continuable() bool // can we continue processing after EOF? 
+ HasState() bool // does this source have a state? } diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index d227b25175d6..7626678eb935 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -39,7 +39,7 @@ func (s *State) IsEmpty() bool { // States handles list of FileState type States struct { states []State - sync.Mutex + sync.RWMutex } func NewStates() *States { @@ -66,9 +66,8 @@ func (s *States) Update(newState State) { } func (s *States) FindPrevious(newState State) State { - // TODO: This currently blocks writing updates every time state is fetched. Should be improved for performance - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() _, state := s.findPrevious(newState) return state } @@ -122,16 +121,16 @@ func (s *States) Cleanup() int { // Count returns number of states func (s *States) Count() int { - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() return len(s.states) } // Returns a copy of the file states func (s *States) GetStates() []State { - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() newStates := make([]State, len(s.states)) copy(newStates, s.states) diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 1cef49f67b4d..190a5a569517 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -5,16 +5,12 @@ import ( "time" cfg "github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/filebeat/harvester/reader" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" - "github.com/elastic/beats/libbeat/processors" ) var ( defaultConfig = prospectorConfig{ Enabled: true, - DocumentType: "log", IgnoreOlder: 0, ScanFrequency: 10 * time.Second, InputType: cfg.DefaultInputType, @@ -27,25 +23,18 @@ var ( ) type prospectorConfig struct { - common.EventMetadata `config:",inline"` // Fields and tags to add to events. 
- Enabled bool `config:"enabled"` - DocumentType string `config:"document_type"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - InputType string `config:"input_type"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - JSON *reader.JSONConfig `config:"json"` - Pipeline string `config:"pipeline"` - Module string `config:"_module_name"` // hidden option to set the module name - Fileset string `config:"_fileset_name"` // hidden option to set the fileset name - Processors processors.PluginConfig `config:"processors"` - recursiveGlob bool `config:"recursive_glob.enabled"` + Enabled bool `config:"enabled"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + InputType string `config:"input_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + recursiveGlob bool `config:"recursive_glob.enabled"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 604c5cdddd1f..3a9c27273af7 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -8,7 +8,6 @@ import ( "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" 
"github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" @@ -16,7 +15,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/processors" ) var ( @@ -35,13 +33,11 @@ type Prospector struct { runWg *sync.WaitGroup states *file.States wg *sync.WaitGroup - channelWg *sync.WaitGroup // Separate waitgroup for channels as not stopped on completion id uint64 Once bool registry *harvesterRegistry beatDone chan struct{} eventCounter *sync.WaitGroup - processors *processors.Processors } // Prospectorer is the interface common to all prospectors @@ -52,7 +48,10 @@ type Prospectorer interface { // Outlet is the outlet for a prospector type Outlet interface { + SetSignal(signal <-chan struct{}) + OnEventSignal(event *input.Data) bool OnEvent(event *input.Data) bool + Copy() Outlet } // NewProspector instantiates a new prospector @@ -67,7 +66,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* runDone: make(chan struct{}), runWg: &sync.WaitGroup{}, states: &file.States{}, - channelWg: &sync.WaitGroup{}, Once: false, registry: newHarvesterRegistry(), beatDone: beatDone, @@ -86,13 +84,6 @@ func NewProspector(cfg *common.Config, outlet Outlet, beatDone chan struct{}) (* return nil, err } - f, err := processors.New(prospector.config.Processors) - if err != nil { - return nil, err - } - - prospector.processors = f - logp.Debug("prospector", "File Configs: %v", prospector.config.Paths) return prospector, nil @@ -137,28 +128,6 @@ func (p *Prospector) Start() { p.wg.Add(1) logp.Info("Starting prospector of type: %v; id: %v ", p.config.InputType, p.ID()) - // Open channel to receive events from harvester and forward them to spooler - // Here potential filtering can happen - p.channelWg.Add(1) - go func() { - defer p.channelWg.Done() - for { - select { - case <-p.channelDone: - logp.Info("Prospector 
channel stopped") - return - case <-p.beatDone: - logp.Info("Prospector channel stopped because beat is stopping.") - return - case event := <-p.harvesterChan: - // No stopping on error, because on error it is expected that beatDone is closed - // in the next run. If not, this will further drain the channel. - p.updateState(event) - p.eventCounter.Done() - } - } - }() - if p.Once { // Makes sure prospectors can complete first scan before stopped defer p.runWg.Wait() @@ -207,31 +176,19 @@ func (p *Prospector) ID() uint64 { // updateState updates the prospector state and forwards the event to the spooler // All state updates done by the prospector itself are synchronous to make sure not states are overwritten -func (p *Prospector) updateState(event *input.Event) error { +func (p *Prospector) updateState(state file.State) error { // Add ttl if cleanOlder is enabled and TTL is not already 0 - if p.config.CleanInactive > 0 && event.State.TTL != 0 { - event.State.TTL = p.config.CleanInactive + if p.config.CleanInactive > 0 && state.TTL != 0 { + state.TTL = p.config.CleanInactive } - // Add additional prospector meta data to the event - event.EventMetadata = p.config.EventMetadata - event.InputType = p.config.InputType - event.DocumentType = p.config.DocumentType - event.JSONConfig = p.config.JSON - event.Pipeline = p.config.Pipeline - event.Module = p.config.Module - event.Fileset = p.config.Fileset - - eventHolder := event.GetData() - //run the filters before sending to spooler - if event.Bytes > 0 { - eventHolder.Event = p.processors.Run(eventHolder.Event) - } + // Update first internal state + p.states.Update(state) - if eventHolder.Event == nil { - eventHolder.Metadata.Bytes = 0 - } + eventHolder := input.NewEvent(state).GetData() + // Set to 0 as these are state updates only + eventHolder.Metadata.Bytes = 0 ok := p.outlet.OnEvent(&eventHolder) @@ -240,7 +197,6 @@ func (p *Prospector) updateState(event *input.Event) error { return errors.New("prospector outlet 
closed") } - p.states.Update(event.State) return nil } @@ -297,17 +253,17 @@ func (p *Prospector) waitEvents() { close(p.channelDone) case <-p.beatDone: } - // Waits until channel go-routine properly stopped - p.channelWg.Wait() } // createHarvester creates a new harvester instance from the given state func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, error) { - outlet := channel.NewOutlet(p.beatDone, p.harvesterChan, p.eventCounter) + // Each harvester gets its own copy of the outlet + outlet := p.outlet.Copy() h, err := harvester.NewHarvester( p.cfg, state, + p.states, outlet, ) @@ -323,9 +279,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return fmt.Errorf("Harvester limit reached") } - state.Offset = offset // Set state to "not" finished to indicate that a harvester is running state.Finished = false + state.Offset = offset // Create harvester with state h, err := p.createHarvester(state) @@ -333,23 +289,8 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return err } - // State is directly updated and not through channel to make state update synchronous - err = p.updateState(input.NewEvent(state)) - if err != nil { - return err - } - reader, err := h.Setup() if err != nil { - // Set state to finished True again in case of setup failure to make sure - // file can be picked up again by a future harvester - state.Finished = true - - updateErr := p.updateState(input.NewEvent(state)) - // This should only happen in the case that filebeat is stopped - if updateErr != nil { - logp.Err("Error updating state: %v", updateErr) - } return fmt.Errorf("Error setting up harvester: %s", err) } diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 01ad464263d3..b471fb4ea1ab 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -7,7 +7,6 @@ import ( "time" "github.com/elastic/beats/filebeat/harvester" - 
"github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" @@ -60,7 +59,7 @@ func (l *Log) LoadStates(states []file.State) error { } // Update prospector states and send new states to registry - err := l.Prospector.updateState(input.NewEvent(state)) + err := l.Prospector.updateState(state) if err != nil { logp.Err("Problem putting initial state: %+v", err) return err @@ -131,7 +130,7 @@ func (l *Log) removeState(state file.State) { } state.TTL = 0 - err := l.Prospector.updateState(input.NewEvent(state)) + err := l.Prospector.updateState(state) if err != nil { logp.Err("File cleanup state update error: %s", err) } @@ -323,7 +322,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { logp.Debug("prospector", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source - err := l.Prospector.updateState(input.NewEvent(oldState)) + err := l.Prospector.updateState(oldState) if err != nil { logp.Err("File rotation state update error: %s", err) } @@ -367,7 +366,7 @@ func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { // Write state for ignore_older file as none exists yet newState.Finished = true - err := l.Prospector.updateState(input.NewEvent(newState)) + err := l.Prospector.updateState(newState) if err != nil { return err } diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/prospector_log_other_test.go index da0afe50a5b2..543399c561c4 100644 --- a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/prospector_log_other_test.go @@ -155,4 +155,7 @@ func TestInit(t *testing.T) { // TestOutlet is an empty outlet for testing type TestOutlet struct{} -func (o TestOutlet) OnEvent(event *input.Data) bool { return true } +func (o 
TestOutlet) OnEvent(event *input.Data) bool { return true } +func (o TestOutlet) OnEventSignal(event *input.Data) bool { return true } +func (o TestOutlet) SetSignal(signal <-chan struct{}) {} +func (o TestOutlet) Copy() Outlet { return o } diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/registry.go index 86c8aa1161d7..0f600f5d9f86 100644 --- a/filebeat/prospector/registry.go +++ b/filebeat/prospector/registry.go @@ -53,6 +53,12 @@ func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) { hr.wg.Add(1) hr.add(h) + + // Update state before starting harvester + // This makes sure the state is set to Finished: false + // This is a synchronous state update as part of the scan + h.SendStateUpdate() + go func() { defer func() { hr.remove(h)