Skip to content

Commit

Permalink
feat(pkg/csv2lp): add RowSkippedListener to inform about rejected rec…
Browse files Browse the repository at this point in the history
…ords #18742
  • Loading branch information
sranka committed Sep 12, 2020
1 parent b51866e commit 05c8a00
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
11 changes: 11 additions & 0 deletions pkg/csv2lp/csv2lp.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type CsvToLineReader struct {
dataRowAdded bool
// log CSV data errors to sterr and continue with CSV processing
skipRowOnError bool
// RowSkipped is called when a row is skipped because of data parsing error
RowSkipped func(source *CsvToLineReader, lineError error, row []string)

// reader results
buffer []byte
Expand All @@ -68,6 +70,11 @@ func (state *CsvToLineReader) SkipRowOnError(val bool) *CsvToLineReader {
return state
}

// Comma returns a field delimiter used in an input CSV file
func (state *CsvToLineReader) Comma() rune {
return state.csv.Comma
}

// Read implements io.Reader that returns protocol lines
func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
// state1: finished
Expand Down Expand Up @@ -119,6 +126,10 @@ func (state *CsvToLineReader) Read(p []byte) (n int, err error) {
state.dataRowAdded = true
if err != nil {
lineError := CsvLineError{state.LineNumber, err}
if state.RowSkipped != nil {
state.RowSkipped(state, lineError, row)
continue
}
if state.skipRowOnError {
log.Println(lineError)
continue
Expand Down
34 changes: 34 additions & 0 deletions pkg/csv2lp/csv2lp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,40 @@ func Test_CsvToLineProtocol_SkipRowOnError(t *testing.T) {
require.Equal(t, messages, 2)
}

// Test_CsvToLineProtocol_RowSkipped tests that error rows are reported to configured RowSkippedListener
func Test_CsvToLineProtocol_RowSkipped(t *testing.T) {
var buf bytes.Buffer
log.SetOutput(&buf)
oldFlags := log.Flags()
log.SetFlags(0)
oldPrefix := log.Prefix()
prefix := "::PREFIX::"
log.SetPrefix(prefix)
defer func() {
log.SetOutput(os.Stderr)
log.SetFlags(oldFlags)
log.SetPrefix(oldPrefix)
}()

csv := "sep=;\n_measurement;a|long:strict\n;1\ncpu;2.1\ncpu;3a\n"

reader := CsvToLineProtocol(strings.NewReader(csv)).SkipRowOnError(true)
reader.RowSkipped = func(src *CsvToLineReader, err error, _row []string) {
log.Println(err, string(src.Comma()))
}
// read all the data
ioutil.ReadAll(reader)

out := buf.String()
// fmt.Println(out, string(';'))
// ::PREFIX::line 3: column '_measurement': no measurement supplied
// ::PREFIX::line 4: column 'a': '2.1' cannot fit into long data type
// ::PREFIX::line 5: column 'a': strconv.ParseInt: parsing "3a": invalid syntax
messages := strings.Count(out, prefix)
require.Equal(t, 3, messages)
require.Equal(t, 3, strings.Count(out, ";"))
}

// Test_CsvLineError tests CsvLineError error format
func Test_CsvLineError(t *testing.T) {
var tests = []struct {
Expand Down

0 comments on commit 05c8a00

Please sign in to comment.