Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/2108 - csv parser #4439

Merged
merged 27 commits into from
Aug 24, 2018
Merged

Feature/2108 - csv parser #4439

merged 27 commits into from
Aug 24, 2018

Conversation

maxunt
Copy link
Contributor

@maxunt maxunt commented Jul 19, 2018

closes: #2108

Required for all PRs:

  • Signed CLA.
  • Associated README.md updated.
  • Has appropriate unit tests.

if p.Delimiter != "" {
runeStr := []rune(p.Delimiter)
if len(runeStr) > 1 {
return nil, fmt.Errorf("delimiter must be a single character, got: %v", p.Delimiter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedantic, but can you use the non-default verb to print (%s in this case)

}

for _, fieldName := range p.FieldColumns {
if recordFields[fieldName] == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using value, ok := recordFields[fieldName] then check if !ok and return, line 115 becomes unnecessary.

return metrics, nil
}

//does not use any information in header and assumes DataColumns is set
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a comment is directly above an exported function, start it with // FunctionName ...

nameColumn string,
timestampColumn string,
timestampFormat string,
defaultTags map[string]string) (Parser, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error isn't useful here. Where it's an unexported function not matching any interface, I'd remove it, or remove the function altogether and just instantiate a CSVParser on line 154.

"github.com/influxdata/telegraf/metric"
)

type CSVParser struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#parsing:

I think you need to allow comments, quote characters, column skipping, row skipping, header row count, and trimming.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like the quote character can be customized when using the Go csv parser, and we would want to stick to this implementation.

The other options sound great, but I don't think they are must have and we could add them later depending on available time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can definitely add features to allow comments and trimming. I know quote characters aren't supported by the go csv parser. Regarding column skipping, there is already functionality for that by simply not adding the column name to either csv_tag_columns or csv_field_columns. The header row count raises a few issues about how a header with more than one line would be interpreted, unless we decide to skip it entirely. It would most likely not mesh well with the function to extract column names from the header. We would have to decide how we want to configure that; I think we could probably pair it with the row skipping configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is some clarification on how these options should work:

The csv_skip_rows option is an integer that controls the number of lines at the beginning of the file that should be skipped over. I would think you would want a bufio.Reader so you can call ReadLine() this many times before passing this reader into csv.Reader.

The csv_skip_columns option is an integer that controls the number of columns, from the left, to skip over.

Finally csv_header_row_count would replace csv_header, it would be an integer that is the number of rows to treat as the column names, the values would be concatenated for each column. This is applied after you csv_skip_rows, here is an example:

foo,bar
1,2,3

This would produce the column names: ["foo1", "bar1", "3"]. Make sure to allow for lines of differing length.


//does not use any information in header and assumes DataColumns is set
func (p *CSVParser) ParseLine(line string) (telegraf.Metric, error) {
r := bytes.NewReader([]byte(line))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ParseLine does not do the same validation as Parse; I don't see delimiter set, for example.

Perhaps extract building the csv reading into a new function that both parse and parseline use.

@danielnelson
Copy link
Contributor

@maxunt Don't forget to rebase/merge so that the unrelated grok documentation is no longer present.

## By default, this is the name of the plugin
## the `name_override` config overrides this
# csv_name_column = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this csv_measurement_column

## Columns listed here will be added as fields
## the field type is infered from the value of the field
csv_field_columns = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add all non-tag columns as fields. If someone wants to skip a field they can use fieldpass/fielddrop

## as there are columns of data
## If `csv_header` is set to false, this config must be used
csv_data_columns = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this csv_column_names

val, _ := strconv.ParseBool(str.Value)
c.CSVTrimSpace = val
} else {
//for config with quotes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have these else clauses, if its not a bool then it should be an error. This is actually a bug throughout this function, when the type is wrong for the field name it looks like currently we delete the field, when we should return an error and refuse to start Telegraf.

"github.com/influxdata/telegraf/metric"
)

type CSVParser struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is some clarification on how these options should work:

The csv_skip_rows option is an integer that controls the number of lines at the beginning of the file that should be skipped over. I would think you would want a bufio.Reader so you can call ReadLine() this many times before passing this reader into csv.Reader.

The csv_skip_columns option is an integer that controls the number of columns, from the left, to skip over.

Finally csv_header_row_count would replace csv_header, it would be an integer that is the number of rows to treat as the column names, the values would be concatenated for each column. This is applied after you csv_skip_rows, here is an example:

foo,bar
1,2,3

This would produce the column names: ["foo1", "bar1", "3"]. Make sure to allow for lines of differing length.

require.NoError(t, err2)

//deep equal fields
require.True(t, reflect.DeepEqual(goodMetric.Fields(), returnedMetric.Fields()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require.Equal(t, goodMetric.Fields(), returnedMetric.Fields())

Call these expected/actual, or want/got.


metrics, err := p.Parse([]byte(testCSV))
require.NoError(t, err)
require.Equal(t, true, reflect.DeepEqual(expectedFields, metrics[0].Fields()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

require.Equal(t, expectedFields, metrics[0].Fields())

Check other tests and make sure you are using this everywhere.

metrics, err := p.Parse([]byte(testCSV))
for k := range metrics[0].Fields() {
log.Printf("want: %v, %T", expectedFields[k], expectedFields[k])
log.Printf("got: %v, %T", metrics[0].Fields()[k], metrics[0].Fields()[k])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boo! no logging in tests

}

func (p *CSVParser) parseRecord(record []string) (telegraf.Metric, error) {
recordFields := make(map[string]string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you won't need this intermediate map if you make fields implicit.

// attempt type conversions
if iValue, err := strconv.Atoi(value); err == nil {
fields[fieldName] = iValue
} else if fValue, err := strconv.ParseFloat(value, 64); err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will require all floats to have a decimal part to avoid type mismatch errors. @goller Is this going to work for us in the future?

@@ -2,6 +2,17 @@

Telegraf is able to parse the following input data formats into metrics:

<<<<<<< HEAD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file needs fixed up due to merge issues. I just merged your updates to the JSON parser so you may need to update again too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im not sure i correctly followed your new format for the INPUT_DATA_FORMATS file when i resolved the merge conflict if you could take a look at that. I think the csv is missing the proper link to its section

//concatenate header names
for i := range header {
name := header[i]
name = strings.Trim(name, " ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may not want to trim the strings.

csvReader := csv.NewReader(r)
// ensures that the reader reads records of different lengths without an error
csvReader.FieldsPerRecord = -1
if p.Delimiter != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's compute the effective delimiter and comment values as part of a New() function, then when Parse is called we can create the reader without needing to worry to redo this work.

}
} else if p.HeaderRowCount == 0 && len(p.ColumnNames) == 0 {
// if there is no header and no DataColumns, that's an error
return nil, fmt.Errorf("there must be a header if `csv_data_columns` is not specified")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test can also go in the New function.


// if there is nothing in DataColumns, ParseLine will fail
if len(p.ColumnNames) == 0 {
return nil, fmt.Errorf("[parsers.csv] data columns must be specified")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can go in New function

for i, fieldName := range p.ColumnNames {
if i < len(record) {
value := record[i]
value = strings.Trim(value, " ")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we want to trim

// will default to plugin name
measurementName := p.MetricName
if recordFields[p.MeasurementColumn] != nil {
measurementName = recordFields[p.MeasurementColumn].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could panic if the column is not a string, perhaps we should pull the value from record?

if recordFields[tagName] == nil {
return nil, fmt.Errorf("could not find field: %v", tagName)
}
tags[tagName] = recordFields[tagName].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will panic if not a string.

if recordFields[p.TimestampColumn] == nil {
return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn)
}
tStr := recordFields[p.TimestampColumn].(string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could panic, maybe use record with two return value form:

timeColumn := record[p.TimestampColumn]
if timeColumn != nil {
    //
}

timeString, ok := col.(string); ok {
}

Might be easier to deal with errors if you put this into a function.

return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn)
}
tStr := recordFields[p.TimestampColumn].(string)
if p.TimestampFormat == "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can verify this in the New function, and then treat this as a struct invariant.

@danielnelson danielnelson added this to the 1.8.0 milestone Aug 24, 2018
@danielnelson danielnelson merged commit 889745a into master Aug 24, 2018
@danielnelson danielnelson deleted the feature/2108 branch August 24, 2018 23:40
rgitzel pushed a commit to rgitzel/telegraf that referenced this pull request Oct 17, 2018
otherpirate pushed a commit to otherpirate/telegraf that referenced this pull request Mar 15, 2019
otherpirate pushed a commit to otherpirate/telegraf that referenced this pull request Mar 15, 2019
dupondje pushed a commit to dupondje/telegraf that referenced this pull request Apr 22, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

CSV data input format (for use with exec, tail)
4 participants