Fix concurrent harvesters (#2541)
If newly started harvesters did not persist their first state before the next scan started, multiple harvesters could be started for the same file. This could be caused by a large number of files or by the output blocking.
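
To illustrate, here is a minimal, self-contained Go sketch of the race with hypothetical names, not the real filebeat types: the scan decides from the shared states whether a file is already harvested, while the harvester persists its first state asynchronously, so a second scan can run before that state lands.

package main

import (
	"fmt"
	"sync"
	"time"
)

// states is a stand-in for the registry of file states.
type states struct {
	mu sync.Mutex
	m  map[string]bool // file -> a harvester already reported its state
}

func (s *states) isHarvested(file string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[file]
}

func (s *states) update(file string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[file] = true
}

func main() {
	st := &states{m: map[string]bool{}}

	scan := func(id int) {
		if !st.isHarvested("test.log") {
			fmt.Printf("scan %d: starting harvester for test.log\n", id)
			go func() {
				// Pre-fix behavior: the first state update happens inside the
				// harvester goroutine and can lag behind when there are many
				// files or the output blocks.
				time.Sleep(50 * time.Millisecond)
				st.update("test.log")
			}()
		}
	}

	scan(1)
	scan(2) // runs before the first state is persisted -> duplicate harvester
	time.Sleep(100 * time.Millisecond)
}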

The problem is solved by making the Setup step of the Harvester synchronous, so that it blocks the scan. Part of this change is that the first state of the harvester is now updated as part of the prospector.

The side effect of this change is that a scan now blocks if the channel is blocked, which means the output is probably not responding. If the output is not responding, scans will not continue and new files will not be discovered until the output is available again.
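
Continuing the same illustrative sketch with the fix applied: setup and the first state update run synchronously inside the scan, and the state is written through an unbuffered channel, so an unresponsive output blocks the scan exactly as described above. Only the read loop is spawned as a goroutine. All names are hypothetical.

package main

import (
	"fmt"
	"time"
)

func main() {
	harvested := map[string]bool{} // only touched by the scan, which is synchronous now
	outlet := make(chan string)    // unbuffered: a send blocks while the output is busy

	// Consumer standing in for the publisher/output.
	go func() {
		for ev := range outlet {
			fmt.Println("output received:", ev)
		}
	}()

	scan := func(id int) {
		if harvested["test.log"] {
			return
		}
		// Setup (open the file, create the reader) runs synchronously here;
		// any error would abort before a state is written.
		outlet <- "first state for test.log" // blocks the scan if the output is unresponsive
		harvested["test.log"] = true         // first state persisted before the scan returns
		fmt.Printf("scan %d: started harvester for test.log\n", id)
		go func() {
			// Only the actual read loop runs concurrently.
		}()
	}

	scan(1)
	scan(2) // sees the persisted state; no duplicate harvester is started
	time.Sleep(10 * time.Millisecond)
}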

The code can be further simplified in the future by merging createHarvester and startHarvester. This will be done in a second step to keep the backport commit to a minimum.

See also #2539
ruflin authored and tsg committed Sep 14, 2016
1 parent 69e3585 commit dc80b9c
Showing 6 changed files with 124 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -63,6 +63,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Filebeat*
- Fix processor failure in Filebeat when using regex, contain, or equals with the message field. {issue}2178[2178]
- Fix async publisher sending empty events {pull}2455[2455]
- Fix potential issue with multiple harvesters per file on large file numbers or slow output {pull}2541[2541]

*Winlogbeat*
- Fix corrupt registry file that occurs on power loss by disabling file write caching. {issue}2313[2313]
43 changes: 21 additions & 22 deletions filebeat/harvester/log.go
@@ -28,38 +28,37 @@ var (
filesTruncated = expvar.NewInt("filebeat.harvester.files.truncated")
)

// Log harvester reads files line by line and sends events to the defined output
func (h *Harvester) Harvest() {
// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() (reader.Reader, error) {
err := h.open()
if err != nil {
return nil, fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
}

r, err := h.newLogFileReader()
if err != nil {
if h.file != nil {
h.file.Close()
}
return nil, fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
}

return r, nil

}

// Harvest reads files line by line and sends events to the defined output
func (h *Harvester) Harvest(r reader.Reader) {

harvesterStarted.Add(1)
harvesterRunning.Add(1)
defer harvesterRunning.Add(-1)

h.state.Finished = false

// Makes sure file is properly closed when the harvester is stopped
defer h.close()

err := h.open()
if err != nil {
logp.Err("Stop Harvesting. Unexpected file opening error: %s", err)
return
}

logp.Info("Harvester started for file: %s", h.state.Source)

r, err := h.newLogFileReader()
if err != nil {
logp.Err("Stop Harvesting. Unexpected encoding line reader error: %s", err)
return
}

// Always report the state before starting a harvester
// This is useful in case the file was renamed
if !h.sendStateUpdate() {
return
}

for {
select {
case <-h.done:
50 changes: 39 additions & 11 deletions filebeat/prospector/prospector.go
@@ -1,6 +1,7 @@
package prospector

import (
"errors"
"expvar"
"fmt"
"sync"
@@ -116,18 +117,10 @@ func (p *Prospector) Run() {
logp.Info("Prospector channel stopped")
return
case event := <-p.harvesterChan:
// Add ttl if cleanOlder is enabled
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

ok := p.outlet.OnEvent(event)
if !ok {
logp.Info("Prospector outlet closed")
err := p.updateState(event)
if err != nil {
return
}

p.states.Update(event.State)
}
}
}()
@@ -147,6 +140,25 @@ func (p *Prospector) Run() {
}
}

// updateState updates the prospector state and forwards the event to the spooler
// All state updates done by the prospector itself are synchronous to make sure no states are overwritten
func (p *Prospector) updateState(event *input.Event) error {

// Add ttl if cleanOlder is enabled
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

ok := p.outlet.OnEvent(event)
if !ok {
logp.Info("Prospector outlet closed")
return errors.New("prospector outlet closed")
}

p.states.Update(event.State)
return nil
}

func (p *Prospector) Stop() {
logp.Info("Stopping Prospector")
close(p.done)
@@ -176,12 +188,27 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
}

state.Offset = offset
// Set state to "not" finished to indicate that a harvester is running
state.Finished = false

// Create harvester with state
h, err := p.createHarvester(state)
if err != nil {
return err
}

reader, err := h.Setup()
if err != nil {
return fmt.Errorf("Error setting up harvester: %s", err)
}

// State is updated directly and not through the channel to make the state update immediate
// State is only updated after setup is completed successfully
err = p.updateState(input.NewEvent(state))
if err != nil {
return err
}

p.wg.Add(1)
// startHarvester is not run concurrently, but atomic operations are needed for the decrementing of the counter
// inside the following go routine
Expand All @@ -191,8 +218,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error {
atomic.AddUint64(&p.harvesterCounter, ^uint64(0))
p.wg.Done()
}()

// Starts harvester and picks the right type. In case type is not set, set it to default (log)
h.Harvest()
h.Harvest(reader)
}()

return nil
10 changes: 8 additions & 2 deletions filebeat/prospector/prospector_log.go
@@ -75,7 +75,10 @@ func (p *ProspectorLog) Run() {
if state.Finished {
state.TTL = 0
event := input.NewEvent(state)
p.Prospector.harvesterChan <- event
err := p.Prospector.updateState(event)
if err != nil {
logp.Err("File cleanup state update error: %s", err)
}
logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source)
} else {
logp.Debug("prospector", "State for file not removed because not finished: %s", state.Source)
@@ -232,7 +235,10 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
// Update state because of file rotation
oldState.Source = newState.Source
event := input.NewEvent(oldState)
p.Prospector.harvesterChan <- event
err := p.Prospector.updateState(event)
if err != nil {
logp.Err("File rotation state update error: %s", err)
}

filesRenamed.Add(1)
} else {
8 changes: 7 additions & 1 deletion filebeat/prospector/prospector_stdin.go
@@ -5,6 +5,7 @@ import (

"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/logp"
)

type ProspectorStdin struct {
@@ -36,7 +37,12 @@ func (p *ProspectorStdin) Run() {

// Make sure stdin harvester is only started once
if !p.started {
go p.harvester.Harvest()
reader, err := p.harvester.Setup()
if err != nil {
logp.Err("Error starting stdin harvester: %s", err)
return
}
go p.harvester.Harvest(reader)
p.started = true
}
}
48 changes: 48 additions & 0 deletions filebeat/tests/system/test_load.py
@@ -3,6 +3,7 @@
import logging
import logging.handlers
import json
import time
import unittest
from nose.plugins.skip import Skip, SkipTest
from nose.plugins.attrib import attr
@@ -146,3 +147,50 @@ def test_large_number_of_files(self):
assert len(data) == number_of_files


@unittest.skipUnless(LOAD_TESTS, "load test")
@attr('load')
def test_concurrent_harvesters(self):
"""
Test that with a large number of files on startup no harvester overlap happens, which would create too many events
"""
number_of_files = 5000
lines_per_file = 10

# Create content for each file
content = ""
for n in range(lines_per_file):
content += "Line " + str(n+1) + "\n"

os.mkdir(self.working_dir + "/log/")
testfile = self.working_dir + "/log/test"

for n in range(number_of_files):
with open(testfile + "-" + str(n+1), 'w') as f:
f.write(content)


self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
rotate_every_kb=number_of_files * lines_per_file * 12 * 2,
)
filebeat = self.start_beat()

total_lines = number_of_files * lines_per_file

print total_lines
# wait until all lines are read
self.wait_until(
lambda: self.output_has(lines=total_lines),
max_timeout=120)

time.sleep(2)

# make sure no further lines were read
self.wait_until(
lambda: self.output_has(lines=total_lines),
max_timeout=120)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == number_of_files
