From b29aa65277a2712291b8397a2af474574f52f644 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Wed, 1 Jun 2022 17:09:34 +0200 Subject: [PATCH 01/12] fix(inputs/directory_monitor): Add support for multiline file parsing resolves #11012 --- plugins/inputs/directory_monitor/README.md | 6 +- .../directory_monitor/directory_monitor.go | 23 +++++- .../directory_monitor_test.go | 81 ++++++++++++++++--- plugins/inputs/directory_monitor/sample.conf | 4 + 4 files changed, 100 insertions(+), 14 deletions(-) diff --git a/plugins/inputs/directory_monitor/README.md b/plugins/inputs/directory_monitor/README.md index bdcb5045f00f6..a2404befaa27d 100644 --- a/plugins/inputs/directory_monitor/README.md +++ b/plugins/inputs/directory_monitor/README.md @@ -1,7 +1,7 @@ # Directory Monitor Input Plugin This plugin monitors a single directory (without looking at sub-directories), and takes in each file placed in the directory. -The plugin will gather all files in the directory at a configurable interval (`monitor_interval`), and parse the ones that haven't been picked up yet. +The plugin will gather all files in the directory at the configured interval, and parse the ones that haven't been picked up yet. This plugin is intended to read files that are moved or copied to the monitored directory, and thus files should also not be used by another process or else they may fail to be gathered. Please be advised that this plugin pulls files directly after they've been in the directory for the length of the configurable `directory_duration_threshold`, and thus files should not be written 'live' to the monitored directory. If you absolutely must write files directly, they must be guaranteed to finish writing before the `directory_duration_threshold`. @@ -46,6 +46,10 @@ This plugin is intended to read files that are moved or copied to the monitored ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality # file_tag = "" # + ## Specify if the file can be read completely at once or if it needs to be read line by line (default). + ## Possible values: line-by-line, complete-file + # parse_method = "line-by-line" + # ## The dataformat to be read from the files. ## Each data format has its own unique set of configuration options, read ## more about them here: diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 6834d25f471df..49b4bc8f27709 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/choice" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" @@ -36,6 +37,7 @@ var ( defaultMaxBufferedMetrics = 10000 defaultDirectoryDurationThreshold = config.Duration(0 * time.Millisecond) defaultFileQueueSize = 100000 + defaultParseMethod = "line-by-line" ) type DirectoryMonitor struct { @@ -50,6 +52,7 @@ type DirectoryMonitor struct { DirectoryDurationThreshold config.Duration `toml:"directory_duration_threshold"` Log telegraf.Logger `toml:"-"` FileQueueSize int `toml:"file_queue_size"` + ParseMethod string `toml:"parse_method"` filesInUse sync.Map cancel context.CancelFunc @@ -200,7 +203,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { parser, err := monitor.parserFunc() if err != nil { - return fmt.Errorf("E! Creating parser: %s", err.Error()) + return fmt.Errorf("creating parser: %w", err) } // Handle gzipped files. @@ -219,6 +222,19 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error { scanner := bufio.NewScanner(reader) + + switch monitor.ParseMethod { + case "complete-file": + scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) != 0 { + return len(data), data, nil + } + return 0, nil, nil + }) + case "line-by-line": + scanner.Split(bufio.ScanLines) + } + for scanner.Scan() { metrics, err := monitor.parseLine(parser, scanner.Bytes()) if err != nil { @@ -357,6 +373,10 @@ func (monitor *DirectoryMonitor) Init() error { monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex) } + if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "complete-file"}); err != nil { + return fmt.Errorf("config option parse_method: %w", err) + } + return nil } @@ -368,6 +388,7 @@ func init() { MaxBufferedMetrics: defaultMaxBufferedMetrics, DirectoryDurationThreshold: defaultDirectoryDurationThreshold, FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } }) } diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 87eefc48a0795..353b92fbfb995 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -27,8 +27,9 @@ func TestCSVGZImport(t *testing.T) { r := DirectoryMonitor{ Directory: processDirectory, FinishedDirectory: finishedDirectory, - MaxBufferedMetrics: 1000, - FileQueueSize: 100000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -91,8 +92,9 @@ func TestMultipleJSONFileImports(t *testing.T) { r := DirectoryMonitor{ Directory: processDirectory, FinishedDirectory: finishedDirectory, - MaxBufferedMetrics: 1000, - FileQueueSize: 1000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -140,8 +142,9 @@ func TestFileTag(t *testing.T) { Directory: processDirectory, FinishedDirectory: finishedDirectory, FileTag: "filename", - MaxBufferedMetrics: 1000, - FileQueueSize: 1000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -194,8 +197,9 @@ func TestCSVNoSkipRows(t *testing.T) { r := DirectoryMonitor{ Directory: processDirectory, FinishedDirectory: finishedDirectory, - MaxBufferedMetrics: 1000, - FileQueueSize: 100000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -262,8 +266,9 @@ func TestCSVSkipRows(t *testing.T) { r := DirectoryMonitor{ Directory: processDirectory, FinishedDirectory: finishedDirectory, - MaxBufferedMetrics: 1000, - FileQueueSize: 100000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -332,8 +337,9 @@ func TestCSVMultiHeader(t *testing.T) { r := DirectoryMonitor{ Directory: processDirectory, FinishedDirectory: finishedDirectory, - MaxBufferedMetrics: 1000, - FileQueueSize: 100000, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, } err := r.Init() require.NoError(t, err) @@ -387,3 +393,54 @@ hello,80,test_name2` require.Equal(t, expectedFields, m.Fields) } } + +func TestParseCompleteFile(t *testing.T) { + acc := testutil.Accumulator{} + + // Establish process directory and finished directory. + finishedDirectory := t.TempDir() + processDirectory := t.TempDir() + + // Init plugin. + r := DirectoryMonitor{ + Directory: processDirectory, + FinishedDirectory: finishedDirectory, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + FileQueueSize: defaultFileQueueSize, + ParseMethod: "complete-file", + } + err := r.Init() + require.NoError(t, err) + r.Log = testutil.Logger{} + + parserConfig := parsers.Config{ + DataFormat: "json", + JSONNameKey: "name", + TagKeys: []string{"tag1"}, + } + + r.SetParserFunc(func() (parsers.Parser, error) { + return parsers.NewParser(&parserConfig) + }) + + testJson := `{ + "name": "test1", + "value": 100.1, + "tag1": "value1" + }` + + // Write json file to process into the 'process' directory. + f, _ := os.CreateTemp(processDirectory, "test.json") + f.WriteString(testJson) + f.Close() + + err = r.Start(&acc) + require.NoError(t, err) + err = r.Gather(&acc) + require.NoError(t, err) + acc.Wait(1) + r.Stop() + + require.Len(t, acc.Metrics, 1) + testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), testutil.FromTestMetric(acc.Metrics[0]), testutil.IgnoreTime()) +} diff --git a/plugins/inputs/directory_monitor/sample.conf b/plugins/inputs/directory_monitor/sample.conf index 2b72e90a59f78..c892b6903ce03 100644 --- a/plugins/inputs/directory_monitor/sample.conf +++ b/plugins/inputs/directory_monitor/sample.conf @@ -36,6 +36,10 @@ ## https://docs.influxdata.com/influxdb/cloud/reference/glossary/#series-cardinality # file_tag = "" # + ## Specify if the file can be read completely at once or if it needs to be read line by line (default). + ## Possible values: line-by-line, complete-file + # parse_method = "line-by-line" + # ## The dataformat to be read from the files. ## Each data format has its own unique set of configuration options, read ## more about them here: From 0a399ae3060edbb6bf1f6ec637b950aed0e67230 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 2 Jun 2022 12:28:43 +0200 Subject: [PATCH 02/12] docs: Remove now incorrect statement --- plugins/inputs/directory_monitor/README.md | 1 - plugins/inputs/directory_monitor/sample.conf | 1 - 2 files changed, 2 deletions(-) diff --git a/plugins/inputs/directory_monitor/README.md b/plugins/inputs/directory_monitor/README.md index a2404befaa27d..c31688160ffd9 100644 --- a/plugins/inputs/directory_monitor/README.md +++ b/plugins/inputs/directory_monitor/README.md @@ -54,6 +54,5 @@ This plugin is intended to read files that are moved or copied to the monitored ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - ## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec data_format = "influx" ``` diff --git a/plugins/inputs/directory_monitor/sample.conf b/plugins/inputs/directory_monitor/sample.conf index c892b6903ce03..df185f428da58 100644 --- a/plugins/inputs/directory_monitor/sample.conf +++ b/plugins/inputs/directory_monitor/sample.conf @@ -44,5 +44,4 @@ ## Each data format has its own unique set of configuration options, read ## more about them here: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - ## NOTE: We currently only support parsing newline-delimited JSON. See the format here: https://github.com/ndjson/ndjson-spec data_format = "influx" From e9d91d986d89730bb6824e97e03f4dde4941224d Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 3 Jun 2022 11:22:46 +0200 Subject: [PATCH 03/12] fix(inputs/directory_monitor): Improve tests --- plugins/inputs/directory_monitor/directory_monitor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 353b92fbfb995..80d2b4f9ac853 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -441,6 +441,7 @@ func TestParseCompleteFile(t *testing.T) { acc.Wait(1) r.Stop() + require.NoError(t, acc.FirstError()) require.Len(t, acc.Metrics, 1) - testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), testutil.FromTestMetric(acc.Metrics[0]), testutil.IgnoreTime()) + testutil.RequireMetricEqual(t, testutil.TestMetric(100.1), acc.GetTelegrafMetrics()[0], testutil.IgnoreTime()) } From 2368fe97aff1812ffbbcb96348205baec4ac5fce Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 3 Jun 2022 17:05:40 +0200 Subject: [PATCH 04/12] fix(inputs/directory_monitor): Linter cleanup --- plugins/inputs/directory_monitor/directory_monitor_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index 80d2b4f9ac853..a863e1814662e 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -423,7 +423,7 @@ func TestParseCompleteFile(t *testing.T) { return parsers.NewParser(&parserConfig) }) - testJson := `{ + testJSON := `{ "name": "test1", "value": 100.1, "tag1": "value1" @@ -431,8 +431,8 @@ func TestParseCompleteFile(t *testing.T) { // Write json file to process into the 'process' directory. f, _ := os.CreateTemp(processDirectory, "test.json") - f.WriteString(testJson) - f.Close() + _, _ = f.WriteString(testJSON) + _ = f.Close() err = r.Start(&acc) require.NoError(t, err) From 651395c0c3635fb8c6c373e7797e152f05083541 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 3 Jun 2022 17:08:53 +0200 Subject: [PATCH 05/12] fix(inputs/directory_monitor): Use standard reader to read complete file --- .../directory_monitor/directory_monitor.go | 51 +++++++++++-------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 49b4bc8f27709..981802bbcf291 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -221,54 +221,65 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { } func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error { - scanner := bufio.NewScanner(reader) + if monitor.ParseMethod == "complete-file" { + bytes, err := io.ReadAll(reader) + if err != nil { + return err + } + + metrics, err := monitor.parseMetrics(parser, bytes, fileName) + if err != nil { + return err + } + if err := monitor.sendMetrics(metrics); err != nil { + return err + } + } + + scanner := bufio.NewScanner(reader) switch monitor.ParseMethod { - case "complete-file": - scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { - if atEOF && len(data) != 0 { - return len(data), data, nil - } - return 0, nil, nil - }) case "line-by-line": scanner.Split(bufio.ScanLines) + case "future-split-type": + scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { return }) } for scanner.Scan() { - metrics, err := monitor.parseLine(parser, scanner.Bytes()) + metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName) if err != nil { return err } - if monitor.FileTag != "" { - for _, m := range metrics { - m.AddTag(monitor.FileTag, filepath.Base(fileName)) - } - } - if err := monitor.sendMetrics(metrics); err != nil { return err } } - return nil + return scanner.Err() } -func (monitor *DirectoryMonitor) parseLine(parser parsers.Parser, line []byte) ([]telegraf.Metric, error) { +func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) { switch parser.(type) { case *csv.Parser: - m, err := parser.Parse(line) + metrics, err = parser.Parse(line) if err != nil { if errors.Is(err, io.EOF) { return nil, nil } return nil, err } - return m, err default: - return parser.Parse(line) + metrics, err = parser.Parse(line) } + + if monitor.FileTag != "" { + for _, m := range metrics { + m.AddTag(monitor.FileTag, filepath.Base(fileName)) + } + } + + return } func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error { From 7d91b6e0099d504d9751a8f459c2d7abc8e152a3 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 3 Jun 2022 17:21:46 +0200 Subject: [PATCH 06/12] chore(inputs/directory_monitor): Linter fixes --- plugins/inputs/directory_monitor/directory_monitor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 981802bbcf291..a28f0dbf5f7aa 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -238,12 +238,13 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read } scanner := bufio.NewScanner(reader) + /* To be used when we add a new scanner based parse method switch monitor.ParseMethod { case "line-by-line": scanner.Split(bufio.ScanLines) case "future-split-type": scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { return }) - } + } */ for scanner.Scan() { metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName) @@ -279,7 +280,7 @@ func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte } } - return + return metrics, err } func (monitor *DirectoryMonitor) sendMetrics(metrics []telegraf.Metric) error { From 2544f336f24ff89ce5b7a95c32d4298ae8aa4c7f Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Tue, 7 Jun 2022 17:14:48 +0200 Subject: [PATCH 07/12] fix(inputs/directory_monitor): Return early --- plugins/inputs/directory_monitor/directory_monitor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index a28f0dbf5f7aa..8f2fd4cd86306 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -235,6 +235,8 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read if err := monitor.sendMetrics(metrics); err != nil { return err } + + return nil } scanner := bufio.NewScanner(reader) From a7af221ea7656264c903ae53b6b13aa602fdd692 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Wed, 8 Jun 2022 14:09:25 +0200 Subject: [PATCH 08/12] chore(inputs/directory_monitor): Fix README linter errors --- plugins/inputs/directory_monitor/README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/directory_monitor/README.md b/plugins/inputs/directory_monitor/README.md index 347b411dc7f3f..09fcbc3a5f540 100644 --- a/plugins/inputs/directory_monitor/README.md +++ b/plugins/inputs/directory_monitor/README.md @@ -2,7 +2,7 @@ This plugin monitors a single directory (without looking at sub-directories), and takes in each file placed in the directory. The plugin will gather all -files in the directory at the configured interval, and parse the ones that +files in the directory at the configured interval, and parse the ones that haven't been picked up yet. This plugin is intended to read files that are moved or copied to the monitored @@ -64,3 +64,8 @@ be guaranteed to finish writing before the `directory_duration_threshold`. ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "influx" ``` + +## Metrics + +The format of metrics produced by this plugin depends on the content and data +format of the file. From 1ddc418b79d3db0543fda4b2c08725f2a44e08e1 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 9 Jun 2022 17:35:32 +0200 Subject: [PATCH 09/12] chore(inputs/directory_monitor): Cleanup error handling --- plugins/inputs/directory_monitor/directory_monitor.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index 8f2fd4cd86306..e6072466b86c5 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -232,11 +232,7 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read return err } - if err := monitor.sendMetrics(metrics); err != nil { - return err - } - - return nil + return monitor.sendMetrics(metrics) } scanner := bufio.NewScanner(reader) From 2d09cd5137a196ea5c00e5d0374e90dbfdc966c1 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Thu, 9 Jun 2022 17:40:54 +0200 Subject: [PATCH 10/12] fix(inputs/directory_monitor): Rename option value --- plugins/inputs/directory_monitor/README.md | 2 +- plugins/inputs/directory_monitor/directory_monitor.go | 4 ++-- plugins/inputs/directory_monitor/directory_monitor_test.go | 2 +- plugins/inputs/directory_monitor/sample.conf | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/inputs/directory_monitor/README.md b/plugins/inputs/directory_monitor/README.md index 09fcbc3a5f540..08d0ead074de9 100644 --- a/plugins/inputs/directory_monitor/README.md +++ b/plugins/inputs/directory_monitor/README.md @@ -55,7 +55,7 @@ be guaranteed to finish writing before the `directory_duration_threshold`. # file_tag = "" # ## Specify if the file can be read completely at once or if it needs to be read line by line (default). - ## Possible values: line-by-line, complete-file + ## Possible values: "line-by-line", "at-once" # parse_method = "line-by-line" # ## The dataformat to be read from the files. diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index e6072466b86c5..e2f88901e6dff 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -221,7 +221,7 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { } func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error { - if monitor.ParseMethod == "complete-file" { + if monitor.ParseMethod == "at-once" { bytes, err := io.ReadAll(reader) if err != nil { return err @@ -383,7 +383,7 @@ func (monitor *DirectoryMonitor) Init() error { monitor.fileRegexesToIgnore = append(monitor.fileRegexesToIgnore, regex) } - if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "complete-file"}); err != nil { + if err := choice.Check(monitor.ParseMethod, []string{"line-by-line", "at-once"}); err != nil { return fmt.Errorf("config option parse_method: %w", err) } diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index a863e1814662e..e76db4cbabe5d 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -407,7 +407,7 @@ func TestParseCompleteFile(t *testing.T) { FinishedDirectory: finishedDirectory, MaxBufferedMetrics: defaultMaxBufferedMetrics, FileQueueSize: defaultFileQueueSize, - ParseMethod: "complete-file", + ParseMethod: "at-once", } err := r.Init() require.NoError(t, err) diff --git a/plugins/inputs/directory_monitor/sample.conf b/plugins/inputs/directory_monitor/sample.conf index df185f428da58..265fab791f971 100644 --- a/plugins/inputs/directory_monitor/sample.conf +++ b/plugins/inputs/directory_monitor/sample.conf @@ -37,7 +37,7 @@ # file_tag = "" # ## Specify if the file can be read completely at once or if it needs to be read line by line (default). - ## Possible values: line-by-line, complete-file + ## Possible values: "line-by-line", "at-once" # parse_method = "line-by-line" # ## The dataformat to be read from the files. From 3f7a6333b311a405e4f6b27f2b98451f83081393 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Fri, 10 Jun 2022 17:02:16 +0200 Subject: [PATCH 11/12] Restructure parseFile --- .../directory_monitor/directory_monitor.go | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/plugins/inputs/directory_monitor/directory_monitor.go b/plugins/inputs/directory_monitor/directory_monitor.go index e2f88901e6dff..7fe46f5f4d853 100644 --- a/plugins/inputs/directory_monitor/directory_monitor.go +++ b/plugins/inputs/directory_monitor/directory_monitor.go @@ -221,28 +221,20 @@ func (monitor *DirectoryMonitor) ingestFile(filePath string) error { } func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Reader, fileName string) error { - if monitor.ParseMethod == "at-once" { - bytes, err := io.ReadAll(reader) - if err != nil { - return err - } + var splitter bufio.SplitFunc - metrics, err := monitor.parseMetrics(parser, bytes, fileName) - if err != nil { - return err - } - - return monitor.sendMetrics(metrics) + // Decide on how to split the file + switch monitor.ParseMethod { + case "at-once": + return monitor.parseAtOnce(parser, reader, fileName) + case "line-by-line": + splitter = bufio.ScanLines + default: + return fmt.Errorf("unknown parse method %q", monitor.ParseMethod) } scanner := bufio.NewScanner(reader) - /* To be used when we add a new scanner based parse method - switch monitor.ParseMethod { - case "line-by-line": - scanner.Split(bufio.ScanLines) - case "future-split-type": - scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { return }) - } */ + scanner.Split(splitter) for scanner.Scan() { metrics, err := monitor.parseMetrics(parser, scanner.Bytes(), fileName) @@ -258,6 +250,20 @@ func (monitor *DirectoryMonitor) parseFile(parser parsers.Parser, reader io.Read return scanner.Err() } +func (monitor *DirectoryMonitor) parseAtOnce(parser parsers.Parser, reader io.Reader, fileName string) error { + bytes, err := io.ReadAll(reader) + if err != nil { + return err + } + + metrics, err := monitor.parseMetrics(parser, bytes, fileName) + if err != nil { + return err + } + + return monitor.sendMetrics(metrics) +} + func (monitor *DirectoryMonitor) parseMetrics(parser parsers.Parser, line []byte, fileName string) (metrics []telegraf.Metric, err error) { switch parser.(type) { case *csv.Parser: From c55a9bff113ff61711d337ca8519ac97e28ad8e9 Mon Sep 17 00:00:00 2001 From: Thomas Casteleyn Date: Mon, 13 Jun 2022 12:28:36 +0200 Subject: [PATCH 12/12] fix(inputs/directory_monitor): Add test to check default values for the creator --- .../directory_monitor/directory_monitor_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/plugins/inputs/directory_monitor/directory_monitor_test.go b/plugins/inputs/directory_monitor/directory_monitor_test.go index e76db4cbabe5d..800ad392cdb92 100644 --- a/plugins/inputs/directory_monitor/directory_monitor_test.go +++ b/plugins/inputs/directory_monitor/directory_monitor_test.go @@ -9,11 +9,28 @@ import ( "github.com/stretchr/testify/require" + "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/parsers" "github.com/influxdata/telegraf/plugins/parsers/csv" "github.com/influxdata/telegraf/testutil" ) +func TestCreator(t *testing.T) { + creator, found := inputs.Inputs["directory_monitor"] + require.True(t, found) + + expected := &DirectoryMonitor{ + FilesToMonitor: defaultFilesToMonitor, + FilesToIgnore: defaultFilesToIgnore, + MaxBufferedMetrics: defaultMaxBufferedMetrics, + DirectoryDurationThreshold: defaultDirectoryDurationThreshold, + FileQueueSize: defaultFileQueueSize, + ParseMethod: defaultParseMethod, + } + + require.Equal(t, expected, creator()) +} + func TestCSVGZImport(t *testing.T) { acc := testutil.Accumulator{} testCsvFile := "test.csv"