Skip to content

Commit

Permalink
prospector: cache resolved glob patterns during init (#4269)
Browse files Browse the repository at this point in the history
resolves #4182
  • Loading branch information
7AC authored and ruflin committed May 19, 2017
1 parent 4c24665 commit 8fb1cf9
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d

*Filebeat*
- Fix race condition on harvester stopping with reloading enabled. {issue}3779[3779]
- Fix recursive glob config parsing and resolution across restarts. {pull}4269[4269]

*Heartbeat*

Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/file/glob.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func wildcards(doubleStarPatternDepth uint8, dir string, suffix string) []string
return wildcardList
}

// globPattern detects the use of "**" and expands it to standard glob patterns up to a max depth
func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
// GlobPatterns detects the use of "**" and expands it to standard glob patterns up to a max depth
func GlobPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
if doubleStarPatternDepth == 0 {
return []string{pattern}, nil
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func globPatterns(pattern string, doubleStarPatternDepth uint8) ([]string, error

// Glob expands '**' patterns into multiple patterns to satisfy https://golang.org/pkg/path/filepath/#Match
func Glob(pattern string, doubleStarPatternDepth uint8) ([]string, error) {
patterns, err := globPatterns(pattern, doubleStarPatternDepth)
patterns, err := GlobPatterns(pattern, doubleStarPatternDepth)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/file/glob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type globPatternsTest struct {

func TestGlobPatterns(t *testing.T) {
for _, test := range globPatternsTests {
patterns, err := globPatterns(test.pattern, 2)
patterns, err := GlobPatterns(test.pattern, 2)
if err != nil {
if test.expectedError {
continue
Expand Down
26 changes: 25 additions & 1 deletion filebeat/prospector/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/harvester"
"github.com/elastic/beats/filebeat/harvester/reader"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/common/match"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand Down Expand Up @@ -62,7 +64,7 @@ type config struct {
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
recursiveGlob bool `config:"recursive_glob.enabled"`
RecursiveGlob bool `config:"recursive_glob.enabled"`

// Harvester
BufferSize int `config:"harvester_buffer_size"`
Expand Down Expand Up @@ -124,3 +126,25 @@ func (c *config) Validate() error {

return nil
}

func (c *config) resolvePaths() error {
var paths []string
if !c.RecursiveGlob {
logp.Debug("prospector", "recursive glob disabled")
paths = c.Paths
} else {
logp.Debug("prospector", "recursive glob enabled")
}
for _, path := range c.Paths {
patterns, err := file.GlobPatterns(path, recursiveGlobDepth)
if err != nil {
return err
}
if len(patterns) > 1 {
logp.Debug("prospector", "%q expanded to %#v", path, patterns)
}
paths = append(paths, patterns...)
}
c.Paths = paths
return nil
}
12 changes: 6 additions & 6 deletions filebeat/prospector/log/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Prospector struct {
done chan struct{}
}

// NewLog instantiates a new Log
// NewProspector instantiates a new Log
func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outleter, done chan struct{}) (*Prospector, error) {

p := &Prospector{
Expand All @@ -51,6 +51,10 @@ func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outle
if err := cfg.Unpack(&p.config); err != nil {
return nil, err
}
if err := p.config.resolvePaths(); err != nil {
logp.Err("Failed to resolve paths in config: %+v", err)
return nil, err
}

// Create empty harvester to check if configs are fine
// TODO: Do config validation instead
Expand Down Expand Up @@ -175,11 +179,7 @@ func (p *Prospector) getFiles() map[string]os.FileInfo {
paths := map[string]os.FileInfo{}

for _, path := range p.config.Paths {
depth := uint8(0)
if p.config.recursiveGlob {
depth = recursiveGlobDepth
}
matches, err := file.Glob(path, depth)
matches, err := filepath.Glob(path)
if err != nil {
logp.Err("glob(%s) failed: %v", path, err)
continue
Expand Down
2 changes: 2 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ filebeat.prospectors:
# Paths that should be crawled and fetched
{% if path %}paths:
- {{ path }}{% endif %}
{% if recursive_glob %}recursive_glob.enabled: true
{% endif %}
# Type of the files. Annotated in every documented
scan_frequency: {{scan_frequency | default("0.1s") }}
ignore_older: {{ignore_older}}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/system/test_harvester.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def test_symlink_rotated(self):

# Check if two different files are in registry
data = self.get_registry()
assert len(data) == 2
assert len(data) == 2, "expected to see 2 entries, got '%s'" % data

def test_symlink_removed(self):
"""
Expand Down
46 changes: 46 additions & 0 deletions filebeat/tests/system/test_prospector.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/usr/bin/env python

from filebeat import BaseTest
import os
import time
Expand Down Expand Up @@ -678,3 +680,47 @@ def test_prospector_filter_includefields(self):
)[0]
assert "message" not in output
assert "offset" in output

def test_restart_recursive_glob(self):
"""
Check that file reading via recursive glob patterns continues after restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/**",
scan_frequency="1s",
recursive_glob=True,
)

testfile_dir = os.path.join(self.working_dir, "log", "some", "other", "subdir")
os.makedirs(testfile_dir)
testfile_path = os.path.join(testfile_dir, "input")

filebeat = self.start_beat()

with open(testfile_path, 'w') as testfile:
testfile.write("entry1\n")

self.wait_until(
lambda: self.output_has_message("entry1"),
max_timeout=10,
name="output contains 'entry1'")

filebeat.check_kill_and_wait()

# Append to file
with open(testfile_path, 'a') as testfile:
testfile.write("entry2\n")

filebeat = self.start_beat(output="filebeat2.log")

self.wait_until(
lambda: self.output_has_message("entry2"),
max_timeout=10,
name="output contains 'entry2'")

filebeat.check_kill_and_wait()


if __name__ == '__main__':
import unittest
unittest.main()
9 changes: 8 additions & 1 deletion filebeat/tests/system/test_registrar.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#!/usr/bin/env python
"""Test the registrar"""

import os
import platform
import time
import shutil

from filebeat import BaseTest
from nose.plugins.skip import SkipTest

Expand Down Expand Up @@ -323,7 +325,7 @@ def test_rotating_file_inode(self):

def test_restart_continue(self):
"""
Check that file readining continues after restart
Check that file reading continues after restart
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/input*",
Expand Down Expand Up @@ -1396,3 +1398,8 @@ def test_registrar_files_with_prospector_level_processors(self):
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)


if __name__ == '__main__':
import unittest
unittest.main()
19 changes: 16 additions & 3 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
INTEGRATION_TESTS = os.environ.get('INTEGRATION_TESTS', False)


class TimeoutError(Exception):
pass


class Proc(object):
"""
Slim wrapper on subprocess.Popen that redirects
Expand Down Expand Up @@ -279,9 +283,8 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
start = datetime.now()
while not cond():
if datetime.now() - start > timedelta(seconds=max_timeout):
raise Exception("Timeout waiting for '{}' to be true. "
.format(name) +
"Waited {} seconds.".format(max_timeout))
raise TimeoutError("Timeout waiting for '{}' to be true. ".format(name) +
"Waited {} seconds.".format(max_timeout))
time.sleep(poll_interval)

def get_log(self, logfile=None):
Expand Down Expand Up @@ -351,6 +354,16 @@ def output_has(self, lines, output_file=None):
except IOError:
return False

def output_has_message(self, message, output_file=None):
"""
Returns true if the output has the given message field.
"""
try:
return any(line for line in self.read_output(output_file=output_file, required_fields=["message"])
if line.get("message") == message)
except (IOError, TypeError):
return False

def all_have_fields(self, objs, fields):
"""
Checks that the given list of output objects have
Expand Down

0 comments on commit 8fb1cf9

Please sign in to comment.