Skip to content

Commit

Permalink
Updated CSV Parser to include configurable header_attribute field. (o…
Browse files Browse the repository at this point in the history
…pen-telemetry#335)

* Updated CSV Parser to include configurable header_attribute field.

Updated csv parser to use errors.New instead of fmt.Errorf when not needed

Added csv config test for header attribute config

Updated config readme for csv parser

Signed-off-by: Corbin Phelps <[email protected]>

* Updated with PR comments

Signed-off-by: Corbin Phelps <[email protected]>

* Refactored csv parser to fix race condition

Signed-off-by: Corbin Phelps <[email protected]>

Co-authored-by: Jonathan Wamsley <[email protected]>
  • Loading branch information
Corbin Phelps and JonathanWamsley authored Dec 21, 2021
1 parent fa36116 commit 6d253c4
Show file tree
Hide file tree
Showing 5 changed files with 596 additions and 142 deletions.
94 changes: 80 additions & 14 deletions docs/operators/csv_parser.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys. |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. |
| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. |
| `parse_from` | $body | The [field](/docs/types/field.md) from which the value will be parsed. |
| `parse_to` | $body | The [field](/docs/types/field.md) to which the value will be parsed. |
| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md). |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator. |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. |
| `header` | required when `header_attribute` not set | A string of delimited field names. |
| `header_attribute` | required when `header` not set | An attribute name to read the header field from, to support dynamic field names. |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. |
| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. |
| `parse_from` | $body | The [field](/docs/types/field.md) from which the value will be parsed. |
| `parse_to` | $body | The [field](/docs/types/field.md) to which the value will be parsed. |
| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md). |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator. |

### Example Configurations

Expand Down Expand Up @@ -140,4 +141,69 @@ Configuration:

</td>
</tr>
</table>
</table>

#### Parse the field `message` using dynamic field names

Dynamic field names can be used by leveraging the `file_input` operator's `label_regex`.

Configuration:

```yaml
- type: file_input
include:
- ./dynamic.log
start_at: beginning
label_regex: '^#(?P<key>.*?): (?P<value>.*)'

- type: csv_parser
delimiter: ","
header_attribute: Fields
```
Input File:
```
#Fields: "id,severity,message"
1,debug,Hello
```

<table>
<tr><td> Input record </td> <td> Output record </td></tr>
<tr>
<td>

Entry (from file_input):

```json
{
"timestamp": "",
"labels": {
"fields": "id,severity,message"
},
"record": {
"message": "1,debug,Hello"
}
}
```

</td>
<td>

```json
{
"timestamp": "",
"labels": {
"fields": "id,severity,message"
},
"record": {
"id": "1",
"severity": "debug",
"message": "Hello"
}
}
```

</td>
</tr>
</table>
10 changes: 10 additions & 0 deletions operator/builtin/parser/csv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func TestJSONParserConfig(t *testing.T) {
return p
}(),
},
{
Name: "header_attribute",
Expect: func() *CSVParserConfig {
p := defaultCfg()
p.HeaderAttribute = "header_field"
p.ParseFrom = entry.NewBodyField("message")
p.FieldDelimiter = "\t"
return p
}(),
},
{
Name: "timestamp",
Expect: func() *CSVParserConfig {
Expand Down
131 changes: 82 additions & 49 deletions operator/builtin/parser/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
csvparser "encoding/csv"
"errors"
"fmt"
"io"
"strings"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
Expand All @@ -40,9 +41,10 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig {
// CSVParserConfig is the configuration of a csv parser operator.
type CSVParserConfig struct {
	helper.ParserConfig `yaml:",inline"`

	// Header is a static, delimited list of field names. Required unless
	// HeaderAttribute is set; the two are mutually exclusive.
	Header string `json:"header" yaml:"header"`
	// HeaderAttribute names an entry attribute to read the header from at
	// process time, supporting dynamic field names per entry.
	HeaderAttribute string `json:"header_attribute" yaml:"header_attribute"`
	// FieldDelimiter is the single-rune value separator; defaults to ",".
	FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
	// LazyQuotes allows a quote in an unquoted field and a non-doubled
	// quote in a quoted field (see encoding/csv Reader.LazyQuotes).
	LazyQuotes bool `json:"lazy_quotes,omitempty" yaml:"lazy_quotes,omitempty"`
}

// Build will build a csv parser operator.
Expand All @@ -52,33 +54,36 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
return nil, err
}

if c.Header == "" {
return nil, errors.New("missing required field 'header'")
}

if c.FieldDelimiter == "" {
c.FieldDelimiter = ","
}

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if len([]rune(c.FieldDelimiter)) != 1 {
return nil, fmt.Errorf("invalid 'delimiter': '%s'", c.FieldDelimiter)
}

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if !strings.Contains(c.Header, c.FieldDelimiter) {
headers := make([]string, 0)
switch {
case c.Header == "" && c.HeaderAttribute == "":
return nil, errors.New("missing required field 'header' or 'header_attribute'")
case c.Header != "" && c.HeaderAttribute != "":
return nil, errors.New("only one header parameter can be set: 'header' or 'header_attribute'")
case c.Header != "" && !strings.Contains(c.Header, c.FieldDelimiter):
return nil, errors.New("missing field delimiter in header")
case c.Header != "":
headers = strings.Split(c.Header, c.FieldDelimiter)
}

numFields := len(strings.Split(c.Header, c.FieldDelimiter))

delimiterStr := string([]rune{fieldDelimiter})
csvParser := &CSVParser{
ParserOperator: parserOperator,
header: strings.Split(c.Header, delimiterStr),
fieldDelimiter: fieldDelimiter,
numFields: numFields,
lazyQuotes: c.LazyQuotes,
ParserOperator: parserOperator,
header: headers,
headerAttribute: c.HeaderAttribute,
fieldDelimiter: fieldDelimiter,
lazyQuotes: c.LazyQuotes,

parse: generateParseFunc(headers, fieldDelimiter, c.LazyQuotes),
}

return []operator.Operator{csvParser}, nil
Expand All @@ -87,42 +92,70 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
// CSVParser is an operator that parses csv in an entry.
type CSVParser struct {
	helper.ParserOperator
	// fieldDelimiter separates values within a record.
	fieldDelimiter rune
	// header holds the static field names; empty when headerAttribute is used.
	header []string
	// headerAttribute, when non-empty, names the entry attribute that
	// carries the delimited header for that entry.
	headerAttribute string
	lazyQuotes      bool
	// parse is the pre-built parse function for the static-header case.
	parse parseFunc
}

// Process will parse an entry for csv.
func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
}
type parseFunc func(interface{}) (interface{}, error)

// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
switch val := value.(type) {
case string:
csvLine = val
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
// Process will parse an entry for csv.
func (r *CSVParser) Process(ctx context.Context, e *entry.Entry) error {
parse := r.parse

// If we have a headerAttribute set we need to dynamically generate our parser function
if r.headerAttribute != "" {
h, ok := e.Attributes[r.headerAttribute]
if !ok {
err := fmt.Errorf("failed to read dynamic header attribute %s", r.headerAttribute)
r.Error(err)
return err
}
headers := strings.Split(h, string([]rune{r.fieldDelimiter}))
parse = generateParseFunc(headers, r.fieldDelimiter, r.lazyQuotes)
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
reader.LazyQuotes = r.lazyQuotes
parsedValues := make(map[string]interface{})

record, err := reader.Read()

if err != nil {
return nil, err
}
return r.ParserOperator.ProcessWith(ctx, e, parse)
}

for i, key := range r.header {
parsedValues[key] = record[i]
// generateParseFunc returns a parse function for a given header, allowing
// each entry to have a potentially unique set of fields when using dynamic
// field names retrieved from an entry's attribute
func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) parseFunc {
return func(value interface{}) (interface{}, error) {
var csvLine string
switch t := value.(type) {
case string:
csvLine += t
case []byte:
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = fieldDelimiter
reader.FieldsPerRecord = len(headers)
reader.LazyQuotes = lazyQuotes
parsedValues := make(map[string]interface{})

for {
body, err := reader.Read()
if err == io.EOF {
break
}

if err != nil {
return nil, err
}

for i, key := range headers {
parsedValues[key] = body[i]
}
}

return parsedValues, nil
}

return parsedValues, nil
}
Loading

0 comments on commit 6d253c4

Please sign in to comment.