From caa3c6ce6c37ce42048d0cd40a879a7929924e8a Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Mon, 8 Aug 2022 17:13:10 -0400
Subject: [PATCH 1/5] sql/schemachanger/scgraph, scplan: fixed a bug when
 drawing dep graph

Previously, we defined all statuses that an element can be in under the
declarative schema changer in the scpb package. We had removed one status
(TXN_DROPPED) from that list and left its enum number reserved. However, some
logic in scgraph used to draw the dep graph incorrectly assumed that all enum
numbers are active, and simply iterated from 0 to len(enum_list)-1 to cover
every possible status. This is problematic because, as we continue to add
statuses to that enum list, iterating this way will no longer visit the
correct set of statuses when drawing the dep graph. This PR fixes that.

This PR also spotted and fixed a panic-recovery bug where we forgot to update
the returned error, so that if a panic happened and the recover caught it, we
would return a nil error.

Release note (bug fix): Fixed a bug internal to drawing the dependency graph
of a DDL statement under the declarative schema changer.
---
 pkg/sql/schemachanger/scplan/internal/scgraph/iteration.go | 2 +-
 pkg/sql/schemachanger/scplan/plan_explain.go               | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/sql/schemachanger/scplan/internal/scgraph/iteration.go b/pkg/sql/schemachanger/scplan/internal/scgraph/iteration.go
index bdd3b223a455..0e8a5e4d8c7a 100644
--- a/pkg/sql/schemachanger/scplan/internal/scgraph/iteration.go
+++ b/pkg/sql/schemachanger/scplan/internal/scgraph/iteration.go
@@ -23,7 +23,7 @@ type NodeIterator func(n *screl.Node) error
 // ForEachNode iterates the nodes in the graph.
 func (g *Graph) ForEachNode(it NodeIterator) error {
 	for _, m := range g.targetNodes {
-		for i := 0; i < scpb.NumStatus; i++ {
+		for i := range scpb.Status_name {
 			if ts, ok := m[scpb.Status(i)]; ok {
 				if err := it(ts); err != nil {
 					return iterutil.Map(err)
diff --git a/pkg/sql/schemachanger/scplan/plan_explain.go b/pkg/sql/schemachanger/scplan/plan_explain.go
index 74cc47d34b1f..ce587df7275a 100644
--- a/pkg/sql/schemachanger/scplan/plan_explain.go
+++ b/pkg/sql/schemachanger/scplan/plan_explain.go
@@ -30,7 +30,7 @@ import (
 )
 
 // DecorateErrorWithPlanDetails adds plan graphviz URLs as error details.
-func (p Plan) DecorateErrorWithPlanDetails(err error) error {
+func (p Plan) DecorateErrorWithPlanDetails(err error) (retErr error) {
 	if err == nil {
 		return nil
 	}
@@ -40,7 +40,7 @@ func (p Plan) DecorateErrorWithPlanDetails(err error) error {
 		if !ok {
 			rAsErr = errors.Errorf("panic during scplan.DecorateErrorWithPlanDetails: %v", r)
 		}
-		err = errors.CombineErrors(err, rAsErr)
+		retErr = errors.CombineErrors(err, rAsErr)
 		}
 	}()

From c3da9191f26a1e9644a734a48d18a38dcaf2e135 Mon Sep 17 00:00:00 2001
From: Rebecca Taft
Date: Tue, 9 Aug 2022 14:15:14 -0500
Subject: [PATCH 2/5] opt: respect NO_INDEX_JOIN flag

Prior to this commit, the optimizer could produce a plan with an index join
even if the user hinted that index joins should be avoided by using the
NO_INDEX_JOIN hint. This commit fixes that oversight, and we no longer plan an
index join in this case. This commit also adds assertions that an index join
is not planned when NO_INDEX_JOIN is used, to prevent this bug from recurring.
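As a rough, runnable sketch of the guard this commit threads through the
exploration rules (ScanFlags and maybePlanIndexJoin here are simplified
stand-ins invented for illustration, not the optimizer's actual types):

    package main

    import "fmt"

    // ScanFlags is a simplified stand-in for the optimizer's scan hints;
    // only the field relevant to this commit is modeled.
    type ScanFlags struct {
    	NoIndexJoin bool
    }

    // maybePlanIndexJoin sketches the guard each exploration rule now
    // applies: bail out before constructing an IndexJoin when the hint
    // forbids it.
    func maybePlanIndexJoin(flags ScanFlags) bool {
    	if flags.NoIndexJoin {
    		return false // respect the hint; never wrap the scan in an index join
    	}
    	// ... construct the IndexJoin expression here ...
    	return true
    }

    func main() {
    	fmt.Println(maybePlanIndexJoin(ScanFlags{NoIndexJoin: true}))  // false
    	fmt.Println(maybePlanIndexJoin(ScanFlags{NoIndexJoin: false})) // true
    }

In the real rules the guard simply returns before the IndexJoin is built, and
CheckExpr additionally asserts that no index join sits on top of a scan
carrying the flag.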
Fixes #85841 Release note (bug fix): Fixed an issue where the NO_INDEX_JOIN hint could be ignored by the optimizer in some cases, causing it to create a query plan with an index join. --- pkg/sql/opt/memo/check_expr.go | 3 ++ pkg/sql/opt/xform/groupby_funcs.go | 6 +++ pkg/sql/opt/xform/index_scan_builder.go | 3 ++ pkg/sql/opt/xform/limit_funcs.go | 8 +++- pkg/sql/opt/xform/select_funcs.go | 10 +++++ pkg/sql/opt/xform/testdata/rules/groupby | 25 +++++++++++++ pkg/sql/opt/xform/testdata/rules/limit | 28 ++++++++++++++ pkg/sql/opt/xform/testdata/rules/select | 47 ++++++++++++++++++++++++ 8 files changed, 129 insertions(+), 1 deletion(-) diff --git a/pkg/sql/opt/memo/check_expr.go b/pkg/sql/opt/memo/check_expr.go index 696ad68f7d7b..208761feb13c 100644 --- a/pkg/sql/opt/memo/check_expr.go +++ b/pkg/sql/opt/memo/check_expr.go @@ -207,6 +207,9 @@ func (m *Memo) CheckExpr(e opt.Expr) { if t.Cols.Empty() { panic(errors.AssertionFailedf("index join with no columns")) } + if scan, ok := t.Input.(*ScanExpr); ok && scan.Flags.NoIndexJoin { + panic(errors.AssertionFailedf("index join used with NoIndexJoin flag")) + } case *LookupJoinExpr: if len(t.KeyCols) == 0 && len(t.LookupExpr) == 0 { diff --git a/pkg/sql/opt/xform/groupby_funcs.go b/pkg/sql/opt/xform/groupby_funcs.go index a85f47cb4639..c506879094a6 100644 --- a/pkg/sql/opt/xform/groupby_funcs.go +++ b/pkg/sql/opt/xform/groupby_funcs.go @@ -411,6 +411,12 @@ func (c *CustomFuncs) GenerateLimitedGroupByScans( return } + // Otherwise, try to construct an IndexJoin operator that provides the + // columns missing from the index. + if sp.Flags.NoIndexJoin { + return + } + // Calculate the PK columns once. if pkCols.Empty() { pkCols = c.PrimaryKeyCols(sp.Table) diff --git a/pkg/sql/opt/xform/index_scan_builder.go b/pkg/sql/opt/xform/index_scan_builder.go index ee6c4916f6e2..bcc3b2789d00 100644 --- a/pkg/sql/opt/xform/index_scan_builder.go +++ b/pkg/sql/opt/xform/index_scan_builder.go @@ -163,6 +163,9 @@ func (b *indexScanBuilder) AddSelectAfterSplit( // AddIndexJoin wraps the input expression with an IndexJoin expression that // produces the given set of columns by lookup in the primary index. func (b *indexScanBuilder) AddIndexJoin(cols opt.ColSet) { + if b.scanPrivate.Flags.NoIndexJoin { + panic(errors.AssertionFailedf("attempt to create an index join with NoIndexJoin flag")) + } if b.hasIndexJoin() { panic(errors.AssertionFailedf("cannot call AddIndexJoin twice")) } diff --git a/pkg/sql/opt/xform/limit_funcs.go b/pkg/sql/opt/xform/limit_funcs.go index 782650b01a55..362c807b2a07 100644 --- a/pkg/sql/opt/xform/limit_funcs.go +++ b/pkg/sql/opt/xform/limit_funcs.go @@ -68,7 +68,7 @@ func (c *CustomFuncs) CanLimitFilteredScan( if scanPrivate.IsVirtualTable(md) && !required.Any() { return false } - ok, _ := ordering.ScanPrivateCanProvide(c.e.mem.Metadata(), scanPrivate, &required) + ok, _ := ordering.ScanPrivateCanProvide(md, scanPrivate, &required) return ok } @@ -298,6 +298,12 @@ func (c *CustomFuncs) GenerateLimitedTopKScans( return } + // Otherwise, try to construct an IndexJoin operator that provides the + // columns missing from the index. + if sp.Flags.NoIndexJoin { + return + } + // Calculate the PK columns once. 
if pkCols.Empty() { pkCols = c.PrimaryKeyCols(sp.Table) diff --git a/pkg/sql/opt/xform/select_funcs.go b/pkg/sql/opt/xform/select_funcs.go index 9f7feffe7237..72d725a4fb76 100644 --- a/pkg/sql/opt/xform/select_funcs.go +++ b/pkg/sql/opt/xform/select_funcs.go @@ -121,6 +121,12 @@ func (c *CustomFuncs) GeneratePartialIndexScans( return } + // Otherwise, try to construct an IndexJoin operator that provides the + // columns missing from the index. + if scanPrivate.Flags.NoIndexJoin { + return + } + // Calculate the PK columns once. if pkCols.Empty() { pkCols = c.PrimaryKeyCols(scanPrivate.Table) @@ -893,6 +899,10 @@ func (c *CustomFuncs) GenerateInvertedIndexScans( newScanPrivate.SetConstraint(c.e.evalCtx, constraint) newScanPrivate.InvertedConstraint = spansToRead + if scanPrivate.Flags.NoIndexJoin { + return + } + // Calculate the PK columns once. if pkCols.Empty() { pkCols = c.PrimaryKeyCols(scanPrivate.Table) diff --git a/pkg/sql/opt/xform/testdata/rules/groupby b/pkg/sql/opt/xform/testdata/rules/groupby index 20661fe6ccdc..f2c1d53e3130 100644 --- a/pkg/sql/opt/xform/testdata/rules/groupby +++ b/pkg/sql/opt/xform/testdata/rules/groupby @@ -3510,3 +3510,28 @@ limit │ └── aggregations │ └── count-rows [as=count_rows:8] └── 10 + +# GenerateLimitedGroupByScans will be triggered, but not add an index +# scan to the memo since NO_INDEX_JOIN is specified. +memo expect-not=GenerateLimitedGroupByScans +SELECT d, e, count(*) FROM defg@{NO_INDEX_JOIN} GROUP BY d, e LIMIT 10 +---- +memo (optimized, ~5KB, required=[presentation: d:1,e:2,count:8]) + ├── G1: (limit G2 G3) + │ └── [presentation: d:1,e:2,count:8] + │ ├── best: (limit G2="[limit hint: 10.00]" G3) + │ └── cost: 1145.01 + ├── G2: (group-by G4 G5 cols=(1,2)) (group-by G4 G5 cols=(1,2),ordering=+1) + │ └── [limit hint: 10.00] + │ ├── best: (group-by G4 G5 cols=(1,2)) + │ └── cost: 1144.90 + ├── G3: (const 10) + ├── G4: (scan defg,cols=(1,2)) + │ ├── [ordering: +1] [limit hint: 10.00] + │ │ ├── best: (sort G4) + │ │ └── cost: 1334.20 + │ └── [] + │ ├── best: (scan defg,cols=(1,2)) + │ └── cost: 1094.72 + ├── G5: (aggregations G6) + └── G6: (count-rows) diff --git a/pkg/sql/opt/xform/testdata/rules/limit b/pkg/sql/opt/xform/testdata/rules/limit index e1dc67ed96c5..32e55dfc679d 100644 --- a/pkg/sql/opt/xform/testdata/rules/limit +++ b/pkg/sql/opt/xform/testdata/rules/limit @@ -1936,6 +1936,34 @@ top-k └── scan defg └── columns: d:1 e:2 f:3 g:4 +# GenerateLimitedTopKScans will be triggered, but not add an index +# scan to the memo since NO_INDEX_JOIN is specified. 
+memo expect-not=GenerateLimitedTopKScans +SELECT d, f, e FROM defg@{NO_INDEX_JOIN} ORDER BY d, f, e LIMIT 10 +---- +memo (optimized, ~4KB, required=[presentation: d:1,f:3,e:2] [ordering: +1,+3,+2]) + ├── G1: (limit G2 G3 ordering=+1,+3,+2) (top-k G2 &{10 +1,+3,+2 }) (top-k G2 &{10 +1,+3,+2 +1,+3}) + │ ├── [presentation: d:1,f:3,e:2] [ordering: +1,+3,+2] + │ │ ├── best: (top-k G2 &{10 +1,+3,+2 }) + │ │ └── cost: 1196.32 + │ └── [] + │ ├── best: (top-k G2 &{10 +1,+3,+2 }) + │ └── cost: 1196.32 + ├── G2: (scan defg,cols=(1-3)) + │ ├── [ordering: +1,+3,+2] [limit hint: 10.00] + │ │ ├── best: (sort G2) + │ │ └── cost: 1366.36 + │ ├── [ordering: +1,+3] + │ │ ├── best: (sort G2) + │ │ └── cost: 1365.27 + │ ├── [ordering: +1,+3] [limit hint: 100.00] + │ │ ├── best: (sort G2) + │ │ └── cost: 1365.27 + │ └── [] + │ ├── best: (scan defg,cols=(1-3)) + │ └── cost: 1104.82 + └── G3: (const 10) + # --------------------------------------------------- # GeneratePartialOrderTopK # --------------------------------------------------- diff --git a/pkg/sql/opt/xform/testdata/rules/select b/pkg/sql/opt/xform/testdata/rules/select index 67e82b6dccee..016f2d842902 100644 --- a/pkg/sql/opt/xform/testdata/rules/select +++ b/pkg/sql/opt/xform/testdata/rules/select @@ -306,6 +306,29 @@ memo (optimized, ~17KB, required=[presentation: k:1,i:2,f:3,s:4,b:5]) ├── G16: (variable s) └── G17: (const 'foo') +# GeneratePartialIndexScans will be triggered, but not add an index +# scan to the memo since NO_INDEX_JOIN is specified. +memo expect-not=GeneratePartialIndexScans +SELECT * FROM p@{NO_INDEX_JOIN} WHERE i > 0 AND s = 'foo' +---- +memo (optimized, ~9KB, required=[presentation: k:1,i:2,f:3,s:4,b:5]) + ├── G1: (select G2 G3) + │ └── [presentation: k:1,i:2,f:3,s:4,b:5] + │ ├── best: (select G2 G3) + │ └── cost: 1135.06 + ├── G2: (scan p,cols=(1-5)) + │ └── [] + │ ├── best: (scan p,cols=(1-5)) + │ └── cost: 1125.02 + ├── G3: (filters G4 G5) + ├── G4: (gt G6 G7) + ├── G5: (eq G8 G9) + ├── G6: (variable i) + ├── G7: (const 0) + ├── G8: (variable s) + └── G9: (const 'foo') + + # Do not generate a partial index scan when the predicate is not implied by the # filter. memo expect-not=GeneratePartialIndexScans @@ -2202,6 +2225,30 @@ memo (optimized, ~8KB, required=[presentation: k:1]) ├── G8: (variable j) └── G9: (const '{"a": "b"}') +# GenerateInvertedIndexScans will be triggered, but not add an index +# scan to the memo since NO_INDEX_JOIN is specified. +memo expect-not=GenerateInvertedIndexScans +SELECT k FROM b@{NO_INDEX_JOIN} WHERE j @> '{"a": "b"}' +---- +memo (optimized, ~6KB, required=[presentation: k:1]) + ├── G1: (project G2 G3 k) + │ └── [presentation: k:1] + │ ├── best: (project G2 G3 k) + │ └── cost: 1095.77 + ├── G2: (select G4 G5) + │ └── [] + │ ├── best: (select G4 G5) + │ └── cost: 1094.65 + ├── G3: (projections) + ├── G4: (scan b,cols=(1,4)) + │ └── [] + │ ├── best: (scan b,cols=(1,4)) + │ └── cost: 1084.62 + ├── G5: (filters G6) + ├── G6: (contains G7 G8) + ├── G7: (variable j) + └── G8: (const '{"a": "b"}') + # Query requiring an index join with no remaining filter. opt expect=GenerateInvertedIndexScans SELECT u, k FROM b WHERE j @> '{"a": "b"}' From 7cefe60f85f11b2c40ee4135714ff537cbb564a7 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 15 Jul 2022 00:36:27 -0400 Subject: [PATCH 3/5] sql,csv: distinguish empty columns from quoted empty strings in COPY Release note (bug fix): Previously, an empty column in the input to COPY ... FROM CSV would be treated as an empty string. Now, this is treated as NULL. 
The quoted empty string can still be used to input an empty string. Similarly,
if a different NULL token is specified in the command options, it can be
quoted in order to be treated as the equivalent string value.
---
 pkg/sql/copy.go                       |  12 +-
 pkg/sql/importer/BUILD.bazel          |   1 +
 pkg/sql/importer/import_stmt_test.go  |  21 ++-
 pkg/sql/importer/read_import_base.go  |   7 +-
 pkg/sql/importer/read_import_csv.go   |  24 +++-
 pkg/sql/pgwire/testdata/pgtest/copy   |  64 ++++++++-
 pkg/util/encoding/csv/example_test.go |  26 ++--
 pkg/util/encoding/csv/reader.go       |  27 +++-
 pkg/util/encoding/csv/reader_test.go  | 185 ++++++++++++++------------
 9 files changed, 244 insertions(+), 123 deletions(-)

diff --git a/pkg/sql/copy.go b/pkg/sql/copy.go
index 39f153ace5db..366e63d78947 100644
--- a/pkg/sql/copy.go
+++ b/pkg/sql/copy.go
@@ -498,7 +498,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
 	record, err := c.csvReader.Read()
 	// Look for end of data before checking for errors, since a field count
 	// error will still return record data.
-	if len(record) == 1 && record[0] == endOfData && c.buf.Len() == 0 {
+	if len(record) == 1 && !record[0].Quoted && record[0].Val == endOfData && c.buf.Len() == 0 {
 		return true, nil
 	}
 	if err != nil {
@@ -509,7 +509,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
 	return false, err
 }
 
-func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
+func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []csv.Record) []csv.Record {
 	if len(c.expectedHiddenColumnIdxs) == 0 {
 		return in
 	}
@@ -523,7 +523,7 @@ func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
 	return ret
 }
 
-func (c *copyMachine) readCSVTuple(ctx context.Context, record []string) error {
+func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) error {
 	if expected := len(c.resultColumns) + len(c.expectedHiddenColumnIdxs); expected != len(record) {
 		return pgerror.Newf(pgcode.BadCopyFileFormat,
 			"expected %d values, got %d", expected, len(record))
@@ -531,11 +531,13 @@ func (c *copyMachine) readCSVTuple(ctx context.Context, record []string) error {
 	record = c.maybeIgnoreHiddenColumnsStr(record)
 	exprs := make(tree.Exprs, len(record))
 	for i, s := range record {
+		// NB: When we implement FORCE_NULL, quoted values will also be
+		// allowed to be treated as NULL.
+ if !s.Quoted && s.Val == c.null { exprs[i] = tree.DNull continue } - d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s, c.parsingEvalCtx) + d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx) if err != nil { return err } diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index bd274772fdd5..5bad9cb0c379 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -212,6 +212,7 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/ctxgroup", + "//pkg/util/encoding/csv", "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/ioctx", diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index 68eb6ef5ec21..e9095f6fcba6 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -64,6 +64,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -3721,7 +3722,7 @@ func BenchmarkUserfileImport(b *testing.B) { type csvBenchmarkStream struct { n int pos int - data [][]string + data [][]csv.Record } func (s *csvBenchmarkStream) Progress() float32 { @@ -3758,6 +3759,18 @@ func (s *csvBenchmarkStream) Read(buf []byte) (int, error) { return 0, io.EOF } +func toRecords(input [][]string) [][]csv.Record { + records := make([][]csv.Record, len(input)) + for i := range input { + row := make([]csv.Record, len(input[i])) + for j := range input[i] { + row[j] = csv.Record{Quoted: false, Val: input[i][j]} + } + records[i] = row + } + return records +} + var _ importRowProducer = &csvBenchmarkStream{} // BenchmarkConvertRecord-16 1000000 2107 ns/op 56.94 MB/s 3600 B/op 101 allocs/op @@ -3849,7 +3862,7 @@ func BenchmarkCSVConvertRecord(b *testing.B) { producer := &csvBenchmarkStream{ n: b.N, pos: 0, - data: tpchLineItemDataRows, + data: toRecords(tpchLineItemDataRows), } consumer := &csvRowConsumer{importCtx: importCtx, opts: &roachpb.CSVOptions{}} b.ResetTimer() @@ -4799,7 +4812,7 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) { producer := &csvBenchmarkStream{ n: b.N, pos: 0, - data: tpchLineItemDataRows, + data: toRecords(tpchLineItemDataRows), } delimited := &fileReader{Reader: producer} @@ -4903,7 +4916,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) { producer := &csvBenchmarkStream{ n: b.N, pos: 0, - data: tpchLineItemDataRows, + data: toRecords(tpchLineItemDataRows), } pgCopyInput := &fileReader{Reader: producer} diff --git a/pkg/sql/importer/read_import_base.go b/pkg/sql/importer/read_import_base.go index 321eee78922c..1789bc399286 100644 --- a/pkg/sql/importer/read_import_base.go +++ b/pkg/sql/importer/read_import_base.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/encoding/csv" "github.com/cockroachdb/cockroach/pkg/util/ioctx" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -724,7 +725,11 @@ func (p *parallelImporter) importWorker( rowIndex := int64(timestamp) + rowNum if err := conv.Row(ctx, conv.KvBatch.Source, rowIndex); err != nil { - return newImportRowError(err, 
fmt.Sprintf("%v", record), rowNum) + s := fmt.Sprintf("%v", record) + if r, ok := record.([]csv.Record); ok { + s = strRecord(r, ',') + } + return newImportRowError(err, s, rowNum) } } } diff --git a/pkg/sql/importer/read_import_csv.go b/pkg/sql/importer/read_import_csv.go index 5b2916944649..92e85dec3e86 100644 --- a/pkg/sql/importer/read_import_csv.go +++ b/pkg/sql/importer/read_import_csv.go @@ -111,7 +111,7 @@ type csvRowProducer struct { csv *csv.Reader rowNum int64 err error - record []string + record []csv.Record progress func() float32 numExpectedColumns int } @@ -141,12 +141,20 @@ func (p *csvRowProducer) Skip() error { return nil } -func strRecord(record []string, sep rune) string { +func strRecord(record []csv.Record, sep rune) string { csvSep := "," if sep != 0 { csvSep = string(sep) } - return strings.Join(record, csvSep) + strs := make([]string, len(record)) + for i := range record { + if record[i].Quoted { + strs[i] = "\"" + record[i].Val + "\"" + } else { + strs[i] = record[i].Val + } + } + return strings.Join(strs, csvSep) } // Row() implements importRowProducer interface. @@ -156,7 +164,9 @@ func (p *csvRowProducer) Row() (interface{}, error) { if len(p.record) == expectedColsLen { // Expected number of columns. - } else if len(p.record) == expectedColsLen+1 && p.record[expectedColsLen] == "" { + } else if len(p.record) == expectedColsLen+1 && + p.record[expectedColsLen].Val == "" && + !p.record[expectedColsLen].Quoted { // Line has the optional trailing comma, ignore the empty field. p.record = p.record[:expectedColsLen] } else { @@ -184,7 +194,7 @@ var _ importRowConsumer = &csvRowConsumer{} func (c *csvRowConsumer) FillDatums( row interface{}, rowNum int64, conv *row.DatumRowConverter, ) error { - record := row.([]string) + record := row.([]csv.Record) datumIdx := 0 for i, field := range record { @@ -195,11 +205,11 @@ func (c *csvRowConsumer) FillDatums( } if c.opts.NullEncoding != nil && - field == *c.opts.NullEncoding { + field.Val == *c.opts.NullEncoding { conv.Datums[datumIdx] = tree.DNull } else { var err error - conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(conv.VisibleColTypes[i], field, conv.EvalCtx) + conv.Datums[datumIdx], err = rowenc.ParseDatumStringAs(conv.VisibleColTypes[i], field.Val, conv.EvalCtx) if err != nil { col := conv.VisibleCols[i] return newImportRowError( diff --git a/pkg/sql/pgwire/testdata/pgtest/copy b/pkg/sql/pgwire/testdata/pgtest/copy index a3681ea58532..2a811f627a6e 100644 --- a/pkg/sql/pgwire/testdata/pgtest/copy +++ b/pkg/sql/pgwire/testdata/pgtest/copy @@ -25,6 +25,7 @@ Query {"String": "COPY t FROM STDIN"} CopyData {"Data": "1\tblah\n"} CopyData {"Data": "2\t\n"} CopyData {"Data": "3\t\\N\n"} +CopyData {"Data": "4\t\"\"\n"} CopyData {"Data": "\\.\n"} CopyDone Query {"String": "SELECT * FROM t ORDER BY i"} @@ -38,12 +39,13 @@ ReadyForQuery {"Type":"CommandComplete","CommandTag":"DELETE 0"} {"Type":"ReadyForQuery","TxStatus":"I"} {"Type":"CopyInResponse","ColumnFormatCodes":[0,0]} -{"Type":"CommandComplete","CommandTag":"COPY 3"} +{"Type":"CommandComplete","CommandTag":"COPY 4"} {"Type":"ReadyForQuery","TxStatus":"I"} {"Type":"DataRow","Values":[{"text":"1"},{"text":"blah"}]} {"Type":"DataRow","Values":[{"text":"2"},null]} {"Type":"DataRow","Values":[{"text":"3"},null]} -{"Type":"CommandComplete","CommandTag":"SELECT 3"} +{"Type":"DataRow","Values":[{"text":"4"},{"text":"\"\""}]} +{"Type":"CommandComplete","CommandTag":"SELECT 4"} {"Type":"ReadyForQuery","TxStatus":"I"} # Extra fields. 
@@ -632,6 +634,53 @@ ReadyForQuery
 {"Type":"CommandComplete","CommandTag":"SELECT 2"}
 {"Type":"ReadyForQuery","TxStatus":"I"}
 
+# Test that we distinguish an empty column from a quoted empty string.
+# By default, an empty column is NULL.
+# If we specify another NULL token, then the empty column does get interpreted
+# as an empty string.
+
+send
+Query {"String": "DELETE FROM t"}
+Query {"String": "COPY t FROM STDIN WITH CSV"}
+CopyData {"Data": "1,cat\n"}
+CopyData {"Data": "2,\"\"\n"}
+CopyData {"Data": "3,\n"}
+CopyData {"Data": "\\.\n"}
+CopyDone
+Query {"String": "COPY t FROM STDIN WITH CSV NULL 'N'"}
+CopyData {"Data": "4,\"\"\n"}
+CopyData {"Data": "5,\n"}
+CopyData {"Data": "6,N\n"}
+CopyData {"Data": "7,\"N\"\n"}
+CopyData {"Data": "\\.\n"}
+CopyDone
+Query {"String": "SELECT i, length(t) FROM t ORDER BY i"}
+----
+
+until ignore=RowDescription
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+ReadyForQuery
+----
+{"Type":"CommandComplete","CommandTag":"DELETE 2"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
+{"Type":"CommandComplete","CommandTag":"COPY 3"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"CopyInResponse","ColumnFormatCodes":[0,0]}
+{"Type":"CommandComplete","CommandTag":"COPY 4"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+{"Type":"DataRow","Values":[{"text":"1"},{"text":"3"}]}
+{"Type":"DataRow","Values":[{"text":"2"},{"text":"0"}]}
+{"Type":"DataRow","Values":[{"text":"3"},null]}
+{"Type":"DataRow","Values":[{"text":"4"},{"text":"0"}]}
+{"Type":"DataRow","Values":[{"text":"5"},{"text":"0"}]}
+{"Type":"DataRow","Values":[{"text":"6"},null]}
+{"Type":"DataRow","Values":[{"text":"7"},{"text":"1"}]}
+{"Type":"CommandComplete","CommandTag":"SELECT 7"}
+{"Type":"ReadyForQuery","TxStatus":"I"}
+
 # Verify that COPY CSV input can be split up at arbitrary points.
send Query {"String": "DELETE FROM t"} @@ -659,7 +708,7 @@ ReadyForQuery ReadyForQuery ReadyForQuery ---- -{"Type":"CommandComplete","CommandTag":"DELETE 2"} +{"Type":"CommandComplete","CommandTag":"DELETE 7"} {"Type":"ReadyForQuery","TxStatus":"I"} {"Type":"CopyInResponse","ColumnFormatCodes":[0,0]} {"Type":"CommandComplete","CommandTag":"COPY 9"} @@ -795,15 +844,19 @@ ReadyForQuery {"Type":"ReadyForQuery","TxStatus":"I"} send +Query {"String": "SET TIME ZONE UTC"} Query {"String": "COPY t FROM STDIN CSV"} CopyData {"Data": "1,2021-09-20T06:05:04\n"} CopyData {"Data": "\\.\n"} CopyDone ---- -until ignore=RowDescription +until ignore=RowDescription ignore=ParameterStatus +ReadyForQuery ReadyForQuery ---- +{"Type":"CommandComplete","CommandTag":"SET"} +{"Type":"ReadyForQuery","TxStatus":"I"} {"Type":"CopyInResponse","ColumnFormatCodes":[0,0]} {"Type":"CommandComplete","CommandTag":"COPY 1"} {"Type":"ReadyForQuery","TxStatus":"I"} @@ -817,12 +870,11 @@ CopyDone Query {"String": "SELECT i, t FROM t ORDER BY i"} ---- -until ignore=RowDescription +until ignore=RowDescription ignore=ParameterStatus ReadyForQuery ReadyForQuery ReadyForQuery ---- -{"Type":"ParameterStatus","Name":"TimeZone","Value":"America/Chicago"} {"Type":"CommandComplete","CommandTag":"SET"} {"Type":"ReadyForQuery","TxStatus":"I"} {"Type":"CopyInResponse","ColumnFormatCodes":[0,0]} diff --git a/pkg/util/encoding/csv/example_test.go b/pkg/util/encoding/csv/example_test.go index 18eacfb7f700..36dc40cce214 100644 --- a/pkg/util/encoding/csv/example_test.go +++ b/pkg/util/encoding/csv/example_test.go @@ -46,10 +46,10 @@ Ken,Thompson,ken fmt.Println(record) } // Output: - // [first_name last_name username] - // [Rob Pike rob] - // [Ken Thompson ken] - // [Robert Griesemer gri] + // [{first_name false} {last_name false} {username false}] + // [{Rob true} {Pike true} {rob false}] + // [{Ken false} {Thompson false} {ken false}] + // [{Robert true} {Griesemer true} {gri true}] } // This example shows how csv.Reader can be configured to handle other @@ -71,9 +71,14 @@ Ken;Thompson;ken log.Fatalf(ctx, "%v", err) } - fmt.Print(records) + for _, record := range records { + fmt.Println(record) + } // Output: - // [[first_name last_name username] [Rob Pike rob] [Ken Thompson ken] [Robert Griesemer gri]] + // [{first_name false} {last_name false} {username false}] + // [{Rob true} {Pike true} {rob false}] + // [{Ken false} {Thompson false} {ken false}] + // [{Robert true} {Griesemer true} {gri true}] } func ExampleReader_ReadAll() { @@ -90,9 +95,14 @@ Ken,Thompson,ken log.Fatalf(ctx, "%v", err) } - fmt.Print(records) + for _, record := range records { + fmt.Println(record) + } // Output: - // [[first_name last_name username] [Rob Pike rob] [Ken Thompson ken] [Robert Griesemer gri]] + // [{first_name false} {last_name false} {username false}] + // [{Rob true} {Pike true} {rob false}] + // [{Ken false} {Thompson false} {ken false}] + // [{Robert true} {Griesemer true} {gri true}] } func ExampleWriter() { diff --git a/pkg/util/encoding/csv/reader.go b/pkg/util/encoding/csv/reader.go index b86219c5c3d6..07df8d721e59 100644 --- a/pkg/util/encoding/csv/reader.go +++ b/pkg/util/encoding/csv/reader.go @@ -179,7 +179,7 @@ type Reader struct { fieldIndexes []int // lastRecord is a record cache and only used when ReuseRecord == true. - lastRecord []string + lastRecord []Record } // NewReader returns a new Reader that reads from r. @@ -199,7 +199,7 @@ func NewReader(r io.Reader) *Reader { // If there is no data left to be read, Read returns nil, io.EOF. 
// If ReuseRecord is true, the returned slice may be shared // between multiple calls to Read. -func (r *Reader) Read() (record []string, err error) { +func (r *Reader) Read() (record []Record, err error) { if r.ReuseRecord { record, err = r.readRecord(r.lastRecord) r.lastRecord = record @@ -214,7 +214,7 @@ func (r *Reader) Read() (record []string, err error) { // A successful call returns err == nil, not err == io.EOF. Because ReadAll is // defined to read until EOF, it does not treat end of file as an error to be // reported. -func (r *Reader) ReadAll() (records [][]string, err error) { +func (r *Reader) ReadAll() (records [][]Record, err error) { for { record, err := r.readRecord(nil) if err == io.EOF { @@ -299,7 +299,16 @@ func (r *Reader) stripEscapeForReadRecord(in []byte) (ret []byte, trailingEscape return ret, false } -func (r *Reader) readRecord(dst []string) ([]string, error) { +// Record is a single column of a CSV row. It's necessary to distinguish an +// empty column from a quoted empty string. Most importantly, the default +// behavior is that an empty column is treated as NULL during COPY, whereas +// a quoted empty string is treated as an empty string value. +type Record struct { + Val string + Quoted bool +} + +func (r *Reader) readRecord(dst []Record) ([]Record, error) { if r.Comma == r.Comment || !validDelim(r.Comma) || (r.Comment != 0 && !validDelim(r.Comment)) { return nil, errInvalidDelim } @@ -331,6 +340,7 @@ func (r *Reader) readRecord(dst []string) ([]string, error) { recLine := r.numLine // Starting line for record r.recordBuffer = r.recordBuffer[:0] r.fieldIndexes = r.fieldIndexes[:0] + quoted := make([]bool, 0, cap(r.fieldIndexes)) parseField: for { if r.TrimLeadingSpace { @@ -338,6 +348,7 @@ parseField: } if len(line) == 0 || line[0] != '"' { // Non-quoted string field + quoted = append(quoted, false) i := bytes.IndexRune(line, r.Comma) field := line if i >= 0 { @@ -362,6 +373,7 @@ parseField: break parseField } else { // Quoted string field + quoted = append(quoted, true) line = line[quoteLen:] for { i := bytes.IndexByte(line, '"') @@ -441,12 +453,15 @@ parseField: str := string(r.recordBuffer) // Convert to string once to batch allocations dst = dst[:0] if cap(dst) < len(r.fieldIndexes) { - dst = make([]string, len(r.fieldIndexes)) + dst = make([]Record, len(r.fieldIndexes)) } dst = dst[:len(r.fieldIndexes)] var preIdx int for i, idx := range r.fieldIndexes { - dst[i] = str[preIdx:idx] + dst[i] = Record{ + Val: str[preIdx:idx], + Quoted: quoted[i], + } preIdx = idx } diff --git a/pkg/util/encoding/csv/reader_test.go b/pkg/util/encoding/csv/reader_test.go index 164a84eec154..4251d85bb4db 100644 --- a/pkg/util/encoding/csv/reader_test.go +++ b/pkg/util/encoding/csv/reader_test.go @@ -26,7 +26,7 @@ func TestRead(t *testing.T) { tests := []struct { Name string Input string - Output [][]string + Output [][]Record Error error // These fields are copied into the Reader @@ -41,15 +41,15 @@ func TestRead(t *testing.T) { }{{ Name: "Simple", Input: "a,b,c\n", - Output: [][]string{{"a", "b", "c"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}}}, }, { Name: "CRLF", Input: "a,b\r\nc,d\r\n", - Output: [][]string{{"a", "b"}, {"c", "d"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}}, {Record{"c", false}, Record{"d", false}}}, }, { Name: "BareCR", Input: "a,b\rc,d\r\n", - Output: [][]string{{"a", "b\rc", "d"}}, + Output: [][]Record{{Record{"a", false}, Record{"b\rc", false}, Record{"d", false}}}, }, { Name: "RFC4180test", 
Input: `#field1,field2,field3 @@ -58,22 +58,22 @@ b","ccc" "a,a","b""bb","ccc" zzz,yyy,xxx `, - Output: [][]string{ - {"#field1", "field2", "field3"}, - {"aaa", "bb\nb", "ccc"}, - {"a,a", `b"bb`, "ccc"}, - {"zzz", "yyy", "xxx"}, + Output: [][]Record{ + {Record{"#field1", false}, Record{"field2", false}, Record{"field3", false}}, + {Record{"aaa", true}, Record{"bb\nb", true}, Record{"ccc", true}}, + {Record{"a,a", true}, Record{`b"bb`, true}, Record{"ccc", true}}, + {Record{"zzz", false}, Record{"yyy", false}, Record{"xxx", false}}, }, UseFieldsPerRecord: true, FieldsPerRecord: 0, }, { Name: "NoEOLTest", Input: "a,b,c", - Output: [][]string{{"a", "b", "c"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}}}, }, { Name: "Semicolon", Input: "a;b;c\n", - Output: [][]string{{"a", "b", "c"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}}}, Comma: ';', }, { Name: "MultiLine", @@ -81,55 +81,58 @@ zzz,yyy,xxx line","one line","three line field"`, - Output: [][]string{{"two\nline", "one line", "three\nline\nfield"}}, + Output: [][]Record{{Record{"two\nline", true}, Record{"one line", true}, Record{"three\nline\nfield", true}}}, }, { Name: "BlankLine", Input: "a,b,c\n\nd,e,f\n\n", - Output: [][]string{ - {"a", "b", "c"}, - {"d", "e", "f"}, + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}, Record{"c", false}}, + {Record{"d", false}, Record{"e", false}, Record{"f", false}}, }, }, { Name: "BlankLineFieldCount", Input: "a,b,c\n\nd,e,f\n\n", - Output: [][]string{ - {"a", "b", "c"}, - {"d", "e", "f"}, + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}, Record{"c", false}}, + {Record{"d", false}, Record{"e", false}, Record{"f", false}}, }, UseFieldsPerRecord: true, FieldsPerRecord: 0, }, { Name: "TrimSpace", Input: " a, b, c\n", - Output: [][]string{{"a", "b", "c"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}}}, TrimLeadingSpace: true, }, { Name: "LeadingSpace", Input: " a, b, c\n", - Output: [][]string{{" a", " b", " c"}}, + Output: [][]Record{{Record{" a", false}, Record{" b", false}, Record{" c", false}}}, }, { Name: "Comment", Input: "#1,2,3\na,b,c\n#comment", - Output: [][]string{{"a", "b", "c"}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}}}, Comment: '#', }, { - Name: "NoComment", - Input: "#1,2,3\na,b,c", - Output: [][]string{{"#1", "2", "3"}, {"a", "b", "c"}}, + Name: "NoComment", + Input: "#1,2,3\na,b,c", + Output: [][]Record{ + {Record{"#1", false}, Record{"2", false}, Record{"3", false}}, + {Record{"a", false}, Record{"b", false}, Record{"c", false}}, + }, }, { Name: "LazyQuotes", Input: `a "word","1"2",a","b`, - Output: [][]string{{`a "word"`, `1"2`, `a"`, `b`}}, + Output: [][]Record{{Record{`a "word"`, false}, Record{`1"2`, true}, Record{`a"`, false}, Record{`b`, true}}}, LazyQuotes: true, }, { Name: "BareQuotes", Input: `a "word","1"2",a"`, - Output: [][]string{{`a "word"`, `1"2`, `a"`}}, + Output: [][]Record{{Record{`a "word"`, false}, Record{`1"2`, true}, Record{`a"`, false}}}, LazyQuotes: true, }, { Name: "BareDoubleQuotes", Input: `a""b,c`, - Output: [][]string{{`a""b`, `c`}}, + Output: [][]Record{{Record{`a""b`, false}, Record{`c`, false}}}, LazyQuotes: true, }, { Name: "BadDoubleQuotes", @@ -138,7 +141,7 @@ field"`, }, { Name: "TrimQuote", Input: ` "a"," b",c`, - Output: [][]string{{"a", " b", "c"}}, + Output: [][]Record{{Record{"a", true}, Record{" b", true}, Record{"c", false}}}, TrimLeadingSpace: true, }, { Name: 
"BadBareQuote", @@ -165,36 +168,42 @@ field"`, UseFieldsPerRecord: true, FieldsPerRecord: 2, }, { - Name: "FieldCount", - Input: "a,b,c\nd,e", - Output: [][]string{{"a", "b", "c"}, {"d", "e"}}, + Name: "FieldCount", + Input: "a,b,c\nd,e", + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}, Record{"c", false}}, + {Record{"d", false}, Record{"e", false}}, + }, }, { Name: "TrailingCommaEOF", Input: "a,b,c,", - Output: [][]string{{"a", "b", "c", ""}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}, Record{"", false}}}, }, { Name: "TrailingCommaEOL", Input: "a,b,c,\n", - Output: [][]string{{"a", "b", "c", ""}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}, Record{"", false}}}, }, { Name: "TrailingCommaSpaceEOF", Input: "a,b,c, ", - Output: [][]string{{"a", "b", "c", ""}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}, Record{"", false}}}, TrimLeadingSpace: true, }, { Name: "TrailingCommaSpaceEOL", Input: "a,b,c, \n", - Output: [][]string{{"a", "b", "c", ""}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}, Record{"", false}}}, TrimLeadingSpace: true, }, { - Name: "TrailingCommaLine3", - Input: "a,b,c\nd,e,f\ng,hi,", - Output: [][]string{{"a", "b", "c"}, {"d", "e", "f"}, {"g", "hi", ""}}, + Name: "TrailingCommaLine3", + Input: "a,b,c\nd,e,f\ng,hi,", + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}, Record{"c", false}}, + {Record{"d", false}, Record{"e", false}, Record{"f", false}}, + {Record{"g", false}, Record{"hi", false}, Record{"", false}}}, TrimLeadingSpace: true, }, { Name: "NotTrailingComma3", Input: "a,b,c, \n", - Output: [][]string{{"a", "b", "c", " "}}, + Output: [][]Record{{Record{"a", false}, Record{"b", false}, Record{"c", false}, Record{" ", false}}}, }, { Name: "CommaFieldTest", Input: `x,y,z,w @@ -208,32 +217,32 @@ x,,, "x","","","" "","","","" `, - Output: [][]string{ - {"x", "y", "z", "w"}, - {"x", "y", "z", ""}, - {"x", "y", "", ""}, - {"x", "", "", ""}, - {"", "", "", ""}, - {"x", "y", "z", "w"}, - {"x", "y", "z", ""}, - {"x", "y", "", ""}, - {"x", "", "", ""}, - {"", "", "", ""}, + Output: [][]Record{ + {Record{"x", false}, Record{"y", false}, Record{"z", false}, Record{"w", false}}, + {Record{"x", false}, Record{"y", false}, Record{"z", false}, Record{"", false}}, + {Record{"x", false}, Record{"y", false}, Record{"", false}, Record{"", false}}, + {Record{"x", false}, Record{"", false}, Record{"", false}, Record{"", false}}, + {Record{"", false}, Record{"", false}, Record{"", false}, Record{"", false}}, + {Record{"x", true}, Record{"y", true}, Record{"z", true}, Record{"w", true}}, + {Record{"x", true}, Record{"y", true}, Record{"z", true}, Record{"", true}}, + {Record{"x", true}, Record{"y", true}, Record{"", true}, Record{"", true}}, + {Record{"x", true}, Record{"", true}, Record{"", true}, Record{"", true}}, + {Record{"", true}, Record{"", true}, Record{"", true}, Record{"", true}}, }, }, { Name: "TrailingCommaIneffective1", Input: "a,b,\nc,d,e", - Output: [][]string{ - {"a", "b", ""}, - {"c", "d", "e"}, + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}, Record{"", false}}, + {Record{"c", false}, Record{"d", false}, Record{"e", false}}, }, TrimLeadingSpace: true, }, { Name: "ReadAllReuseRecord", Input: "a,b\nc,d", - Output: [][]string{ - {"a", "b"}, - {"c", "d"}, + Output: [][]Record{ + {Record{"a", false}, Record{"b", false}}, + {Record{"c", false}, Record{"d", false}}, }, ReuseRecord: true, 
}, { @@ -247,21 +256,21 @@ x,,, }, { Name: "CRLFInQuotedField", // Issue 21201 Input: "\"Hello\r\nHi\"", - Output: [][]string{ - {"Hello\r\nHi"}, + Output: [][]Record{ + {Record{"Hello\r\nHi", true}}, }, }, { Name: "BinaryBlobField", // Issue 19410 Input: "x09\x41\xb4\x1c,aktau", - Output: [][]string{{"x09A\xb4\x1c", "aktau"}}, + Output: [][]Record{{Record{"x09A\xb4\x1c", false}, Record{"aktau", false}}}, }, { Name: "TrailingCR", Input: "field1,field2\r", - Output: [][]string{{"field1", "field2"}}, + Output: [][]Record{{Record{"field1", false}, Record{"field2", false}}}, }, { Name: "QuotedTrailingCR", Input: "\"field\"\r", - Output: [][]string{{"field"}}, + Output: [][]Record{{Record{"field", true}}}, }, { Name: "QuotedTrailingCRCR", Input: "\"field\"\r\r", @@ -269,42 +278,46 @@ x,,, }, { Name: "FieldCR", Input: "field\rfield\r", - Output: [][]string{{"field\rfield"}}, + Output: [][]Record{{Record{"field\rfield", false}}}, }, { Name: "FieldCRCR", Input: "field\r\rfield\r\r", - Output: [][]string{{"field\r\rfield\r"}}, + Output: [][]Record{{Record{"field\r\rfield\r", false}}}, }, { Name: "FieldCRCRLF", Input: "field\r\r\nfield\r\r\n", - Output: [][]string{{"field\r"}, {"field\r"}}, + Output: [][]Record{{Record{"field\r", false}}, {Record{"field\r", false}}}, }, { Name: "FieldCRCRLFCR", Input: "field\r\r\n\rfield\r\r\n\r", - Output: [][]string{{"field\r"}, {"\rfield\r"}}, - }, { - Name: "FieldCRCRLFCRCR", - Input: "field\r\r\n\r\rfield\r\r\n\r\r", - Output: [][]string{{"field\r"}, {"\r\rfield\r"}, {"\r"}}, + Output: [][]Record{{Record{"field\r", false}}, {Record{"\rfield\r", false}}}, + }, { + Name: "FieldCRCRLFCRCR", + Input: "field\r\r\n\r\rfield\r\r\n\r\r", + Output: [][]Record{ + {Record{"field\r", false}}, + {Record{"\r\rfield\r", false}}, + {Record{"\r", false}}, + }, }, { Name: "MultiFieldCRCRLFCRCR", Input: "field1,field2\r\r\n\r\rfield1,field2\r\r\n\r\r,", - Output: [][]string{ - {"field1", "field2\r"}, - {"\r\rfield1", "field2\r"}, - {"\r\r", ""}, + Output: [][]Record{ + {Record{"field1", false}, Record{"field2\r", false}}, + {Record{"\r\rfield1", false}, Record{"field2\r", false}}, + {Record{"\r\r", false}, Record{"", false}}, }, }, { Name: "NonASCIICommaAndComment", Input: "a£b,c£ \td,e\n€ comment\n", - Output: [][]string{{"a", "b,c", "d,e"}}, + Output: [][]Record{{Record{"a", false}, Record{"b,c", false}, Record{"d,e", false}}}, TrimLeadingSpace: true, Comma: '£', Comment: '€', }, { Name: "NonASCIICommaAndCommentWithQuotes", Input: "a€\" b,\"€ c\nλ comment\n", - Output: [][]string{{"a", " b,", " c"}}, + Output: [][]Record{{Record{"a", false}, Record{" b,", true}, Record{" c", false}}}, Comma: '€', Comment: 'λ', }, { @@ -312,18 +325,18 @@ x,,, // This tests that the parser doesn't confuse such characters. Name: "NonASCIICommaConfusion", Input: "\"abθcd\"λefθgh", - Output: [][]string{{"abθcd", "efθgh"}}, + Output: [][]Record{{Record{"abθcd", true}, Record{"efθgh", false}}}, Comma: 'λ', Comment: '€', }, { Name: "NonASCIICommentConfusion", Input: "λ\nλ\nθ\nλ\n", - Output: [][]string{{"λ"}, {"λ"}, {"λ"}}, + Output: [][]Record{{Record{"λ", false}}, {Record{"λ", false}}, {Record{"λ", false}}}, Comment: 'θ', }, { Name: "QuotedFieldMultipleLF", Input: "\"\n\n\n\n\"", - Output: [][]string{{"\n\n\n\n"}}, + Output: [][]Record{{Record{"\n\n\n\n", true}}}, }, { Name: "MultipleCRLF", Input: "\r\n\r\n\r\n\r\n", @@ -332,7 +345,7 @@ x,,, // in the read buffer, so we should test the code to handle that condition. 
 		Name:    "HugeLines",
 		Input:   strings.Repeat("#ignore\n", 10000) + strings.Repeat("@", 5000) + "," + strings.Repeat("*", 5000),
-		Output:  [][]string{{strings.Repeat("@", 5000), strings.Repeat("*", 5000)}},
+		Output:  [][]Record{{Record{strings.Repeat("@", 5000), false}, Record{strings.Repeat("*", 5000), false}}},
 		Comment: '#',
 	}, {
 		Name:  "QuoteWithTrailingCRLF",
 		Input: "\"foo\"bar\"\r\n",
 	}, {
 		Name:       "LazyQuoteWithTrailingCRLF",
 		Input:      "\"foo\"bar\"\r\n",
-		Output:     [][]string{{`foo"bar`}},
+		Output:     [][]Record{{Record{`foo"bar`, true}}},
 		LazyQuotes: true,
 	}, {
 		Name:   "DoubleQuoteWithTrailingCRLF",
 		Input:  "\"foo\"\"bar\"\r\n",
-		Output: [][]string{{`foo"bar`}},
+		Output: [][]Record{{Record{`foo"bar`, true}}},
 	}, {
 		Name:   "EvenQuotes",
 		Input:  `""""""""`,
-		Output: [][]string{{`"""`}},
+		Output: [][]Record{{Record{`"""`, true}}},
 	}, {
 		Name:  "OddQuotes",
 		Input: `"""""""`,
 	}, {
 		Name:       "LazyOddQuotes",
 		Input:      `"""""""`,
-		Output: [][]string{{`"""`}},
+		Output:     [][]Record{{Record{`"""`, true}}},
 		LazyQuotes: true,
 	}, {
 		Name: "BadComma1",
@@ -393,18 +406,18 @@ x,,,
 		Name:   "EscapeText",
 		Escape: 'x',
 		Input:  `"x"",",","xxx"",x,"xxxx,"` + "\n",
-		Output: [][]string{{`"`, `,`, `x"`, `x`, `xx,`}},
+		Output: [][]Record{{Record{`"`, true}, Record{`,`, true}, Record{`x"`, true}, Record{`x`, false}, Record{`xx,`, true}}},
 	}, {
 		Name:   "EscapeTextWithComma",
 		Escape: 'x',
 		Comma:  'x',
 		Input:  `"x""x,x"xxx""x"xx"x"xxxx,"` + "\n",
-		Output: [][]string{{`"`, `,`, `x"`, `x`, `xx,`}},
+		Output: [][]Record{{Record{`"`, true}, Record{`,`, false}, Record{`x"`, true}, Record{`x`, true}, Record{`xx,`, true}}},
 	}, {
 		Name:   "EscapeTextWithNonEscapingCharacter",
 		Escape: 'x',
 		Input:  `"xxx,xa",",x,"` + "\n",
-		Output: [][]string{{`xx,xa`, `,x,`}},
+		Output: [][]Record{{Record{`xx,xa`, true}, Record{`,x,`, true}}},
 	}, {
 		Name:   "EscapeTextWithComma",
 		Escape: 'x',

From c40f3813257f6e5eaa63c2d1badddacd9c911907 Mon Sep 17 00:00:00 2001
From: Rafi Shamim
Date: Tue, 9 Aug 2022 19:10:08 -0400
Subject: [PATCH 4/5] importer: treat zero-length string as NULL in CSV import

Release note (backward-incompatible change): If no `nullif` option is
specified while using IMPORT CSV, then a zero-length string in the input is
now treated as NULL. The quoted empty string in the input is still treated as
an empty string. Similarly, if `nullif` is specified, then an unquoted value
that matches the `nullif` setting is treated as NULL, while a quoted value
that matches it is treated as that string. These changes were made to make
IMPORT CSV behave more similarly to COPY CSV. If the previous behavior (i.e.,
treating either quoted or unquoted values that match the `nullif` setting as
NULL) is desired, use the new `allow_quoted_null` option in the IMPORT
statement.
---
 pkg/roachpb/io-formats.proto         |   2 +
 pkg/sql/importer/import_planning.go  |  32 +++++----
 pkg/sql/importer/import_stmt_test.go | 100 +++++++++++++++++++++++++--
 pkg/sql/importer/read_import_csv.go  |  14 +++-
 pkg/util/encoding/csv/reader.go      |   7 ++
 5 files changed, 136 insertions(+), 19 deletions(-)

diff --git a/pkg/roachpb/io-formats.proto b/pkg/roachpb/io-formats.proto
index c7d97e663aff..8c5f73f5583f 100644
--- a/pkg/roachpb/io-formats.proto
+++ b/pkg/roachpb/io-formats.proto
@@ -65,6 +65,8 @@ message CSVOptions {
   // Indicates the number of rows to import per CSV file.
   // Must be a non-zero positive number.
   optional int64 row_limit = 6 [(gogoproto.nullable) = false];
+  // allow_quoted_null, if set, means quoted values matching the null token are also treated as NULL.
+  optional bool allow_quoted_null = 7 [(gogoproto.nullable) = false];
 }
 
 // MySQLOutfileOptions describe the format of mysql's outfile.
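To make the new NULL semantics concrete, here is a minimal, runnable sketch
of the decision that csvRowConsumer.FillDatums implements. Record mirrors the
csv.Record type introduced earlier in this series; isNull is a hypothetical
helper invented here to condense the real logic:

    package main

    import "fmt"

    // Record mirrors csv.Record: the parsed field value plus whether it
    // appeared quoted in the input.
    type Record struct {
    	Val    string
    	Quoted bool
    }

    // isNull: a field is NULL when it matches the null token ("" unless
    // nullif overrides it) and either it was unquoted or allow_quoted_null
    // is set.
    func isNull(r Record, nullif string, allowQuotedNull bool) bool {
    	return (!r.Quoted || allowQuotedNull) && r.Val == nullif
    }

    func main() {
    	fmt.Println(isNull(Record{Val: ""}, "", false))                 // true: bare empty column -> NULL
    	fmt.Println(isNull(Record{Val: "", Quoted: true}, "", false))   // false: quoted "" -> empty string
    	fmt.Println(isNull(Record{Val: "N", Quoted: true}, "N", false)) // false: quoted null token stays a string
    	fmt.Println(isNull(Record{Val: "N", Quoted: true}, "N", true))  // true: allow_quoted_null restores the old behavior
    }

Note that allow_quoted_null only widens the set of values treated as NULL; it
never turns a non-matching value into NULL.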
diff --git a/pkg/sql/importer/import_planning.go b/pkg/sql/importer/import_planning.go index 0ca2336446d0..2d4d6dbf34f0 100644 --- a/pkg/sql/importer/import_planning.go +++ b/pkg/sql/importer/import_planning.go @@ -56,12 +56,13 @@ import ( ) const ( - csvDelimiter = "delimiter" - csvComment = "comment" - csvNullIf = "nullif" - csvSkip = "skip" - csvRowLimit = "row_limit" - csvStrictQuotes = "strict_quotes" + csvDelimiter = "delimiter" + csvComment = "comment" + csvNullIf = "nullif" + csvSkip = "skip" + csvRowLimit = "row_limit" + csvStrictQuotes = "strict_quotes" + csvAllowQuotedNulls = "allow_quoted_null" mysqlOutfileRowSep = "rows_terminated_by" mysqlOutfileFieldSep = "fields_terminated_by" @@ -105,12 +106,13 @@ const ( ) var importOptionExpectValues = map[string]sql.KVStringOptValidate{ - csvDelimiter: sql.KVStringOptRequireValue, - csvComment: sql.KVStringOptRequireValue, - csvNullIf: sql.KVStringOptRequireValue, - csvSkip: sql.KVStringOptRequireValue, - csvRowLimit: sql.KVStringOptRequireValue, - csvStrictQuotes: sql.KVStringOptRequireNoValue, + csvDelimiter: sql.KVStringOptRequireValue, + csvComment: sql.KVStringOptRequireValue, + csvNullIf: sql.KVStringOptRequireValue, + csvSkip: sql.KVStringOptRequireValue, + csvRowLimit: sql.KVStringOptRequireValue, + csvStrictQuotes: sql.KVStringOptRequireNoValue, + csvAllowQuotedNulls: sql.KVStringOptRequireNoValue, mysqlOutfileRowSep: sql.KVStringOptRequireValue, mysqlOutfileFieldSep: sql.KVStringOptRequireValue, @@ -169,7 +171,7 @@ var avroAllowedOptions = makeStringSet( ) var csvAllowedOptions = makeStringSet( - csvDelimiter, csvComment, csvNullIf, csvSkip, csvStrictQuotes, csvRowLimit, + csvDelimiter, csvComment, csvNullIf, csvSkip, csvStrictQuotes, csvRowLimit, csvAllowQuotedNulls, ) var mysqlOutAllowedOptions = makeStringSet( @@ -543,6 +545,10 @@ func importPlanHook( format.Csv.NullEncoding = &override } + if _, ok := opts[csvAllowQuotedNulls]; ok { + format.Csv.AllowQuotedNull = true + } + if override, ok := opts[csvSkip]; ok { skip, err := strconv.Atoi(override) if err != nil { diff --git a/pkg/sql/importer/import_stmt_test.go b/pkg/sql/importer/import_stmt_test.go index e9095f6fcba6..6240d707cf74 100644 --- a/pkg/sql/importer/import_stmt_test.go +++ b/pkg/sql/importer/import_stmt_test.go @@ -640,6 +640,68 @@ ORDER BY table_name `SELECT * from t`: {{"NULL", "foop"}}, }, }, + { + name: "zero string is the default for nullif with CSV", + create: ` + i int primary key, + s string + `, + typ: "CSV", + data: `1, +2,""`, + query: map[string][][]string{ + `SELECT i, s from t`: { + {"1", "NULL"}, + {"2", ""}, + }, + }, + }, + { + name: "zero string in not null", + create: ` + i int primary key, + s string, + s2 string not null + `, + typ: "CSV", + data: `1,, + 2,"",""`, + err: "null value in column \"s2\" violates not-null constraint", + }, + { + name: "quoted nullif is treated as a string", + create: ` + i int primary key, + s string + `, + with: `WITH nullif = 'foo'`, + typ: "CSV", + data: `1,foo +2,"foo"`, + query: map[string][][]string{ + `SELECT i, s from t`: { + {"1", "NULL"}, + {"2", "foo"}, + }, + }, + }, + { + name: "quoted nullif is treated as a null if allow_quoted_null is used", + create: ` + i int primary key, + s string + `, + with: `WITH nullif = 'foo', allow_quoted_null`, + typ: "CSV", + data: `1,foo +2,"foo"`, + query: map[string][][]string{ + `SELECT i, s from t`: { + {"1", "NULL"}, + {"2", "NULL"}, + }, + }, + }, // PG COPY { @@ -2379,8 +2441,9 @@ func TestImportCSVStmt(t *testing.T) { f STRING DEFAULT 's', PRIMARY KEY (a, b, 
c) )` - query = `IMPORT INTO t CSV DATA ($1)` - nullif = ` WITH nullif=''` + query = `IMPORT INTO t CSV DATA ($1)` + nullif = ` WITH nullif=''` + allowQuotedNulls = `, allow_quoted_null` ) sqlDB.Exec(t, create) @@ -2388,13 +2451,32 @@ func TestImportCSVStmt(t *testing.T) { data = ",5,e,7,," t.Run(data, func(t *testing.T) { sqlDB.ExpectErr( - t, `row 1: parse "a" as INT8: could not parse ""`, + t, `row 1: generate insert row: null value in column "a" violates not-null constraint`, query, srv.URL, ) sqlDB.ExpectErr( t, `row 1: generate insert row: null value in column "a" violates not-null constraint`, query+nullif, srv.URL, ) + sqlDB.ExpectErr( + t, `row 1: generate insert row: null value in column "a" violates not-null constraint`, + query+nullif+allowQuotedNulls, srv.URL, + ) + }) + data = "\"\",5,e,7,," + t.Run(data, func(t *testing.T) { + sqlDB.ExpectErr( + t, `row 1: parse "a" as INT8: could not parse ""`, + query, srv.URL, + ) + sqlDB.ExpectErr( + t, `row 1: parse "a" as INT8: could not parse ""`, + query+nullif, srv.URL, + ) + sqlDB.ExpectErr( + t, `row 1: generate insert row: null value in column "a" violates not-null constraint`, + query+nullif+allowQuotedNulls, srv.URL, + ) }) data = "2,5,e,,," t.Run(data, func(t *testing.T) { @@ -3754,7 +3836,17 @@ func (s *csvBenchmarkStream) Read(buf []byte) (int, error) { if err != nil { return 0, err } - return copy(buf, strings.Join(r.([]string), "\t")+"\n"), nil + row := r.([]csv.Record) + if len(row) == 0 { + return copy(buf, "\n"), nil + } + var b strings.Builder + b.WriteString(row[0].String()) + for _, v := range row[1:] { + b.WriteString("\t") + b.WriteString(v.String()) + } + return copy(buf, b.String()+"\n"), nil } return 0, io.EOF } diff --git a/pkg/sql/importer/read_import_csv.go b/pkg/sql/importer/read_import_csv.go index 92e85dec3e86..f2abf3f97b57 100644 --- a/pkg/sql/importer/read_import_csv.go +++ b/pkg/sql/importer/read_import_csv.go @@ -204,8 +204,18 @@ func (c *csvRowConsumer) FillDatums( continue } - if c.opts.NullEncoding != nil && - field.Val == *c.opts.NullEncoding { + // NullEncoding is stored as a *string historically, from before we wanted + // it to default to "". Rather than changing the proto, we just set the + // default here. + nullEncoding := "" + if c.opts.NullEncoding != nil { + nullEncoding = *c.opts.NullEncoding + } + if (!field.Quoted || c.opts.AllowQuotedNull) && field.Val == nullEncoding { + // To match COPY, the default behavior is to only treat the field as NULL + // if it was not quoted (and if it matches the configured NullEncoding). + // The AllowQuotedNull option can be used to get the old behavior where + // even a quoted value is treated as NULL. 
 			conv.Datums[datumIdx] = tree.DNull
 		} else {
 			var err error
diff --git a/pkg/util/encoding/csv/reader.go b/pkg/util/encoding/csv/reader.go
index 07df8d721e59..b61e078748bc 100644
--- a/pkg/util/encoding/csv/reader.go
+++ b/pkg/util/encoding/csv/reader.go
@@ -308,6 +308,13 @@ type Record struct {
 	Quoted bool
 }
 
+func (r *Record) String() string {
+	if r.Quoted {
+		return "\"" + r.Val + "\""
+	}
+	return r.Val
+}
+
 func (r *Reader) readRecord(dst []Record) ([]Record, error) {
 	if r.Comma == r.Comment || !validDelim(r.Comma) || (r.Comment != 0 && !validDelim(r.Comment)) {
 		return nil, errInvalidDelim

From 6e4feec5cf98469819ebb52203cc4e8b80e15bef Mon Sep 17 00:00:00 2001
From: Xiang Gu
Date: Mon, 8 Aug 2022 18:16:58 -0400
Subject: [PATCH 5/5] sql/schemachanger/scexec: fixed a bug in executing
 validation operations

Previously, when a stage in the declarative schema changer contained
validation operations, we incorrectly performed only the first validation
operation and skipped the rest. This is problematic because it is quite
possible for a stage to have more than one validation operation. This PR
fixes that.

In a future PR, if the number of validation operations starts to increase
significantly, we should employ the same 'visitor' pattern we use for the
mutation operations. Currently, we simply have a 'switch' statement over the
two validation operations we support (validateUniqueIndex and
validateCheckConstraint).

Release note (bug fix): Fixed a bug in the declarative schema changer where
only the first validation operation in a stage of validation operations was
executed and the rest were skipped.
---
 .../schemachanger/scexec/exec_validation.go   | 29 +++++++++++++------
 .../alter_table_alter_primary_key_vanilla    |  1 +
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/pkg/sql/schemachanger/scexec/exec_validation.go b/pkg/sql/schemachanger/scexec/exec_validation.go
index 31d468dac001..abefc48dfdcf 100644
--- a/pkg/sql/schemachanger/scexec/exec_validation.go
+++ b/pkg/sql/schemachanger/scexec/exec_validation.go
@@ -59,16 +59,27 @@ func executeValidateCheckConstraint(
 	return errors.Errorf("executeValidateCheckConstraint is not implemented")
 }
 
-func executeValidationOps(ctx context.Context, deps Dependencies, execute []scop.Op) error {
-	for _, op := range execute {
-		switch op := op.(type) {
-		case *scop.ValidateUniqueIndex:
-			return executeValidateUniqueIndex(ctx, deps, op)
-		case *scop.ValidateCheckConstraint:
-			return executeValidateCheckConstraint(ctx, deps, op)
-		default:
-			panic("unimplemented")
+func executeValidationOps(ctx context.Context, deps Dependencies, ops []scop.Op) (err error) {
+	for _, op := range ops {
+		if err = executeValidationOp(ctx, deps, op); err != nil {
+			return err
 		}
 	}
 	return nil
 }
+
+func executeValidationOp(ctx context.Context, deps Dependencies, op scop.Op) (err error) {
+	switch op := op.(type) {
+	case *scop.ValidateUniqueIndex:
+		if err = executeValidateUniqueIndex(ctx, deps, op); err != nil {
+			return errors.Wrapf(err, "%T: %v", op, op)
+		}
+	case *scop.ValidateCheckConstraint:
+		if err = executeValidateCheckConstraint(ctx, deps, op); err != nil {
+			return errors.Wrapf(err, "%T: %v", op, op)
+		}
+	default:
+		panic("unimplemented")
+	}
+	return nil
+}
diff --git a/pkg/sql/schemachanger/testdata/end_to_end/alter_table_alter_primary_key_vanilla b/pkg/sql/schemachanger/testdata/end_to_end/alter_table_alter_primary_key_vanilla
index 27b65cea2b35..d4a99c7beaf4 100644
--- a/pkg/sql/schemachanger/testdata/end_to_end/alter_table_alter_primary_key_vanilla
+++ 
b/pkg/sql/schemachanger/testdata/end_to_end/alter_table_alter_primary_key_vanilla @@ -316,6 +316,7 @@ commit transaction #8 begin transaction #9 ## PostCommitPhase stage 7 of 7 with 2 ValidationType ops validate forward indexes [2] in table #104 +validate forward indexes [4] in table #104 commit transaction #9 begin transaction #10 ## PostCommitNonRevertiblePhase stage 1 of 3 with 12 MutationType ops
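To summarize the control-flow fix in the last patch, here is a minimal,
runnable sketch; executeAll is a hypothetical stand-in for
executeValidationOps, with plain strings standing in for scop.Op:

    package main

    import "fmt"

    // executeAll sketches the fixed control flow: every operation is
    // dispatched through a per-op callback, and only an error stops the
    // loop. The buggy version returned from inside a switch on the first
    // operation, silently skipping the rest of the stage.
    func executeAll(ops []string, execute func(string) error) error {
    	for _, op := range ops {
    		if err := execute(op); err != nil {
    			return fmt.Errorf("%s: %w", op, err)
    		}
    	}
    	return nil
    }

    func main() {
    	var ran []string
    	_ = executeAll([]string{"validate index 2", "validate index 4"}, func(op string) error {
    		ran = append(ran, op)
    		return nil
    	})
    	fmt.Println(ran) // both validations run
    }

With the loop intact, the second validation (of index 4 in the testdata above)
now executes as well, which is exactly what the updated expected output
records.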