From 0df1eabcedd2cad4b6179bdcb1a059fda89d9e4d Mon Sep 17 00:00:00 2001
From: ruflin
Date: Wed, 29 Jun 2016 09:49:17 +0200
Subject: [PATCH] Add close_timeout option

close_timeout ends the harvester after a predefined time. If the output
is blocked, close_timeout only applies once the next event is sent. This
behavior is identical to the other close_* options.
---
 CHANGELOG.asciidoc                                 |  2 +
 .../configuration/filebeat-options.asciidoc        | 11 ++++
 filebeat/docs/troubleshooting.asciidoc             |  3 +-
 filebeat/etc/beat.full.yml                         |  5 ++
 filebeat/filebeat.full.yml                         |  5 ++
 filebeat/harvester/config.go                       |  2 +
 filebeat/harvester/harvester.go                    |  3 +-
 filebeat/harvester/log.go                          | 21 ++++++-
 filebeat/harvester/log_file.go                     | 13 +++-
 filebeat/prospector/prospector.go                  |  1 +
 filebeat/tests/system/config/filebeat.yml.j2       |  1 +
 filebeat/tests/system/test_harvester.py            | 41 +++++++++++++
 filebeat/tests/system/test_multiline.py            | 59 ++++++++++++++++++-
 13 files changed, 158 insertions(+), 9 deletions(-)

diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index aab5377f19b..c124e84e6b6 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -94,6 +94,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
 *Topbeat*
 
 *Filebeat*
+- Introduce close_timeout harvester option {issue}1600[1600]
+
 - Add harvester_limit option {pull}2417[2417]
 
diff --git a/filebeat/docs/reference/configuration/filebeat-options.asciidoc b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
index 9f78ed04110..8521057d380 100644
--- a/filebeat/docs/reference/configuration/filebeat-options.asciidoc
+++ b/filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -233,6 +233,17 @@ WARNING: Only use this options if you understand that data loss is a potential s
 Close eof closes a file as soon as the end of a file is reached. This is useful in case your files are only written once and not updated from time to time. This case can happen in case you are writing every single log event to a new file.
 
+===== close_timeout
+
+WARNING: Only use this option if you understand that it can lead to data loss. In addition, multiline events may not be sent completely when the timeout is reached.
+
+Close timeout gives every harvester a predefined lifetime. Regardless of how far the reader has gotten in the file, the reader is stopped after `close_timeout` elapses. This option can be useful if only a predefined amount of time should be spent on older log files. Setting `close_timeout` equal to `ignore_older` means the file is not picked up again if it wasn't modified in the meantime. This normally leads to data loss because the file is not sent completely.
+
+If close_timeout is used in combination with multiline events, the harvester may be stopped in the middle of a multiline event, which means only part of the event is sent. If the harvester is started again later and the file still exists, only the remaining part of the event is sent.
+
+Close timeout does not apply while the output is blocked and no further events can be sent. At least one event must be sent after `close_timeout` elapses so that the harvester can be closed afterwards.
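+
+For example, a prospector section could combine `close_timeout` with `ignore_older` so that a bounded amount of time is spent on each older file before its handler is released. The paths and values below are purely illustrative:
+
+[source,yaml]
+-------------------------------------------------------------------------------------
+filebeat.prospectors:
+- input_type: log
+  paths:
+    - /var/log/*.log
+  ignore_older: 5m
+  close_timeout: 5m
+-------------------------------------------------------------------------------------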
+
+
 [[clean-options]]
 ===== clean_*
diff --git a/filebeat/docs/troubleshooting.asciidoc b/filebeat/docs/troubleshooting.asciidoc
index 52b74aea545..1e43dcd5344 100644
--- a/filebeat/docs/troubleshooting.asciidoc
+++ b/filebeat/docs/troubleshooting.asciidoc
@@ -22,8 +22,9 @@ There are 4 more configuration options which can be used to close file handlers
 * close_renamed
 * close_removed
 * close_eof
+* close_timeout
 
-`close_renamed` and `close_removed` can be useful on Windows and issues related to file rotation, see <>. `close_eof` can be useful in environments with a large number of files with only very few entries. More details can be found in config options, see <>.
+`close_renamed` and `close_removed` can be useful on Windows and for issues related to file rotation, see <>. `close_eof` can be useful in environments with a large number of files with only very few entries. `close_timeout` can be useful in environments where it is more important to close file handlers than to send all log lines. More details can be found in config options, see <>.
 
 Before using any of these variables, make sure to study the documentation on each.
 
diff --git a/filebeat/etc/beat.full.yml b/filebeat/etc/beat.full.yml
index a77eb4c1fd6..8af8d676853 100644
--- a/filebeat/etc/beat.full.yml
+++ b/filebeat/etc/beat.full.yml
@@ -196,6 +196,11 @@ filebeat.prospectors:
   # Removes the state for file which cannot be found on disk anymore immediately
   #clean_removed: false
 
+  # Close timeout closes the harvester after the predefined time.
+  # This applies regardless of whether the harvester has finished reading the file or not.
+  # By default this option is disabled.
+  # Note: Potential data loss. Make sure to read and understand the docs for this option.
+  #close_timeout: 0
 
 #----------------------------- Stdin prospector -------------------------------
 # Configuration to use stdin input
diff --git a/filebeat/filebeat.full.yml b/filebeat/filebeat.full.yml
index 590b208239a..d991441c867 100644
--- a/filebeat/filebeat.full.yml
+++ b/filebeat/filebeat.full.yml
@@ -196,6 +196,11 @@ filebeat.prospectors:
   # Removes the state for file which cannot be found on disk anymore immediately
   #clean_removed: false
 
+  # Close timeout closes the harvester after the predefined time.
+  # This applies regardless of whether the harvester has finished reading the file or not.
+  # By default this option is disabled.
+  # Note: Potential data loss. Make sure to read and understand the docs for this option.
+  #close_timeout: 0
 
 #----------------------------- Stdin prospector -------------------------------
 # Configuration to use stdin input
diff --git a/filebeat/harvester/config.go b/filebeat/harvester/config.go
index e3b819f51ca..52a48fdd5b7 100644
--- a/filebeat/harvester/config.go
+++ b/filebeat/harvester/config.go
@@ -27,6 +27,7 @@ var (
 		CloseRemoved:    false,
 		CloseRenamed:    false,
 		CloseEOF:        false,
+		CloseTimeout:    0,
 		ForceCloseFiles: false,
 	}
 )
@@ -46,6 +47,7 @@ type harvesterConfig struct {
 	CloseRemoved    bool             `config:"close_removed"`
 	CloseRenamed    bool             `config:"close_renamed"`
 	CloseEOF        bool             `config:"close_eof"`
+	CloseTimeout    time.Duration    `config:"close_timeout" validate:"min=0"`
 	ForceCloseFiles bool             `config:"force_close_files"`
 	ExcludeLines    []*regexp.Regexp `config:"exclude_lines"`
 	IncludeLines    []*regexp.Regexp `config:"include_lines"`
diff --git a/filebeat/harvester/harvester.go b/filebeat/harvester/harvester.go
index 01ac3e4ef15..0a3c4416498 100644
--- a/filebeat/harvester/harvester.go
+++ b/filebeat/harvester/harvester.go
@@ -38,9 +38,10 @@ type Harvester struct {
 	state           file.State
 	prospectorChan  chan *input.Event
 	file            source.FileSource /* the file being watched */
-	done            chan struct{}
+	fileReader      *LogFile
 	encodingFactory encoding.EncodingFactory
 	encoding        encoding.Encoding
+	done            chan struct{}
 }
 
 func NewHarvester(
diff --git a/filebeat/harvester/log.go b/filebeat/harvester/log.go
index 68cf765fd25..18a8bb98471 100644
--- a/filebeat/harvester/log.go
+++ b/filebeat/harvester/log.go
@@ -5,6 +5,7 @@ import (
 	"expvar"
 	"io"
 	"os"
+	"time"
 
 	"golang.org/x/text/transform"
 
@@ -260,6 +261,7 @@ func (h *Harvester) close() {
 
 	if h.file != nil {
 		h.file.Close()
+
 		logp.Debug("harvester", "Closing file: %s", h.state.Source)
 		harvesterOpenFiles.Add(-1)
 
@@ -294,12 +296,27 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
 	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
 	// for new lines in input stream. Simple 8-bit based encodings, or plain
 	// don't require 'complicated' logic.
-	fileReader, err := NewLogFile(h.file, h.config, h.done)
+	h.fileReader, err = NewLogFile(h.file, h.config)
 	if err != nil {
 		return nil, err
 	}
 
-	r, err = reader.NewEncode(fileReader, h.encoding, h.config.BufferSize)
+	// Closes the reader after the timeout or when the done channel is closed
+	go func() {
+		var closeTimeout <-chan time.Time
+		if h.config.CloseTimeout > 0 {
+			closeTimeout = time.After(h.config.CloseTimeout)
+		}
+
+		select {
+		case <-h.done:
+		case <-closeTimeout:
+			logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
+		}
+		h.fileReader.Close()
+	}()
+
+	r, err = reader.NewEncode(h.fileReader, h.encoding, h.config.BufferSize)
 	if err != nil {
 		return nil, err
 	}
diff --git a/filebeat/harvester/log_file.go b/filebeat/harvester/log_file.go
index 54018092b16..a1c6d89c141 100644
--- a/filebeat/harvester/log_file.go
+++ b/filebeat/harvester/log_file.go
@@ -3,6 +3,7 @@ package harvester
 import (
 	"io"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/elastic/beats/filebeat/harvester/source"
@@ -17,12 +18,12 @@ type LogFile struct {
 	lastTimeRead time.Time
 	backoff      time.Duration
 	done         chan struct{}
+	singleClose  sync.Once
 }
 
 func NewLogFile(
 	fs source.FileSource,
 	config harvesterConfig,
-	done chan struct{},
 ) (*LogFile, error) {
 	var offset int64
 	if seeker, ok := fs.(io.Seeker); ok {
@@ -39,7 +40,7 @@ func NewLogFile(
 		config:       config,
 		lastTimeRead: time.Now(),
 		backoff:      config.Backoff,
-		done:         done,
+		done:         make(chan struct{}),
 	}, nil
 }
 
@@ -164,3 +165,11 @@ func (r *LogFile) wait() {
 		}
 	}
 }
+
+func (r *LogFile) Close() {
+	// Make sure the reader is only closed once
+	r.singleClose.Do(func() {
+		close(r.done)
+		// Note: The file reader is not closed here because that leads to race conditions
+	})
+}
diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go
index f90011dd11a..33d01b4b16f 100644
--- a/filebeat/prospector/prospector.go
+++ b/filebeat/prospector/prospector.go
@@ -112,6 +112,7 @@ func (p *Prospector) Run() {
 			if p.config.CleanInactive > 0 {
 				event.State.TTL = p.config.CleanInactive
 			}
+
 			select {
 			case <-p.done:
 				logp.Info("Prospector channel stopped")
diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2
index ffe95c9b5cf..49153793d1e 100644
--- a/filebeat/tests/system/config/filebeat.yml.j2
+++ b/filebeat/tests/system/config/filebeat.yml.j2
@@ -22,6 +22,7 @@ filebeat.prospectors:
       close_removed: {{close_removed}}
       close_renamed: {{close_renamed}}
       close_eof: {{close_eof}}
+      close_timeout: {{close_timeout}}
       force_close_files: {{force_close_files}}
       clean_inactive: {{clean_inactive}}
       clean_removed: {{clean_removed}}
diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py
index a93c73025b9..b4439c039b7 100644
--- a/filebeat/tests/system/test_harvester.py
+++ b/filebeat/tests/system/test_harvester.py
@@ -366,3 +366,44 @@ def test_truncated_file_closed(self):
             max_timeout=15)
 
         filebeat.check_kill_and_wait()
+
+    def test_close_timeout(self):
+        """
+        Checks that a file is closed after close_timeout
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/test.log",
+            close_timeout="1s",
+            scan_frequency="1s"
+        )
+        os.mkdir(self.working_dir + "/log/")
+
+        filebeat = self.start_beat()
+
+        testfile1 = self.working_dir + "/log/test.log"
+        file = open(testfile1, 'w')
+
+        # Write 1000 lines with a sleep between each line to make sure it takes more than 1s to complete
+        iterations1 = 1000
+        for n in range(0, iterations1):
+            file.write("example data")
+            file.write("\n")
+            time.sleep(0.001)
+
+        file.close()
+
+        # Wait until the harvester is closed because of close_timeout
+        self.wait_until(
+            lambda: self.log_contains(
+                "Closing harvester because close_timeout was reached"),
+            max_timeout=15)
+
+        filebeat.check_kill_and_wait()
+
+        data = self.get_registry()
+        assert len(data) == 1
+
+        # Check that some, but not all, lines were read
+        assert self.output_lines() < 1000
+        assert self.output_lines() > 0
+
diff --git a/filebeat/tests/system/test_multiline.py b/filebeat/tests/system/test_multiline.py
index 565b0b7a337..0847f0b4d19 100644
--- a/filebeat/tests/system/test_multiline.py
+++ b/filebeat/tests/system/test_multiline.py
@@ -1,5 +1,6 @@
 from filebeat import BaseTest
 import os
+import time
 
 """
 Tests for the multiline log messages
@@ -115,9 +116,6 @@ def test_rabbitmq_multiline_log(self):
         # Check that output file has the same number of lines as the log file
         assert 3 == len(output)
 
-
-
-
     def test_max_lines(self):
         """
         Test the maximum number of lines that is sent by multiline
@@ -238,3 +236,58 @@ def test_max_bytes(self):
 
         # Check that output file has the same number of lines as the log file
         assert 20 == len(output)
+
+    def test_close_timeout_with_multiline(self):
+        """
+        Test that multiline events are split up when close_timeout is reached
+        """
+        self.render_config_template(
+            path=os.path.abspath(self.working_dir) + "/log/*",
+            multiline=True,
+            pattern="^\[",
+            negate="true",
+            match="after",
+            close_timeout="1s",
+        )
+
+        os.mkdir(self.working_dir + "/log/")
+
+        testfile = self.working_dir + "/log/test.log"
+
+        with open(testfile, 'w', 0) as file:
+            file.write("[2015] hello world")
+            file.write("\n")
+            file.write(" First Line\n")
+            file.write(" Second Line\n")
+
+        proc = self.start_beat()
+
+        # Wait until the harvester is closed because of the timeout
+        # This leads to the partial event above being sent
+        self.wait_until(
+            lambda: self.log_contains(
+                "Closing harvester because close_timeout was reached"),
+            max_timeout=15)
+
+        # Because of the timeout the following two lines should be put together
+        with open(testfile, 'a', 0) as file:
+            file.write(" This should not be third\n")
+            file.write(" This should not be fourth\n")
+            # This starts a new pattern
+            file.write("[2016] Hello world\n")
+            # This line should be appended
+            file.write(" First line again\n")
+
+        self.wait_until(
+            lambda: self.output_has(lines=3),
+            max_timeout=10)
+        proc.check_kill_and_wait()
+
+        # close_timeout must have closed the reader exactly twice
+        self.wait_until(
+            lambda: self.log_contains_count(
+                "Closing harvester because close_timeout was reached") == 2,
+            max_timeout=15)
+
+        output = self.read_output()
+        assert 3 == len(output)