diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 969d42ab8f2..aab5377f19b 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -87,14 +87,16 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d *Packetbeat* - - Add cassandra protocol analyzer to packetbeat. {pull}1959[1959] - - Match connections with IPv6 addresses to processes {pull}2254[2254] - - Add IP address to -devices command output {pull}2327[2327] +- Add cassandra protocol analyzer to packetbeat. {pull}1959[1959] +- Match connections with IPv6 addresses to processes {pull}2254[2254] +- Add IP address to -devices command output {pull}2327[2327] *Topbeat* *Filebeat* +- Add harvester_limit option {pull}2417[2417] + *Winlogbeat* diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc index a43bb4e4dd8..9f78ed04110 100644 --- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc +++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc @@ -209,7 +209,6 @@ The timestamp for closing a file does not depend on the modification time of the You can use time strings like 2h (2 hours) and 5m (5 minutes). The default is 5m. - ===== close_renamed WARNING: Only use this options if you understand that data loss is a potential side effect. @@ -410,6 +409,24 @@ the backoff algorithm is disabled, and the `backoff` value is used for waiting f lines. The `backoff` value will be multiplied each time with the `backoff_factor` until `max_backoff` is reached. The default is 2. +===== harvester_limit + +EXPERIMENTAL + +harvester_limit limits the number of harvesters that are started in parallel for one prospector. This directly relates +to the maximum number of file handlers that are opened. The default is 0 which means there is no limit. This configuration +is useful if the number of files to be harvested exceeds the open file handler limit of the operating system. 
+ +As setting a limit on harvesters means that potentially not all files are opened in parallel, it is recommended to use +this option in combination with the close_* options to make sure harvesters are stopped more often so new files can be +picked up. + +Currently if a new harvester can be started again, the new harvester to be started is picked randomly. This means it can +happen that a harvester for a file which was just closed and the file was updated again will be started instead of a +harvester for a file which wasn't harvested for a longer period of time. + +This configuration option applies per prospector. This can be indirectly used to set higher priorities on certain prospectors +by assigning a higher limit of harvesters. [[configuration-global-options]] === Filebeat Global Configuration diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml index 759e0e80a4d..a77eb4c1fd6 100644 --- a/filebeat/etc/beat.full.yml +++ b/filebeat/etc/beat.full.yml @@ -161,6 +161,10 @@ filebeat.prospectors: # The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached #backoff_factor: 2 + # Experimental: Max number of harvesters that are started in parallel. + # Default is 0 which means unlimited + #harvester_limit: 0 + ### Harvester closing options # Close inactive closes the file handler after the predefined period. diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml index 4e1bf7fa444..590b208239a 100644 --- a/filebeat/filebeat.full.yml +++ b/filebeat/filebeat.full.yml @@ -161,6 +161,10 @@ filebeat.prospectors: # The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached #backoff_factor: 2 + # Experimental: Max number of harvesters that are started in parallel. + # Default is 0 which means unlimited + #harvester_limit: 0 + ### Harvester closing options # Close inactive closes the file handler after the predefined period. 
diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 316710fec63..ecd886dbcc0 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -10,22 +10,24 @@ import ( var ( defaultConfig = prospectorConfig{ - IgnoreOlder: 0, - ScanFrequency: 10 * time.Second, - InputType: cfg.DefaultInputType, - CleanInactive: 0, - CleanRemoved: false, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, + InputType: cfg.DefaultInputType, + CleanInactive: 0, + CleanRemoved: false, + HarvesterLimit: 0, } ) type prospectorConfig struct { - ExcludeFiles []*regexp.Regexp `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - InputType string `config:"input_type"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - CleanRemoved bool `config:"clean_removed"` + ExcludeFiles []*regexp.Regexp `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + InputType string `config:"input_type"` + CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` } func (config *prospectorConfig) Validate() error { diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 4886445132c..f90011dd11a 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "sync/atomic" + cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" @@ -14,14 +16,15 @@ import ( ) type Prospector struct { - cfg *common.Config // Raw config - config prospectorConfig - prospectorer Prospectorer - 
spoolerChan chan *input.Event - harvesterChan chan *input.Event - done chan struct{} - states *file.States - wg sync.WaitGroup + cfg *common.Config // Raw config + config prospectorConfig + prospectorer Prospectorer + spoolerChan chan *input.Event + harvesterChan chan *input.Event + done chan struct{} + states *file.States + wg sync.WaitGroup + harvesterCounter uint64 } type Prospectorer interface { @@ -154,7 +157,14 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er return h, err } +// startHarvester starts a new harvester with the given offset +// In case the HarvesterLimit is reached, an error is returned func (p *Prospector) startHarvester(state file.State, offset int64) error { + + if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit { + return fmt.Errorf("Harvester limit reached.") + } + state.Offset = offset // Create harvester with state h, err := p.createHarvester(state) @@ -163,8 +173,14 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { } p.wg.Add(1) + // startHarvester is not run concurrently, but atomic operations are needed for the decrementing of the counter + // inside the following goroutine + atomic.AddUint64(&p.harvesterCounter, 1) go func() { - defer p.wg.Done() + defer func() { + atomic.AddUint64(&p.harvesterCounter, ^uint64(0)) + p.wg.Done() + }() // Starts harvester and picks the right type. 
In case type is not set, set it to defeault (log) h.Harvest() }() diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index 099f20ac228..6133f29134e 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -159,7 +159,7 @@ func (p *ProspectorLog) scan() { logp.Debug("prospector", "Start harvester for new file: %s", newState.Source) err := p.Prospector.startHarvester(newState, 0) if err != nil { - logp.Err("Harvester could not be started on new file: %s", err) + logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err) } } else { p.harvestExistingFile(newState, lastState) @@ -182,7 +182,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %v", newState.Source, oldState.Offset) err := p.Prospector.startHarvester(newState, oldState.Offset) if err != nil { - logp.Err("Harvester could not be started on existing file: %s", err) + logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } return } @@ -192,7 +192,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S logp.Debug("prospector", "Old file was truncated. 
Starting from the beginning: %s", newState.Source) err := p.Prospector.startHarvester(newState, 0) if err != nil { - logp.Err("Harvester could not be started on truncated file: %s", err) + logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) } filesTrucated.Add(1) diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index cafcc58c552..ffe95c9b5cf 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -25,6 +25,7 @@ filebeat.prospectors: force_close_files: {{force_close_files}} clean_inactive: {{clean_inactive}} clean_removed: {{clean_removed}} + harvester_limit: {{harvester_limit | default(0) }} {% if fields %} fields: diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index 7bc49f548bd..e5baa31ffde 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -574,10 +574,57 @@ def test_skip_symlinks(self): lambda: self.output_has(lines=1), max_timeout=15) - time.sleep(5) filebeat.check_kill_and_wait() data = self.read_output() # Make sure there is only one entry, means it didn't follow the symlink assert len(data) == 1 + + def test_harvester_limit(self): + """ + Test if harvester_limit applies + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + harvester_limit=1, + close_inactive="1s", + scan_frequency="1s", + ) + + os.mkdir(self.working_dir + "/log/") + testfile1 = self.working_dir + "/log/test1.log" + testfile2 = self.working_dir + "/log/test2.log" + testfile3 = self.working_dir + "/log/test3.log" + + with open(testfile1, 'w') as file: + file.write("Line1\n") + + with open(testfile2, 'w') as file: + file.write("Line2\n") + + with open(testfile3, 'w') as file: + file.write("Line3\n") + + filebeat = self.start_beat() + + # check that not all harvesters were started + self.wait_until( + 
lambda: self.log_contains("Harvester limit reached"), + max_timeout=10) + + # wait for registry to be written + self.wait_until( + lambda: self.log_contains("Registry file updated"), + max_timeout=10) + + # Make sure not all events were written so far + data = self.read_output() + assert len(data) < 3 + + self.wait_until(lambda: self.output_has(lines=3), max_timeout=15) + + data = self.read_output() + assert len(data) == 3 + + filebeat.check_kill_and_wait()