Skip to content

Commit

Permalink
feat(cmd/influx/write): add --errors-file option #18742
Browse files Browse the repository at this point in the history
  • Loading branch information
sranka committed Aug 6, 2020
1 parent b8d7dfd commit 6d6b961
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
25 changes: 25 additions & 0 deletions cmd/influx/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -38,6 +39,7 @@ type writeFlagsType struct {
SkipHeader int
IgnoreDataTypeInColumnName bool
Encoding string
ErrorsFile string
}

var writeFlags writeFlagsType
Expand Down Expand Up @@ -86,6 +88,7 @@ func cmdWrite(f *globalFlags, opt genericCLIOpts) *cobra.Command {
cmd.PersistentFlags().BoolVar(&writeFlags.IgnoreDataTypeInColumnName, "xIgnoreDataTypeInColumnName", false, "Ignores dataType which could be specified after ':' in column name")
cmd.PersistentFlags().MarkHidden("xIgnoreDataTypeInColumnName") // should be used only upon explicit advice
cmd.PersistentFlags().StringVar(&writeFlags.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&writeFlags.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows")

cmdDryRun := opt.newCmd("dryrun", fluxWriteDryrunF, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
Expand Down Expand Up @@ -204,6 +207,27 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
}
}

// create writer for errors-file, if supplied
var errorsFile *csv.Writer
var rowSkippedListener func(*csv2lp.CsvToLineReader, error, []string)
if writeFlags.ErrorsFile != "" {
writer, err := os.Create(writeFlags.ErrorsFile)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to create %q: %v", writeFlags.ErrorsFile, err)
}
closers = append(closers, writer)
errorsFile = csv.NewWriter(writer)
rowSkippedListener = func(source *csv2lp.CsvToLineReader, lineError error, row []string) {
log.Println(lineError)
errorsFile.Comma = source.Comma()
errorsFile.Write([]string{fmt.Sprintf("# error : %v", lineError)})
if err := errorsFile.Write(row); err != nil {
log.Printf("Unable to write to error-file: %v\n", err)
}
errorsFile.Flush() // flush is required
}
}

// concatenate readers
r := io.MultiReader(readers...)
if writeFlags.Format == inputFormatCsv {
Expand All @@ -213,6 +237,7 @@ func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cob
csvReader.Table.IgnoreDataTypeInColumnName(writeFlags.IgnoreDataTypeInColumnName)
// change LineNumber to report file/stdin line numbers properly
csvReader.LineNumber = writeFlags.SkipHeader - len(writeFlags.Headers)
csvReader.RowSkipped = rowSkippedListener
r = csvReader
}
return r, csv2lp.MultiCloser(closers...), nil
Expand Down
17 changes: 17 additions & 0 deletions cmd/influx/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func readLines(reader io.Reader) []string {

func createTempFile(suffix string, contents []byte) string {
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
file.Close() // Close immediatelly, since we need only a file name
if err != nil {
log.Fatal(err)
return "unknown.file"
Expand Down Expand Up @@ -539,3 +540,19 @@ func Test_fluxWriteF(t *testing.T) {
require.Equal(t, "stdin3 i=stdin1,j=stdin2,k=stdin4", strings.Trim(string(lineData), "\n"))
})
}

// Test_writeFlags_errorsFile tests that rejected rows are written to errors file
func Test_writeFlags_errorsFile(t *testing.T) {
defer removeTempFiles()
errorsFile := createTempFile("errors", []byte{})
stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
out := bytes.Buffer{}
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out)})
command.SetArgs([]string{"dryrun", "--format", "csv", "--errors-file", errorsFile})
err := command.Execute()
require.Nil(t, err)
require.Equal(t, "m a=1i", strings.Trim(out.String(), "\n"))
errorLines, err := ioutil.ReadFile(errorsFile)
require.Nil(t, err)
require.Equal(t, "# error : line 3: column 'a': '1.1' cannot fit into long data type\nm,1.1", strings.Trim(string(errorLines), "\n"))
}

0 comments on commit 6d6b961

Please sign in to comment.