diff --git a/docs/operators/csv_parser.md b/docs/operators/csv_parser.md
new file mode 100644
index 00000000..02ef5457
--- /dev/null
+++ b/docs/operators/csv_parser.md
@@ -0,0 +1,142 @@
+## `csv_parser` operator
+
+The `csv_parser` operator parses the string-type field selected by `parse_from` with the given header values.
+
+### Configuration Fields
+
+| Field | Default | Description |
+| --- | --- | --- |
+| `id` | `csv_parser` | A unique identifier for the operator |
+| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
+| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
+| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
+| `parse_from` | $body | A [field](/docs/types/field.md) that indicates the field to be parsed |
+| `parse_to` | $body | A [field](/docs/types/field.md) that indicates the field to be parsed |
+| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) |
+| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
+| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
+| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |
+
+### Example Configurations
+
+#### Parse the field `message` with a csv parser
+
+Configuration:
+
+```yaml
+- type: csv_parser
+ header: id,severity,message
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "body": "1,debug,Debug Message"
+}
+```
+
+ |
+
+
+```json
+{
+ "body": {
+ "id": "1",
+ "severity": "debug",
+ "message": "Debug Message"
+ }
+}
+```
+
+ |
+
+
+
+#### Parse the field `message` with a csv parser using tab delimiter
+
+Configuration:
+
+```yaml
+- type: csv_parser
+ parse_from: message
+ header: id,severity,message
+ delimiter: "\t"
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "body": {
+ "message": "1 debug Debug Message"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "body": {
+ "id": "1",
+ "severity": "debug",
+ "message": "Debug Message"
+ }
+}
+```
+
+ |
+
+
+
+#### Parse the field `message` with csv parser and also parse the timestamp
+
+Configuration:
+
+```yaml
+- type: csv_parser
+ header: 'timestamp_field,severity,message'
+ timestamp:
+ parse_from: timestamp_field
+ layout_type: strptime
+ layout: '%Y-%m-%d'
+```
+
+
+ Input Entry | Output Entry |
+
+
+
+```json
+{
+ "timestamp": "",
+ "body": {
+ "message": "2021-03-17,debug,Debug Message"
+ }
+}
+```
+
+ |
+
+
+```json
+{
+ "timestamp": "2021-03-17T00:00:00-00:00",
+ "body": {
+ "severity": "debug",
+ "message": "Debug Message"
+ }
+}
+```
+
+ |
+
+
\ No newline at end of file
diff --git a/operator/builtin/parser/csv/config_test.go b/operator/builtin/parser/csv/config_test.go
new file mode 100644
index 00000000..b1da7ff6
--- /dev/null
+++ b/operator/builtin/parser/csv/config_test.go
@@ -0,0 +1,70 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package csv
+
+import (
+ "testing"
+
+ "github.com/open-telemetry/opentelemetry-log-collection/entry"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator/helper/operatortest"
+)
+
+func TestJSONParserConfig(t *testing.T) {
+ cases := []operatortest.ConfigUnmarshalTest{
+ {
+ Name: "basic",
+ Expect: func() *CSVParserConfig {
+ p := defaultCfg()
+ p.Header = "id,severity,message"
+ p.ParseFrom = entry.NewBodyField("message")
+ return p
+ }(),
+ },
+ {
+ Name: "delimiter",
+ Expect: func() *CSVParserConfig {
+ p := defaultCfg()
+ p.Header = "id,severity,message"
+ p.ParseFrom = entry.NewBodyField("message")
+ p.FieldDelimiter = "\t"
+ return p
+ }(),
+ },
+ {
+ Name: "timestamp",
+ Expect: func() *CSVParserConfig {
+ p := defaultCfg()
+ p.Header = "timestamp_field,severity,message"
+ newTime := helper.NewTimeParser()
+ p.TimeParser = &newTime
+ parseFrom := entry.NewBodyField("timestamp_field")
+ p.TimeParser.ParseFrom = &parseFrom
+ p.TimeParser.LayoutType = "strptime"
+ p.TimeParser.Layout = "%Y-%m-%d"
+ return p
+ }(),
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.Name, func(t *testing.T) {
+ tc.Run(t, defaultCfg())
+ })
+ }
+}
+
+func defaultCfg() *CSVParserConfig {
+ return NewCSVParserConfig("json_parser")
+}
diff --git a/operator/builtin/parser/csv/csv.go b/operator/builtin/parser/csv/csv.go
new file mode 100644
index 00000000..9ea7a115
--- /dev/null
+++ b/operator/builtin/parser/csv/csv.go
@@ -0,0 +1,125 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package csv
+
+import (
+ "context"
+ csvparser "encoding/csv"
+ "fmt"
+ "strings"
+
+ "github.com/open-telemetry/opentelemetry-log-collection/entry"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
+)
+
+func init() {
+ operator.Register("csv_parser", func() operator.Builder { return NewCSVParserConfig("") })
+}
+
+// NewCSVParserConfig creates a new csv parser config with default values
+func NewCSVParserConfig(operatorID string) *CSVParserConfig {
+ return &CSVParserConfig{
+ ParserConfig: helper.NewParserConfig(operatorID, "csv_parser"),
+ }
+}
+
+// CSVParserConfig is the configuration of a csv parser operator.
+type CSVParserConfig struct {
+ helper.ParserConfig `yaml:",inline"`
+
+ Header string `json:"header" yaml:"header"`
+ FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
+}
+
+// Build will build a csv parser operator.
+func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) {
+ parserOperator, err := c.ParserConfig.Build(context)
+ if err != nil {
+ return nil, err
+ }
+
+ if c.Header == "" {
+ return nil, fmt.Errorf("Missing required field 'header'")
+ }
+
+ if c.FieldDelimiter == "" {
+ c.FieldDelimiter = ","
+ }
+
+ if len([]rune(c.FieldDelimiter)) != 1 {
+ return nil, fmt.Errorf("Invalid 'delimiter': '%s'", c.FieldDelimiter)
+ }
+
+ fieldDelimiter := []rune(c.FieldDelimiter)[0]
+
+ if !strings.Contains(c.Header, c.FieldDelimiter) {
+ return nil, fmt.Errorf("missing field delimiter in header")
+ }
+
+ numFields := len(strings.Split(c.Header, c.FieldDelimiter))
+
+ delimiterStr := string([]rune{fieldDelimiter})
+ csvParser := &CSVParser{
+ ParserOperator: parserOperator,
+ header: strings.Split(c.Header, delimiterStr),
+ fieldDelimiter: fieldDelimiter,
+ numFields: numFields,
+ }
+
+ return []operator.Operator{csvParser}, nil
+}
+
+// CSVParser is an operator that parses csv in an entry.
+type CSVParser struct {
+ helper.ParserOperator
+ header []string
+ fieldDelimiter rune
+ numFields int
+}
+
+// Process will parse an entry for csv.
+func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
+ return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
+}
+
+// parse will parse a value using the supplied csv header.
+func (r *CSVParser) parse(value interface{}) (interface{}, error) {
+ var csvLine string
+ switch val := value.(type) {
+ case string:
+ csvLine = val
+ case []byte:
+ csvLine = string(val)
+ default:
+ return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
+ }
+
+ reader := csvparser.NewReader(strings.NewReader(csvLine))
+ reader.Comma = r.fieldDelimiter
+ reader.FieldsPerRecord = r.numFields
+ parsedValues := make(map[string]interface{})
+
+ record, err := reader.Read()
+
+ if err != nil {
+ return nil, err
+ }
+
+ for i, key := range r.header {
+ parsedValues[key] = record[i]
+ }
+
+ return parsedValues, nil
+}
diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go
new file mode 100644
index 00000000..03ad08d0
--- /dev/null
+++ b/operator/builtin/parser/csv/csv_test.go
@@ -0,0 +1,313 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package csv
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/open-telemetry/opentelemetry-log-collection/entry"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator"
+ "github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
+ "github.com/open-telemetry/opentelemetry-log-collection/testutil"
+)
+
+var testHeader = "name,sev,msg"
+
+func newTestParser(t *testing.T) *CSVParser {
+ cfg := NewCSVParserConfig("test")
+ cfg.Header = testHeader
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+ return op.(*CSVParser)
+}
+
+func TestCSVParserBuildFailure(t *testing.T) {
+ cfg := NewCSVParserConfig("test")
+ cfg.OnError = "invalid_on_error"
+ _, err := cfg.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "invalid `on_error` field")
+}
+
+func TestCSVParserBuildFailureInvalidDelimiter(t *testing.T) {
+ cfg := NewCSVParserConfig("test")
+ cfg.Header = testHeader
+ cfg.FieldDelimiter = ";;"
+ _, err := cfg.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "Invalid 'delimiter': ';;'")
+}
+
+func TestCSVParserStringFailure(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse("invalid")
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
+}
+
+func TestCSVParserByteFailure(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse([]byte("invalid"))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "record on line 1: wrong number of fields")
+}
+
+func TestCSVParserInvalidType(t *testing.T) {
+ parser := newTestParser(t)
+ _, err := parser.parse([]int{})
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "type '[]int' cannot be parsed as csv")
+}
+
+func TestParserCSV(t *testing.T) {
+ cases := []struct {
+ name string
+ configure func(*CSVParserConfig)
+ inputBody interface{}
+ outputBody interface{}
+ }{
+ {
+ "basic",
+ func(p *CSVParserConfig) {
+ p.Header = testHeader
+ },
+ "stanza,INFO,started agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "sev": "INFO",
+ "msg": "started agent",
+ },
+ },
+ {
+ "advanced",
+ func(p *CSVParserConfig) {
+ p.Header = "name;address;age;phone;position"
+ p.FieldDelimiter = ";"
+ },
+ "stanza;Evergreen;1;555-5555;agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "address": "Evergreen",
+ "age": "1",
+ "phone": "555-5555",
+ "position": "agent",
+ },
+ },
+ {
+ "mariadb-audit-log",
+ func(p *CSVParserConfig) {
+ p.Header = "timestamp,serverhost,username,host,connectionid,queryid,operation,database,object,retcode"
+ tp := helper.NewTimeParser()
+ field := entry.NewBodyField("timestamp")
+ tp.ParseFrom = &field
+ tp.LayoutType = "strptime"
+ tp.Layout = "%Y%m%d"
+ p.TimeParser = &tp
+ },
+ "20210316,oiq-int-mysql,load,oiq-int-mysql.bluemedora.localnet,5,0,DISCONNECT,,,0",
+ map[string]interface{}{
+ "serverhost": "oiq-int-mysql",
+ "username": "load",
+ "host": "oiq-int-mysql.bluemedora.localnet",
+ "connectionid": "5",
+ "queryid": "0",
+ "operation": "DISCONNECT",
+ "database": "",
+ "object": "",
+ "retcode": "0",
+ },
+ },
+ {
+ "empty field",
+ func(p *CSVParserConfig) {
+ p.Header = "name,address,age,phone,position"
+ },
+ "stanza,Evergreen,,555-5555,agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "address": "Evergreen",
+ "age": "",
+ "phone": "555-5555",
+ "position": "agent",
+ },
+ },
+ {
+ "tab delimiter",
+ func(p *CSVParserConfig) {
+ p.Header = "name address age phone position"
+ p.FieldDelimiter = "\t"
+ },
+ "stanza Evergreen 1 555-5555 agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "address": "Evergreen",
+ "age": "1",
+ "phone": "555-5555",
+ "position": "agent",
+ },
+ },
+ {
+ "comma in quotes",
+ func(p *CSVParserConfig) {
+ p.Header = "name,address,age,phone,position"
+ },
+ "stanza,\"Evergreen,49508\",1,555-5555,agent",
+ map[string]interface{}{
+ "name": "stanza",
+ "address": "Evergreen,49508",
+ "age": "1",
+ "phone": "555-5555",
+ "position": "agent",
+ },
+ },
+ {
+ "quotes in quotes",
+ func(p *CSVParserConfig) {
+ p.Header = "name,address,age,phone,position"
+ },
+ "\"bob \"\"the man\"\"\",Evergreen,1,555-5555,agent",
+ map[string]interface{}{
+ "name": "bob \"the man\"",
+ "address": "Evergreen",
+ "age": "1",
+ "phone": "555-5555",
+ "position": "agent",
+ },
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ cfg := NewCSVParserConfig("test")
+ cfg.OutputIDs = []string{"fake"}
+ tc.configure(cfg)
+
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+
+ fake := testutil.NewFakeOutput(t)
+ op.SetOutputs([]operator.Operator{fake})
+
+ entry := entry.New()
+ entry.Body = tc.inputBody
+ err = op.Process(context.Background(), entry)
+ require.NoError(t, err)
+ if cfg.TimeParser != nil {
+ newTime, _ := time.ParseInLocation("20060102", "20210316", entry.Timestamp.Location())
+ require.Equal(t, newTime, entry.Timestamp)
+ }
+ fake.ExpectBody(t, tc.outputBody)
+ })
+ }
+}
+
+func TestParserCSVMultipleBodys(t *testing.T) {
+ t.Run("basic", func(t *testing.T) {
+ cfg := NewCSVParserConfig("test")
+ cfg.OutputIDs = []string{"fake"}
+ cfg.Header = testHeader
+
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+
+ fake := testutil.NewFakeOutput(t)
+ op.SetOutputs([]operator.Operator{fake})
+
+ entry := entry.New()
+ entry.Body = "stanza,INFO,started agent\nstanza,DEBUG,started agent"
+ err = op.Process(context.Background(), entry)
+ require.Nil(t, err, "Expected to parse a single csv record, got '2'")
+ require.NoError(t, err)
+ fake.ExpectBody(t, map[string]interface{}{
+ "name": "stanza",
+ "sev": "INFO",
+ "msg": "started agent",
+ })
+ })
+}
+
+func TestParserCSVInvalidJSONInput(t *testing.T) {
+ t.Run("basic", func(t *testing.T) {
+ cfg := NewCSVParserConfig("test")
+ cfg.OutputIDs = []string{"fake"}
+ cfg.Header = testHeader
+
+ ops, err := cfg.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ op := ops[0]
+
+ fake := testutil.NewFakeOutput(t)
+ op.SetOutputs([]operator.Operator{fake})
+
+ entry := entry.New()
+ entry.Body = "{\"name\": \"stanza\"}"
+ err = op.Process(context.Background(), entry)
+ require.Nil(t, err, "parse error on line 1, column 1: bare \" in non-quoted-field")
+ fake.ExpectBody(t, "{\"name\": \"stanza\"}")
+ })
+}
+
+func TestBuildParserCSV(t *testing.T) {
+ newBasicCSVParser := func() *CSVParserConfig {
+ cfg := NewCSVParserConfig("test")
+ cfg.OutputIDs = []string{"test"}
+ cfg.Header = "name,position,number"
+ cfg.FieldDelimiter = ","
+ return cfg
+ }
+
+ t.Run("BasicConfig", func(t *testing.T) {
+ c := newBasicCSVParser()
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.NoError(t, err)
+ })
+
+ t.Run("MissingHeaderField", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = ""
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ })
+
+ t.Run("InvalidHeaderFieldMissingDelimiter", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = "name"
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "missing field delimiter in header")
+ })
+
+ t.Run("InvalidHeaderFieldWrongDelimiter", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = "name;position;number"
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ })
+
+ t.Run("InvalidDelimiter", func(t *testing.T) {
+ c := newBasicCSVParser()
+ c.Header = "name,position,number"
+ c.FieldDelimiter = ":"
+ _, err := c.Build(testutil.NewBuildContext(t))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "missing field delimiter in header")
+ })
+}
diff --git a/operator/builtin/parser/csv/testdata/basic.yaml b/operator/builtin/parser/csv/testdata/basic.yaml
new file mode 100644
index 00000000..898d6457
--- /dev/null
+++ b/operator/builtin/parser/csv/testdata/basic.yaml
@@ -0,0 +1,3 @@
+type: csv_parser
+parse_from: message
+header: id,severity,message
\ No newline at end of file
diff --git a/operator/builtin/parser/csv/testdata/delimiter.yaml b/operator/builtin/parser/csv/testdata/delimiter.yaml
new file mode 100644
index 00000000..de37e0d6
--- /dev/null
+++ b/operator/builtin/parser/csv/testdata/delimiter.yaml
@@ -0,0 +1,4 @@
+type: csv_parser
+parse_from: message
+header: id,severity,message
+delimiter: "\t"
\ No newline at end of file
diff --git a/operator/builtin/parser/csv/testdata/timestamp.yaml b/operator/builtin/parser/csv/testdata/timestamp.yaml
new file mode 100644
index 00000000..d9cf329f
--- /dev/null
+++ b/operator/builtin/parser/csv/testdata/timestamp.yaml
@@ -0,0 +1,6 @@
+type: csv_parser
+header: timestamp_field,severity,message
+timestamp:
+ parse_from: timestamp_field
+ layout_type: strptime
+ layout: '%Y-%m-%d'
\ No newline at end of file