Skip to content

Commit

Permalink
feat(inputs.directory_monitor): Traverse sub-directories (#11773)
Browse files Browse the repository at this point in the history
  • Loading branch information
sspaink authored Sep 7, 2022
1 parent dc9abf3 commit f238df2
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 32 deletions.
16 changes: 12 additions & 4 deletions plugins/inputs/directory_monitor/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Directory Monitor Input Plugin

This plugin monitors a single directory (without looking at sub-directories),
This plugin monitors a single directory (traversing sub-directories when `recursive` is enabled),
and takes in each file placed in the directory. The plugin will gather all
files in the directory at the configured interval, and parse the ones that
haven't been picked up yet.
Expand All @@ -18,12 +18,15 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
```toml @sample.conf
# Ingests files in a directory and then moves them to a target directory.
[[inputs.directory_monitor]]
## The directory to monitor and read files from.
## The directory to monitor and read files from (including sub-directories if "recursive" is true).
directory = ""
#
## The directory to move finished files to.
## The directory to move finished files to (maintaining directory hierarchy from source).
finished_directory = ""
#
## Setting recursive to true will make the plugin recursively walk the directory and process all sub-directories.
# recursive = false
#
## The directory to move files to upon file error.
## If not provided, erroring files will stay in the monitored directory.
# error_directory = ""
Expand Down Expand Up @@ -56,7 +59,7 @@ be guaranteed to finish writing before the `directory_duration_threshold`.
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
Expand All @@ -69,3 +72,8 @@ be guaranteed to finish writing before the `directory_duration_threshold`.

The format of metrics produced by this plugin depends on the content and data
format of the file.

## Example Output

The metrics produced by this plugin depend on the content and data
format of the file.
85 changes: 60 additions & 25 deletions plugins/inputs/directory_monitor/directory_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

Expand All @@ -28,6 +29,7 @@ import (
)

// DO NOT REMOVE THE NEXT TWO LINES! This is required to embed the sampleConfig data.
//
//go:embed sample.conf
var sampleConfig string

Expand All @@ -43,6 +45,7 @@ var (
type DirectoryMonitor struct {
Directory string `toml:"directory"`
FinishedDirectory string `toml:"finished_directory"`
Recursive bool `toml:"recursive"`
ErrorDirectory string `toml:"error_directory"`
FileTag string `toml:"file_tag"`

Expand Down Expand Up @@ -73,31 +76,62 @@ func (*DirectoryMonitor) SampleConfig() string {
}

func (monitor *DirectoryMonitor) Gather(_ telegraf.Accumulator) error {
// Get all files sitting in the directory.
files, err := os.ReadDir(monitor.Directory)
if err != nil {
return fmt.Errorf("unable to monitor the targeted directory: %w", err)
}

for _, file := range files {
filePath := monitor.Directory + "/" + file.Name()

processFile := func(path string, name string) error {
// We've been cancelled via Stop().
if monitor.context.Err() != nil {
//nolint:nilerr // context cancelation is not an error
return nil
return io.EOF
}

stat, err := times.Stat(filePath)
stat, err := times.Stat(path)
if err != nil {
continue
// Don't stop traversing if there is an error
return nil //nolint:nilerr
}

timeThresholdExceeded := time.Since(stat.AccessTime()) >= time.Duration(monitor.DirectoryDurationThreshold)

// If file is decaying, process it.
if timeThresholdExceeded {
monitor.processFile(file)
monitor.processFile(name, path)
}
return nil
}

if monitor.Recursive {
err := filepath.Walk(monitor.Directory,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}

return processFile(path, info.Name())
})
// We've been cancelled via Stop().
if err == io.EOF {
//nolint:nilerr // context cancelation is not an error
return nil
}
if err != nil {
return err
}
} else {
// Get all files sitting in the directory.
files, err := os.ReadDir(monitor.Directory)
if err != nil {
return fmt.Errorf("unable to monitor the targeted directory: %w", err)
}

for _, file := range files {
if file.IsDir() {
continue
}
path := monitor.Directory + "/" + file.Name()
err := processFile(path, file.Name())
// We've been cancelled via Stop().
if err == io.EOF {
//nolint:nilerr // context cancelation is not an error
return nil
}
}
}

Expand Down Expand Up @@ -149,25 +183,19 @@ func (monitor *DirectoryMonitor) Monitor() {
}
}

func (monitor *DirectoryMonitor) processFile(file os.DirEntry) {
if file.IsDir() {
return
}

filePath := monitor.Directory + "/" + file.Name()

func (monitor *DirectoryMonitor) processFile(name string, path string) {
// File must be configured to be monitored, if any configuration...
if !monitor.isMonitoredFile(file.Name()) {
if !monitor.isMonitoredFile(name) {
return
}

// ...and should not be configured to be ignored.
if monitor.isIgnoredFile(file.Name()) {
if monitor.isIgnoredFile(name) {
return
}

select {
case monitor.filesToProcess <- filePath:
case monitor.filesToProcess <- path:
default:
}
}
Expand Down Expand Up @@ -300,8 +328,15 @@ func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error {
}

func (monitor *DirectoryMonitor) moveFile(filePath string, directory string) {
err := os.Rename(filePath, directory+"/"+filepath.Base(filePath))
basePath := strings.Replace(filePath, monitor.Directory, "", 1)
newPath := filepath.Join(directory, basePath)

err := os.MkdirAll(filepath.Dir(newPath), os.ModePerm)
if err != nil {
monitor.Log.Errorf("Error creating directory hierachy for " + filePath + ". Error: " + err.Error())
}

err = os.Rename(filePath, newPath)
if err != nil {
monitor.Log.Errorf("Error while moving file '" + filePath + "' to another directory. Error: " + err.Error())
}
Expand Down
72 changes: 72 additions & 0 deletions plugins/inputs/directory_monitor/directory_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,75 @@ func TestParseCompleteFile(t *testing.T) {
require.Len(t, acc.Metrics, 1)
testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())
}

// TestParseSubdirectories exercises the plugin with Recursive enabled: one
// JSON file is placed directly in the monitored directory and a second one in
// a sub-directory. Both files must yield a metric, and both must afterwards
// exist under the finished directory at the same relative path.
func TestParseSubdirectories(t *testing.T) {
	const (
		jsonFileName = "test.json"
		subDirName   = "sub"
		payload      = `{
"name": "test1",
"value": 100.1,
"tag1": "value1"
}`
	)

	var acc testutil.Accumulator

	// Separate temporary directories for finished and incoming files.
	finishedDir := t.TempDir()
	processDir := t.TempDir()

	// Configure the plugin to walk sub-directories and read whole files at once.
	plugin := DirectoryMonitor{
		Directory:          processDir,
		FinishedDirectory:  finishedDir,
		Recursive:          true,
		MaxBufferedMetrics: defaultMaxBufferedMetrics,
		FileQueueSize:      defaultFileQueueSize,
		ParseMethod:        "at-once",
	}
	require.NoError(t, plugin.Init())
	plugin.Log = testutil.Logger{}

	plugin.SetParserFunc(func() (parsers.Parser, error) {
		p := &json.Parser{
			NameKey: "name",
			TagKeys: []string{"tag1"},
		}
		return p, p.Init()
	})

	// writeJSON creates the file at path and fills it with the JSON payload.
	writeJSON := func(path string) {
		fh, err := os.Create(path)
		require.NoError(t, err)
		_, err = fh.WriteString(payload)
		require.NoError(t, err)
		require.NoError(t, fh.Close())
	}

	// One file at the top level of the monitored directory...
	writeJSON(filepath.Join(processDir, jsonFileName))

	// ...and one inside a sub-directory of the monitored directory.
	require.NoError(t, os.Mkdir(filepath.Join(processDir, subDirName), os.ModePerm))
	writeJSON(filepath.Join(processDir, subDirName, jsonFileName))

	require.NoError(t, plugin.Start(&acc))
	require.NoError(t, plugin.Gather(&acc))
	acc.Wait(2)
	plugin.Stop()

	require.NoError(t, acc.FirstError())
	require.Len(t, acc.Metrics, 2)
	testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime())

	// Both files should now be in the finished directory, with the
	// sub-directory layout of the monitored directory preserved.
	for _, moved := range []string{
		filepath.Join(finishedDir, jsonFileName),
		filepath.Join(finishedDir, subDirName, jsonFileName),
	} {
		_, err := os.Stat(moved)
		require.NoError(t, err)
	}
}
9 changes: 6 additions & 3 deletions plugins/inputs/directory_monitor/sample.conf
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# Ingests files in a directory and then moves them to a target directory.
[[inputs.directory_monitor]]
## The directory to monitor and read files from.
## The directory to monitor and read files from (including sub-directories if "recursive" is true).
directory = ""
#
## The directory to move finished files to.
## The directory to move finished files to (maintaining directory hierarchy from source).
finished_directory = ""
#
## Setting recursive to true will make the plugin recursively walk the directory and process all sub-directories.
# recursive = false
#
## The directory to move files to upon file error.
## If not provided, erroring files will stay in the monitored directory.
# error_directory = ""
Expand Down Expand Up @@ -38,7 +41,7 @@
#
## Specify if the file can be read completely at once or if it needs to be read line by line (default).
## Possible values: "line-by-line", "at-once"
# parse_method = "line-by-line"
# parse_method = "line-by-line"
#
## The dataformat to be read from the files.
## Each data format has its own unique set of configuration options, read
Expand Down

0 comments on commit f238df2

Please sign in to comment.