Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

Port CSV operator from stanza #123

Merged
merged 14 commits into from
May 17, 2021
142 changes: 142 additions & 0 deletions docs/operators/csv_parser.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
## `csv_parser` operator

The `csv_parser` operator parses the string-type field selected by `parse_from` with the given header values.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $body | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $body | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |

### Example Configurations

#### Parse the field `message` with a csv parser

Configuration:

```yaml
- type: csv_parser
header: id,severity,message
```

<table>
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
<tr>
<td>

```json
{
"body": "1,debug,Debug Message"
}
```

</td>
<td>

```json
{
"body": {
"id": "1",
"severity": "debug",
"message": "Debug Message"
}
}
```

</td>
</tr>
</table>

#### Parse the field `message` with a csv parser using tab delimiter

Configuration:

```yaml
- type: csv_parser
parse_from: message
header: id,severity,message
delimiter: "\t"
```

<table>
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
<tr>
<td>

```json
{
"body": {
"message": "1 debug Debug Message"
}
}
```

</td>
<td>

```json
{
"body": {
"id": "1",
"severity": "debug",
"message": "Debug Message"
}
}
```

</td>
</tr>
</table>

#### Parse the field `message` with csv parser and also parse the timestamp

Configuration:

```yaml
- type: csv_parser
header: 'timestamp_field,severity,message'
timestamp:
parse_from: timestamp_field
layout_type: strptime
layout: '%Y-%m-%d'
```

<table>
<tr><td> Input Entry </td> <td> Output Entry </td></tr>
<tr>
<td>

```json
{
"timestamp": "",
"body": {
"message": "2021-03-17,debug,Debug Message"
}
}
```

</td>
<td>

```json
{
"timestamp": "2021-03-17T00:00:00-00:00",
"body": {
"severity": "debug",
"message": "Debug Message"
}
}
```

</td>
</tr>
</table>
70 changes: 70 additions & 0 deletions operator/builtin/parser/csv/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv

import (
"testing"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper/operatortest"
)

func TestJSONParserConfig(t *testing.T) {
cases := []operatortest.ConfigUnmarshalTest{
{
Name: "basic",
Expect: func() *CSVParserConfig {
p := defaultCfg()
p.Header = "id,severity,message"
p.ParseFrom = entry.NewBodyField("message")
return p
}(),
},
{
Name: "delimiter",
Expect: func() *CSVParserConfig {
p := defaultCfg()
p.Header = "id,severity,message"
p.ParseFrom = entry.NewBodyField("message")
p.FieldDelimiter = "\t"
return p
}(),
},
{
Name: "timestamp",
Expect: func() *CSVParserConfig {
p := defaultCfg()
p.Header = "timestamp_field,severity,message"
newTime := helper.NewTimeParser()
p.TimeParser = &newTime
parseFrom := entry.NewBodyField("timestamp_field")
p.TimeParser.ParseFrom = &parseFrom
p.TimeParser.LayoutType = "strptime"
p.TimeParser.Layout = "%Y-%m-%d"
return p
}(),
},
}

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
tc.Run(t, defaultCfg())
})
}
}

func defaultCfg() *CSVParserConfig {
return NewCSVParserConfig("json_parser")
}
125 changes: 125 additions & 0 deletions operator/builtin/parser/csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv

import (
"context"
csvparser "encoding/csv"
"fmt"
"strings"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
)

func init() {
operator.Register("csv_parser", func() operator.Builder { return NewCSVParserConfig("") })
}

// NewCSVParserConfig creates a new csv parser config with default values
func NewCSVParserConfig(operatorID string) *CSVParserConfig {
return &CSVParserConfig{
ParserConfig: helper.NewParserConfig(operatorID, "csv_parser"),
}
}

// CSVParserConfig is the configuration of a csv parser operator.
type CSVParserConfig struct {
helper.ParserConfig `yaml:",inline"`

Header string `json:"header" yaml:"header"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
}

// Build will build a csv parser operator.
func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
parserOperator, err := c.ParserConfig.Build(context)
if err != nil {
return nil, err
}

if c.Header == "" {
return nil, fmt.Errorf("Missing required field 'header'")
}

if c.FieldDelimiter == "" {
c.FieldDelimiter = ","
}

if len([]rune(c.FieldDelimiter)) != 1 {
return nil, fmt.Errorf("Invalid 'delimiter': '%s'", c.FieldDelimiter)
}

fieldDelimiter := []rune(c.FieldDelimiter)[0]

if !strings.Contains(c.Header, c.FieldDelimiter) {
return nil, fmt.Errorf("missing field delimiter in header")
}

numFields := len(strings.Split(c.Header, c.FieldDelimiter))

delimiterStr := string([]rune{fieldDelimiter})
Mrod1598 marked this conversation as resolved.
Show resolved Hide resolved
csvParser := &CSVParser{
ParserOperator: parserOperator,
header: strings.Split(c.Header, delimiterStr),
fieldDelimiter: fieldDelimiter,
numFields: numFields,
}

return []operator.Operator{csvParser}, nil
}

// CSVParser is an operator that parses csv in an entry.
type CSVParser struct {
helper.ParserOperator
header []string
fieldDelimiter rune
numFields int
}

// Process will parse an entry for csv.
func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
}

// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
switch val := value.(type) {
case string:
csvLine = val
case []byte:
csvLine = string(val)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
parsedValues := make(map[string]interface{})

record, err := reader.Read()

if err != nil {
return nil, err
}

for i, key := range r.header {
parsedValues[key] = record[i]
}
djaglowski marked this conversation as resolved.
Show resolved Hide resolved

return parsedValues, nil
}
Loading