Skip to content

Commit

Permalink
sql,csv: distinguish empty columns from quoted empty strings in COPY
Browse files Browse the repository at this point in the history
Release note (bug fix): Previously, an empty column in the input to
COPY ... FROM CSV would be treated as an empty string. Now, this is
treated as NULL. The quoted empty string can still be used to input an
empty string. Similarly, if a different NULL token is specified in
the command options, it can be quoted in order to be treated as the
equivalent string value.
  • Loading branch information
rafiss committed Aug 10, 2022
1 parent 49494e5 commit 7cefe60
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 123 deletions.
12 changes: 7 additions & 5 deletions pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
record, err := c.csvReader.Read()
// Look for end of data before checking for errors, since a field count
// error will still return record data.
if len(record) == 1 && record[0] == endOfData && c.buf.Len() == 0 {
if len(record) == 1 && !record[0].Quoted && record[0].Val == endOfData && c.buf.Len() == 0 {
return true, nil
}
if err != nil {
Expand All @@ -509,7 +509,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
return false, err
}

func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []csv.Record) []csv.Record {
if len(c.expectedHiddenColumnIdxs) == 0 {
return in
}
Expand All @@ -523,19 +523,21 @@ func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
return ret
}

func (c *copyMachine) readCSVTuple(ctx context.Context, record []string) error {
func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) error {
if expected := len(c.resultColumns) + len(c.expectedHiddenColumnIdxs); expected != len(record) {
return pgerror.Newf(pgcode.BadCopyFileFormat,
"expected %d values, got %d", expected, len(record))
}
record = c.maybeIgnoreHiddenColumnsStr(record)
exprs := make(tree.Exprs, len(record))
for i, s := range record {
if s == c.null {
// NB: When we implement FORCE_NULL, then quoted values also are allowed
// to be treated as NULL.
if !s.Quoted && s.Val == c.null {
exprs[i] = tree.DNull
continue
}
d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s, c.parsingEvalCtx)
d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding/csv",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/ioctx",
Expand Down
21 changes: 17 additions & 4 deletions pkg/sql/importer/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -3721,7 +3722,7 @@ func BenchmarkUserfileImport(b *testing.B) {
type csvBenchmarkStream struct {
n int
pos int
data [][]string
data [][]csv.Record
}

func (s *csvBenchmarkStream) Progress() float32 {
Expand Down Expand Up @@ -3758,6 +3759,18 @@ func (s *csvBenchmarkStream) Read(buf []byte) (int, error) {
return 0, io.EOF
}

// toRecords wraps every string of input in an unquoted csv.Record,
// preserving the row/column structure of the data. It is used to adapt
// plain-string test fixtures to the csv.Record-based reader interface.
func toRecords(input [][]string) [][]csv.Record {
	records := make([][]csv.Record, 0, len(input))
	for _, row := range input {
		converted := make([]csv.Record, 0, len(row))
		for _, val := range row {
			converted = append(converted, csv.Record{Quoted: false, Val: val})
		}
		records = append(records, converted)
	}
	return records
}

var _ importRowProducer = &csvBenchmarkStream{}

// BenchmarkConvertRecord-16 1000000 2107 ns/op 56.94 MB/s 3600 B/op 101 allocs/op
Expand Down Expand Up @@ -3849,7 +3862,7 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}
consumer := &csvRowConsumer{importCtx: importCtx, opts: &roachpb.CSVOptions{}}
b.ResetTimer()
Expand Down Expand Up @@ -4799,7 +4812,7 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}

delimited := &fileReader{Reader: producer}
Expand Down Expand Up @@ -4903,7 +4916,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}

pgCopyInput := &fileReader{Reader: producer}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/importer/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -724,7 +725,11 @@ func (p *parallelImporter) importWorker(

rowIndex := int64(timestamp) + rowNum
if err := conv.Row(ctx, conv.KvBatch.Source, rowIndex); err != nil {
return newImportRowError(err, fmt.Sprintf("%v", record), rowNum)
s := fmt.Sprintf("%v", record)
if r, ok := record.([]csv.Record); ok {
s = strRecord(r, ',')
}
return newImportRowError(err, s, rowNum)
}
}
}
Expand Down
24 changes: 17 additions & 7 deletions pkg/sql/importer/read_import_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type csvRowProducer struct {
csv *csv.Reader
rowNum int64
err error
record []string
record []csv.Record
progress func() float32
numExpectedColumns int
}
Expand Down Expand Up @@ -141,12 +141,20 @@ func (p *csvRowProducer) Skip() error {
return nil
}

func strRecord(record []string, sep rune) string {
func strRecord(record []csv.Record, sep rune) string {
csvSep := ","
if sep != 0 {
csvSep = string(sep)
}
return strings.Join(record, csvSep)
strs := make([]string, len(record))
for i := range record {
if record[i].Quoted {
strs[i] = "\"" + record[i].Val + "\""
} else {
strs[i] = record[i].Val
}
}
return strings.Join(strs, csvSep)
}

// Row() implements importRowProducer interface.
Expand All @@ -156,7 +164,9 @@ func (p *csvRowProducer) Row() (interface{}, error) {

if len(p.record) == expectedColsLen {
// Expected number of columns.
} else if len(p.record) == expectedColsLen+1 && p.record[expectedColsLen] == "" {
} else if len(p.record) == expectedColsLen+1 &&
p.record[expectedColsLen].Val == "" &&
!p.record[expectedColsLen].Quoted {
// Line has the optional trailing comma, ignore the empty field.
p.record = p.record[:expectedColsLen]
} else {
Expand Down Expand Up @@ -184,7 +194,7 @@ var _ importRowConsumer = &csvRowConsumer{}
func (c *csvRowConsumer) FillDatums(
row interface{}, rowNum int64, conv *row.DatumRowConverter,
) error {
record := row.([]string)
record := row.([]csv.Record)
datumIdx := 0

for i, field := range record {
Expand All @@ -195,11 +205,11 @@ func (c *csvRowConsumer) FillDatums(
}

if c.opts.NullEncoding != nil &&
field == *c.opts.NullEncoding {
field.Val == *c.opts.NullEncoding {
conv.Datums[datumIdx] = tree.DNull
} else {
var err error
conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(conv.VisibleColTypes[i], field, conv.EvalCtx)
conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(conv.VisibleColTypes[i], field.Val, conv.EvalCtx)
if err != nil {
col := conv.VisibleCols[i]
return newImportRowError(
Expand Down
64 changes: 58 additions & 6 deletions pkg/sql/pgwire/testdata/pgtest/copy
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Query {"String": "COPY t FROM STDIN"}
CopyData {"Data": "1\tblah\n"}
CopyData {"Data": "2\t\n"}
CopyData {"Data": "3\t\\N\n"}
CopyData {"Data": "4\t\"\"\n"}
CopyData {"Data": "\\.\n"}
CopyDone
Query {"String": "SELECT * FROM t ORDER BY i"}
Expand All @@ -38,12 +39,13 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"DELETE 0"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 3"}
{"Type":"CommandComplete","CommandTag":"COPY 4"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"blah"}]}
{"Type":"DataRow","Values":[{"text":"2"},null]}
{"Type":"DataRow","Values":[{"text":"3"},null]}
{"Type":"CommandComplete","CommandTag":"SELECT 3"}
{"Type":"DataRow","Values":[{"text":"4"},{"text":"\"\""}]}
{"Type":"CommandComplete","CommandTag":"SELECT 4"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Extra fields.
Expand Down Expand Up @@ -632,6 +634,53 @@ ReadyForQuery
{"Type":"CommandComplete","CommandTag":"SELECT 2"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Test that we distinguish an empty column from a quoted empty string.
# By default, an empty column is NULL.
# If we specify another NULL token, then the empty column does get interpreted
# as an empty string.

send
Query {"String": "DELETE FROM t"}
Query {"String": "COPY t FROM STDIN WITH CSV"}
CopyData {"Data": "1,cat\n"}
CopyData {"Data": "2,\"\"\n"}
CopyData {"Data": "3,\n"}
CopyData {"Data": "\\.\n"}
CopyDone
Query {"String": "COPY t FROM STDIN WITH CSV NULL 'N'"}
CopyData {"Data": "4,\"\"\n"}
CopyData {"Data": "5,\n"}
CopyData {"Data": "6,N\n"}
CopyData {"Data": "7,\"N\"\n"}
CopyData {"Data": "\\.\n"}
CopyDone
Query {"String": "SELECT i, length(t) FROM t ORDER BY i"}
----

until ignore=RowDescription
ReadyForQuery
ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DELETE 2"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 3"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 4"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"DataRow","Values":[{"text":"1"},{"text":"3"}]}
{"Type":"DataRow","Values":[{"text":"2"},{"text":"0"}]}
{"Type":"DataRow","Values":[{"text":"3"},null]}
{"Type":"DataRow","Values":[{"text":"4"},{"text":"0"}]}
{"Type":"DataRow","Values":[{"text":"5"},{"text":"0"}]}
{"Type":"DataRow","Values":[{"text":"6"},null]}
{"Type":"DataRow","Values":[{"text":"7"},{"text":"1"}]}
{"Type":"CommandComplete","CommandTag":"SELECT 7"}
{"Type":"ReadyForQuery","TxStatus":"I"}

# Verify that COPY CSV input can be split up at arbitrary points.
send
Query {"String": "DELETE FROM t"}
Expand Down Expand Up @@ -659,7 +708,7 @@ ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"DELETE 2"}
{"Type":"CommandComplete","CommandTag":"DELETE 7"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 9"}
Expand Down Expand Up @@ -795,15 +844,19 @@ ReadyForQuery
{"Type":"ReadyForQuery","TxStatus":"I"}

send
Query {"String": "SET TIME ZONE UTC"}
Query {"String": "COPY t FROM STDIN CSV"}
CopyData {"Data": "1,2021-09-20T06:05:04\n"}
CopyData {"Data": "\\.\n"}
CopyDone
----

until ignore=RowDescription
until ignore=RowDescription ignore=ParameterStatus
ReadyForQuery
ReadyForQuery
----
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
{"Type":"CommandComplete","CommandTag":"COPY 1"}
{"Type":"ReadyForQuery","TxStatus":"I"}
Expand All @@ -817,12 +870,11 @@ CopyDone
Query {"String": "SELECT i, t FROM t ORDER BY i"}
----

until ignore=RowDescription
until ignore=RowDescription ignore=ParameterStatus
ReadyForQuery
ReadyForQuery
ReadyForQuery
----
{"Type":"ParameterStatus","Name":"TimeZone","Value":"America/Chicago"}
{"Type":"CommandComplete","CommandTag":"SET"}
{"Type":"ReadyForQuery","TxStatus":"I"}
{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
Expand Down
26 changes: 18 additions & 8 deletions pkg/util/encoding/csv/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ Ken,Thompson,ken
fmt.Println(record)
}
// Output:
// [first_name last_name username]
// [Rob Pike rob]
// [Ken Thompson ken]
// [Robert Griesemer gri]
// [{first_name false} {last_name false} {username false}]
// [{Rob true} {Pike true} {rob false}]
// [{Ken false} {Thompson false} {ken false}]
// [{Robert true} {Griesemer true} {gri true}]
}

// This example shows how csv.Reader can be configured to handle other
Expand All @@ -71,9 +71,14 @@ Ken;Thompson;ken
log.Fatalf(ctx, "%v", err)
}

fmt.Print(records)
for _, record := range records {
fmt.Println(record)
}
// Output:
// [[first_name last_name username] [Rob Pike rob] [Ken Thompson ken] [Robert Griesemer gri]]
// [{first_name false} {last_name false} {username false}]
// [{Rob true} {Pike true} {rob false}]
// [{Ken false} {Thompson false} {ken false}]
// [{Robert true} {Griesemer true} {gri true}]
}

func ExampleReader_ReadAll() {
Expand All @@ -90,9 +95,14 @@ Ken,Thompson,ken
log.Fatalf(ctx, "%v", err)
}

fmt.Print(records)
for _, record := range records {
fmt.Println(record)
}
// Output:
// [[first_name last_name username] [Rob Pike rob] [Ken Thompson ken] [Robert Griesemer gri]]
// [{first_name false} {last_name false} {username false}]
// [{Rob true} {Pike true} {rob false}]
// [{Ken false} {Thompson false} {ken false}]
// [{Robert true} {Griesemer true} {gri true}]
}

func ExampleWriter() {
Expand Down
Loading

0 comments on commit 7cefe60

Please sign in to comment.