From 6d253c46cc757a48f02e055fc5eaf28238ed2e15 Mon Sep 17 00:00:00 2001 From: Corbin Phelps Date: Tue, 21 Dec 2021 09:44:13 -0500 Subject: [PATCH] Updated CSV Parser to include configurable header_attribute field. (#335) * Updated CSV Parser to include configurable header_attribute field. Updated csv parser to use errors.New instead of fmt.errorf when not needed Added csv config test for header attribute config Updated config readme for csv parser Signed-off-by: Corbin Phelps * Updated with PR comments Signed-off-by: Corbin Phelps * Refactored csv parser to fix race condition Signed-off-by: Corbin Phelps Co-authored-by: Jonathan Wamsley --- docs/operators/csv_parser.md | 94 +++- operator/builtin/parser/csv/config_test.go | 10 + operator/builtin/parser/csv/csv.go | 131 +++-- operator/builtin/parser/csv/csv_test.go | 499 +++++++++++++++--- .../parser/csv/testdata/header_attribute.yaml | 4 + 5 files changed, 596 insertions(+), 142 deletions(-) create mode 100644 operator/builtin/parser/csv/testdata/header_attribute.yaml diff --git a/docs/operators/csv_parser.md b/docs/operators/csv_parser.md index 98859610..a034fdb4 100644 --- a/docs/operators/csv_parser.md +++ b/docs/operators/csv_parser.md @@ -4,19 +4,20 @@ The `csv_parser` operator parses the string-type field selected by `parse_from` ### Configuration Fields -| Field | Default | Description | -| --- | --- | --- | -| `id` | `csv_parser` | A unique identifier for the operator. | -| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | -| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys. | -| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. | -| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. | -| `parse_from` | $body | The [field](/docs/types/field.md) from which the value will be parsed. | -| `parse_to` | $body | The [field](/docs/types/field.md) to which the value will be parsed. | -| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md). | -| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). | -| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. | -| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator. | +| Field | Default | Description | +| --- | --- | --- | +| `id` | `csv_parser` | A unique identifier for the operator. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `header` | required when `header_attribute` not set | A string of delimited field names | +| `header_attribute` | required when `header` not set | An attribute name to read the header field from, to support dynamic field names | +| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter. | +| `lazy_quotes` | `false` | If true, a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field. | +| `parse_from` | $body | The [field](/docs/types/field.md) from which the value will be parsed. 
|
+| `parse_to` | $body | The [field](/docs/types/field.md) to which the value will be parsed. |
+| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md). |
+| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md). |
+| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator. |
+| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator. |
 
 ### Example Configurations
 
@@ -140,4 +141,69 @@ Configuration:
 
 </td>
 </tr>
-</table>
\ No newline at end of file
+</table>
+
+
+#### Parse the field `message` using dynamic field names
+
+Dynamic field names can be supplied by using file_input's `label_regex` to capture the header line as a label.
+
+Configuration:
+
+```yaml
+- type: file_input
+  include:
+    - ./dynamic.log
+  start_at: beginning
+  label_regex: '^#(?P<key>.*?): (?P<value>.*)'
+
+- type: csv_parser
+  delimiter: ","
+  header_attribute: Fields
+```
+
+Input File:
+
+```
+#Fields: "id,severity,message"
+1,debug,Hello
+```
+
+<table>
+<tr><td> Input record </td> <td> Output record </td></tr>
+<tr>
+<td>
+
+Entry (from file_input):
+
+```json
+{
+  "timestamp": "",
+  "labels": {
+    "fields": "id,severity,message"
+  },
+  "record": {
+    "message": "1,debug,Hello"
+  }
+}
+```
+
+</td>
+<td>
+
+```json
+{
+  "timestamp": "",
+  "labels": {
+    "fields": "id,severity,message"
+  },
+  "record": {
+    "id": "1",
+    "severity": "debug",
+    "message": "Hello"
+  }
+}
+```
+
+</td>
+</tr>
+</table>
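For readers who want to trace the example above outside the operator, the following standalone Go sketch shows how a header captured from the `#Fields:` line maps a CSV record onto named keys. It is not part of this patch: the helper name `parseWithHeader` is illustrative, and it uses only the standard library rather than the operator's actual code path.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parseWithHeader maps a single CSV line onto the supplied header fields,
// mirroring what csv_parser does when the header comes from an entry attribute.
func parseWithHeader(header []string, line string) (map[string]interface{}, error) {
	reader := csv.NewReader(strings.NewReader(line))
	reader.FieldsPerRecord = len(header)

	record, err := reader.Read()
	if err != nil {
		return nil, err
	}

	parsed := make(map[string]interface{}, len(header))
	for i, key := range header {
		parsed[key] = record[i]
	}
	return parsed, nil
}

func main() {
	// The header captured from the "#Fields:" line by file_input's label_regex.
	header := strings.Split("id,severity,message", ",")

	parsed, err := parseWithHeader(header, "1,debug,Hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed) // map[id:1 message:Hello severity:debug]
}
```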
diff --git a/operator/builtin/parser/csv/config_test.go b/operator/builtin/parser/csv/config_test.go index 5e4775ff..2e595b72 100644 --- a/operator/builtin/parser/csv/config_test.go +++ b/operator/builtin/parser/csv/config_test.go @@ -52,6 +52,16 @@ func TestJSONParserConfig(t *testing.T) { return p }(), }, + { + Name: "header_attribute", + Expect: func() *CSVParserConfig { + p := defaultCfg() + p.HeaderAttribute = "header_field" + p.ParseFrom = entry.NewBodyField("message") + p.FieldDelimiter = "\t" + return p + }(), + }, { Name: "timestamp", Expect: func() *CSVParserConfig { diff --git a/operator/builtin/parser/csv/csv.go b/operator/builtin/parser/csv/csv.go index 0fdd3ba4..0db89e22 100644 --- a/operator/builtin/parser/csv/csv.go +++ b/operator/builtin/parser/csv/csv.go @@ -18,6 +18,7 @@ import ( csvparser "encoding/csv" "errors" "fmt" + "io" "strings" "github.com/open-telemetry/opentelemetry-log-collection/entry" @@ -40,9 +41,10 @@ func NewCSVParserConfig(operatorID string) *CSVParserConfig { type CSVParserConfig struct { helper.ParserConfig `yaml:",inline"` - Header string `json:"header" yaml:"header"` - FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"` - LazyQuotes bool `json:"lazy_quotes,omitempty" yaml:"lazy_quotes,omitempty"` + Header string `json:"header" yaml:"header"` + HeaderAttribute string `json:"header_attribute" yaml:"header_attribute"` + FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"` + LazyQuotes bool `json:"lazy_quotes,omitempty" yaml:"lazy_quotes,omitempty"` } // Build will build a csv parser operator. @@ -52,33 +54,36 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat return nil, err } - if c.Header == "" { - return nil, errors.New("missing required field 'header'") - } - if c.FieldDelimiter == "" { c.FieldDelimiter = "," } + fieldDelimiter := []rune(c.FieldDelimiter)[0] + if len([]rune(c.FieldDelimiter)) != 1 { return nil, fmt.Errorf("invalid 'delimiter': '%s'", c.FieldDelimiter) } - fieldDelimiter := []rune(c.FieldDelimiter)[0] - - if !strings.Contains(c.Header, c.FieldDelimiter) { + headers := make([]string, 0) + switch { + case c.Header == "" && c.HeaderAttribute == "": + return nil, errors.New("missing required field 'header' or 'header_attribute'") + case c.Header != "" && c.HeaderAttribute != "": + return nil, errors.New("only one header parameter can be set: 'header' or 'header_attribute'") + case c.Header != "" && !strings.Contains(c.Header, c.FieldDelimiter): return nil, errors.New("missing field delimiter in header") + case c.Header != "": + headers = strings.Split(c.Header, c.FieldDelimiter) } - numFields := len(strings.Split(c.Header, c.FieldDelimiter)) - - delimiterStr := string([]rune{fieldDelimiter}) csvParser := &CSVParser{ - ParserOperator: parserOperator, - header: strings.Split(c.Header, delimiterStr), - fieldDelimiter: fieldDelimiter, - numFields: numFields, - lazyQuotes: c.LazyQuotes, + ParserOperator: parserOperator, + header: headers, + headerAttribute: c.HeaderAttribute, + fieldDelimiter: fieldDelimiter, + lazyQuotes: c.LazyQuotes, + + parse: generateParseFunc(headers, fieldDelimiter, c.LazyQuotes), } return []operator.Operator{csvParser}, nil @@ -87,42 +92,70 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat // CSVParser is an operator that parses csv in an entry. 
type CSVParser struct { helper.ParserOperator - header []string - fieldDelimiter rune - numFields int - lazyQuotes bool + fieldDelimiter rune + header []string + headerAttribute string + lazyQuotes bool + parse parseFunc } -// Process will parse an entry for csv. -func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error { - return r.ParserOperator.ProcessWith(ctx, entry, r.parse) -} +type parseFunc func(interface{}) (interface{}, error) -// parse will parse a value using the supplied csv header. -func (r *CSVParser) parse(value interface{}) (interface{}, error) { - var csvLine string - switch val := value.(type) { - case string: - csvLine = val - default: - return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value) +// Process will parse an entry for csv. +func (r *CSVParser) Process(ctx context.Context, e *entry.Entry) error { + parse := r.parse + + // If we have a headerAttribute set we need to dynamically generate our parser function + if r.headerAttribute != "" { + h, ok := e.Attributes[r.headerAttribute] + if !ok { + err := fmt.Errorf("failed to read dynamic header attribute %s", r.headerAttribute) + r.Error(err) + return err + } + headers := strings.Split(h, string([]rune{r.fieldDelimiter})) + parse = generateParseFunc(headers, r.fieldDelimiter, r.lazyQuotes) } - reader := csvparser.NewReader(strings.NewReader(csvLine)) - reader.Comma = r.fieldDelimiter - reader.FieldsPerRecord = r.numFields - reader.LazyQuotes = r.lazyQuotes - parsedValues := make(map[string]interface{}) - - record, err := reader.Read() - - if err != nil { - return nil, err - } + return r.ParserOperator.ProcessWith(ctx, e, parse) +} - for i, key := range r.header { - parsedValues[key] = record[i] +// generateParseFunc returns a parse function for a given header, allowing +// each entry to have a potentially unique set of fields when using dynamic +// field names retrieved from an entry's attribute +func generateParseFunc(headers []string, fieldDelimiter rune, lazyQuotes bool) parseFunc { + return func(value interface{}) (interface{}, error) { + var csvLine string + switch t := value.(type) { + case string: + csvLine += t + case []byte: + csvLine += string(t) + default: + return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value) + } + + reader := csvparser.NewReader(strings.NewReader(csvLine)) + reader.Comma = fieldDelimiter + reader.FieldsPerRecord = len(headers) + reader.LazyQuotes = lazyQuotes + parsedValues := make(map[string]interface{}) + + for { + body, err := reader.Read() + if err == io.EOF { + break + } + + if err != nil { + return nil, err + } + + for i, key := range headers { + parsedValues[key] = body[i] + } + } + + return parsedValues, nil } - - return parsedValues, nil } diff --git a/operator/builtin/parser/csv/csv_test.go b/operator/builtin/parser/csv/csv_test.go index 52d0e8d5..129e6625 100644 --- a/operator/builtin/parser/csv/csv_test.go +++ b/operator/builtin/parser/csv/csv_test.go @@ -22,7 +22,6 @@ import ( "github.com/open-telemetry/opentelemetry-log-collection/entry" "github.com/open-telemetry/opentelemetry-log-collection/operator" - "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" "github.com/open-telemetry/opentelemetry-log-collection/testutil" ) @@ -54,11 +53,20 @@ func TestCSVParserBuildFailureInvalidDelimiter(t *testing.T) { require.Contains(t, err.Error(), "invalid 'delimiter': ';;'") } +func TestCSVParserBuildFailureBadHeaderConfig(t *testing.T) { + cfg := NewCSVParserConfig("test") + cfg.Header = "testheader" + 
cfg.HeaderAttribute = "testheader" + _, err := cfg.Build(testutil.NewBuildContext(t)) + require.Error(t, err) + require.Contains(t, err.Error(), "only one header parameter can be set: 'header' or 'header_attribute'") +} + func TestCSVParserByteFailure(t *testing.T) { parser := newTestParser(t) _, err := parser.parse([]byte("invalid")) require.Error(t, err) - require.Contains(t, err.Error(), "type '[]uint8' cannot be parsed as csv") + require.Contains(t, err.Error(), "record on line 1: wrong number of fields") } func TestCSVParserStringFailure(t *testing.T) { @@ -77,22 +85,68 @@ func TestCSVParserInvalidType(t *testing.T) { func TestParserCSV(t *testing.T) { cases := []struct { - name string - configure func(*CSVParserConfig) - inputBody interface{} - outputBody interface{} + name string + configure func(*CSVParserConfig) + inputEntry []entry.Entry + outputBody []interface{} + expectBuildErr bool + expectProcessErr bool }{ { "basic", func(p *CSVParserConfig) { p.Header = testHeader }, - "stanza,INFO,started agent", - map[string]interface{}{ - "name": "stanza", - "sev": "INFO", - "msg": "started agent", + []entry.Entry{ + { + Body: "stanza,INFO,started agent", + }, }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "sev": "INFO", + "msg": "started agent", + }, + }, + false, + false, + }, + { + "basic-multiple-static-bodies", + func(p *CSVParserConfig) { + p.Header = testHeader + }, + []entry.Entry{ + { + Body: "stanza,INFO,started agent", + }, + { + Body: "stanza,ERROR,agent killed", + }, + { + Body: "kernel,TRACE,oom", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "sev": "INFO", + "msg": "started agent", + }, + map[string]interface{}{ + "name": "stanza", + "sev": "ERROR", + "msg": "agent killed", + }, + map[string]interface{}{ + "name": "kernel", + "sev": "TRACE", + "msg": "oom", + }, + }, + false, + false, }, { "advanced", @@ -100,52 +154,213 @@ func TestParserCSV(t *testing.T) { p.Header = "name;address;age;phone;position" p.FieldDelimiter = ";" }, - "stanza;Evergreen;1;555-5555;agent", - map[string]interface{}{ - "name": "stanza", - "address": "Evergreen", - "age": "1", - "phone": "555-5555", - "position": "agent", + []entry.Entry{ + { + Body: "stanza;Evergreen;1;555-5555;agent", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "address": "Evergreen", + "age": "1", + "phone": "555-5555", + "position": "agent", + }, }, + false, + false, + }, + { + "dynamic-fields", + func(p *CSVParserConfig) { + p.HeaderAttribute = "Fields" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Attributes: map[string]string{ + "Fields": "name,age,height,number", + }, + Body: "stanza dev,1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza dev", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + false, + false, + }, + { + "dynamic-fields-multiple-entries", + func(p *CSVParserConfig) { + p.HeaderAttribute = "Fields" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Attributes: map[string]string{ + "Fields": "name,age,height,number", + }, + Body: "stanza dev,1,400,555-555-5555", + }, + { + Attributes: map[string]string{ + "Fields": "x,y", + }, + Body: "000100,2", + }, + { + Attributes: map[string]string{ + "Fields": "a,b,c,d,e,f", + }, + Body: "1,2,3,4,5,6", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza dev", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + map[string]interface{}{ + "x": "000100", + "y": "2", + }, + 
map[string]interface{}{ + "a": "1", + "b": "2", + "c": "3", + "d": "4", + "e": "5", + "f": "6", + }, + }, + false, + false, + }, + { + "dynamic-fields-tab", + func(p *CSVParserConfig) { + p.HeaderAttribute = "columns" + p.FieldDelimiter = "\t" + }, + []entry.Entry{ + { + Attributes: map[string]string{ + "columns": "name age height number", + }, + Body: "stanza dev 1 400 555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza dev", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + false, + false, + }, + { + "dynamic-fields-label-missing", + func(p *CSVParserConfig) { + p.HeaderAttribute = "Fields" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Body: "stanza dev,1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza dev", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + false, + true, + }, + { + "missing-header-field", + func(p *CSVParserConfig) { + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Body: "stanza,1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + true, + false, }, { "mariadb-audit-log", func(p *CSVParserConfig) { p.Header = "timestamp,serverhost,username,host,connectionid,queryid,operation,database,object,retcode" - tp := helper.NewTimeParser() - field := entry.NewBodyField("timestamp") - tp.ParseFrom = &field - tp.LayoutType = "strptime" - tp.Layout = "%Y%m%d" - p.TimeParser = &tp - }, - "20210316,oiq-int-mysql,load,oiq-int-mysql.bluemedora.localnet,5,0,DISCONNECT,,,0", - map[string]interface{}{ - "serverhost": "oiq-int-mysql", - "username": "load", - "host": "oiq-int-mysql.bluemedora.localnet", - "connectionid": "5", - "queryid": "0", - "operation": "DISCONNECT", - "database": "", - "object": "", - "retcode": "0", }, + []entry.Entry{ + { + Body: "20210316 17:08:01,oiq-int-mysql,load,oiq-int-mysql.bluemedora.localnet,5,0,DISCONNECT,,,0", + }, + }, + []interface{}{ + map[string]interface{}{ + "timestamp": "20210316 17:08:01", + "serverhost": "oiq-int-mysql", + "username": "load", + "host": "oiq-int-mysql.bluemedora.localnet", + "connectionid": "5", + "queryid": "0", + "operation": "DISCONNECT", + "database": "", + "object": "", + "retcode": "0", + }, + }, + false, + false, }, { "empty field", func(p *CSVParserConfig) { p.Header = "name,address,age,phone,position" }, - "stanza,Evergreen,,555-5555,agent", - map[string]interface{}{ - "name": "stanza", - "address": "Evergreen", - "age": "", - "phone": "555-5555", - "position": "agent", + []entry.Entry{ + { + Body: "stanza,Evergreen,,555-5555,agent", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "address": "Evergreen", + "age": "", + "phone": "555-5555", + "position": "agent", + }, }, + false, + false, }, { "tab delimiter", @@ -153,42 +368,155 @@ func TestParserCSV(t *testing.T) { p.Header = "name address age phone position" p.FieldDelimiter = "\t" }, - "stanza Evergreen 1 555-5555 agent", - map[string]interface{}{ - "name": "stanza", - "address": "Evergreen", - "age": "1", - "phone": "555-5555", - "position": "agent", + []entry.Entry{ + { + Body: "stanza Evergreen 1 555-5555 agent", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "address": "Evergreen", + "age": "1", + "phone": "555-5555", + "position": "agent", + }, }, + false, + false, }, { "comma in quotes", func(p *CSVParserConfig) { p.Header = "name,address,age,phone,position" }, - 
"stanza,\"Evergreen,49508\",1,555-5555,agent", - map[string]interface{}{ - "name": "stanza", - "address": "Evergreen,49508", - "age": "1", - "phone": "555-5555", - "position": "agent", + []entry.Entry{ + { + Body: "stanza,\"Evergreen,49508\",1,555-5555,agent", + }, }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "address": "Evergreen,49508", + "age": "1", + "phone": "555-5555", + "position": "agent", + }, + }, + false, + false, }, { "quotes in quotes", func(p *CSVParserConfig) { p.Header = "name,address,age,phone,position" }, - "\"bob \"\"the man\"\"\",Evergreen,1,555-5555,agent", - map[string]interface{}{ - "name": "bob \"the man\"", - "address": "Evergreen", - "age": "1", - "phone": "555-5555", - "position": "agent", + []entry.Entry{ + { + Body: "\"bob \"\"the man\"\"\",Evergreen,1,555-5555,agent", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "bob \"the man\"", + "address": "Evergreen", + "age": "1", + "phone": "555-5555", + "position": "agent", + }, + }, + false, + false, + }, + { + "missing-header-delimiter-in-header", + func(p *CSVParserConfig) { + p.Header = "name:age:height:number" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Body: "stanza,1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + true, + false, + }, + { + "invalid-delimiter", + func(p *CSVParserConfig) { + // expect []rune of length 1 + p.Header = "name,,age,,height,,number" + p.FieldDelimiter = ",," + }, + []entry.Entry{ + { + Body: "stanza,1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + true, + false, + }, + { + "parse-failure-num-fields-mismatch", + func(p *CSVParserConfig) { + p.Header = "name,age,height,number" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Body: "1,400,555-555-5555", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + false, + true, + }, + { + "parse-failure-wrong-field-delimiter", + func(p *CSVParserConfig) { + p.Header = "name,age,height,number" + p.FieldDelimiter = "," + }, + []entry.Entry{ + { + Body: "stanza:1:400:555-555-5555", + }, }, + []interface{}{ + map[string]interface{}{ + "name": "stanza", + "age": "1", + "height": "400", + "number": "555-555-5555", + }, + }, + false, + true, }, { "parse-with-lazy-quotes", @@ -197,13 +525,21 @@ func TestParserCSV(t *testing.T) { p.FieldDelimiter = "," p.LazyQuotes = true }, - `stanza "log parser",1,6ft,5`, - map[string]interface{}{ - "name": `stanza "log parser"`, - "age": "1", - "height": "6ft", - "number": "5", + []entry.Entry{ + { + Body: "stanza \"log parser\",1,6ft,5", + }, + }, + []interface{}{ + map[string]interface{}{ + "name": "stanza \"log parser\"", + "age": "1", + "height": "6ft", + "number": "5", + }, }, + false, + false, }, } @@ -214,21 +550,26 @@ func TestParserCSV(t *testing.T) { tc.configure(cfg) ops, err := cfg.Build(testutil.NewBuildContext(t)) + if tc.expectBuildErr { + require.Error(t, err) + return + } require.NoError(t, err) op := ops[0] fake := testutil.NewFakeOutput(t) op.SetOutputs([]operator.Operator{fake}) - entry := entry.New() - entry.Body = tc.inputBody - err = op.Process(context.Background(), entry) - require.NoError(t, err) - if cfg.TimeParser != nil { - newTime, _ := time.ParseInLocation("20060102", "20210316", entry.Timestamp.Location()) - 
require.Equal(t, newTime, entry.Timestamp) + for i, inputEntry := range tc.inputEntry { + err = op.Process(context.Background(), &inputEntry) + if tc.expectProcessErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + fake.ExpectBody(t, tc.outputBody[i]) } - fake.ExpectBody(t, tc.outputBody) }) } } @@ -252,7 +593,7 @@ func TestParserCSVMultipleBodies(t *testing.T) { require.NoError(t, err) fake.ExpectBody(t, map[string]interface{}{ "name": "stanza", - "sev": "INFO", + "sev": "DEBUG", "msg": "started agent", }) fake.ExpectNoEntry(t, 100*time.Millisecond) diff --git a/operator/builtin/parser/csv/testdata/header_attribute.yaml b/operator/builtin/parser/csv/testdata/header_attribute.yaml new file mode 100644 index 00000000..ed85f303 --- /dev/null +++ b/operator/builtin/parser/csv/testdata/header_attribute.yaml @@ -0,0 +1,4 @@ +type: csv_parser +parse_from: message +header_attribute: header_field +delimiter: "\t"
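As background on the race-condition refactor mentioned in the commit message: the parser now builds an immutable parse closure per header instead of mutating shared reader state while processing. Below is a rough standalone sketch of that pattern, simplified from the diff above; the operator's real `generateParseFunc` also accepts `[]byte` values and reads records until `io.EOF`, and the `main` function here is purely illustrative.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parseFunc mirrors the shape used by the refactored parser.
type parseFunc func(value string) (map[string]interface{}, error)

// generateParseFunc returns a closure bound to one fixed header. Each entry
// with a different header_attribute value gets its own function, so no
// shared parser state is mutated while entries are processed.
func generateParseFunc(headers []string, delimiter rune, lazyQuotes bool) parseFunc {
	return func(value string) (map[string]interface{}, error) {
		reader := csv.NewReader(strings.NewReader(value))
		reader.Comma = delimiter
		reader.FieldsPerRecord = len(headers)
		reader.LazyQuotes = lazyQuotes

		record, err := reader.Read()
		if err != nil {
			return nil, err
		}

		parsed := make(map[string]interface{}, len(headers))
		for i, key := range headers {
			parsed[key] = record[i]
		}
		return parsed, nil
	}
}

func main() {
	// Static header known when the operator is built.
	static := generateParseFunc([]string{"name", "sev", "msg"}, ',', false)
	// Dynamic header derived from an entry attribute at process time.
	dynamic := generateParseFunc(strings.Split("x,y", ","), ',', false)

	a, _ := static("stanza,INFO,started agent")
	b, _ := dynamic("000100,2")
	fmt.Println(a)
	fmt.Println(b)
}
```

Because each closure owns its own header slice, entries carrying different `header_attribute` values can be handled concurrently without locking or rebuilding shared parser state.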