Commit
…db#85843

84487: sql,csv: distinguish empty columns from quoted empty strings r=otan,dt a=rafiss

fixes cockroachdb#19743

The first commit is meant to be backported. The second one maybe should not be backported.

Release note (bug fix): Previously, an empty column in the input to
COPY ... FROM CSV would be treated as an empty string. Now, this is
treated as NULL. The quoted empty string can still be used to input an
empty string. Similarly, if a different NULL token is specified in
the command options, it can be quoted in order to be treated as the
equivalent string value.

Release note (backward-incompatible change): If no `nullif` option is specified
while using IMPORT CSV, then a zero-length string in the input is now treated as
NULL. The quoted empty string in the input is treated as an empty string. Similarly,
if `nullif` is specified, then an unquoted value is treated as NULL, and a
quoted value is treated as that string. These changes were made to make IMPORT CSV
behave more similarly to COPY CSV.

If the previous behavior (i.e. treating either quoted or unquoted values
that match the `nullif` setting as NULL) is desired, then use the new
`allow_quoted_null` option in the IMPORT statement.
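
To make the new semantics concrete, here is a minimal Go sketch of the NULL decision described above (the `isNull` helper and its parameters are illustrative, not the importer's actual code):

```go
package main

import "fmt"

// isNull reports whether a CSV field becomes SQL NULL: an unquoted
// match with the nullif token (default: the empty string) is NULL; a
// quoted match is NULL only when allow_quoted_null is set.
func isNull(val string, quoted bool, nullif string, allowQuotedNull bool) bool {
	if val != nullif {
		return false
	}
	if !quoted {
		return true // unquoted match: NULL
	}
	return allowQuotedNull // quoted match: NULL only with allow_quoted_null
}

func main() {
	fmt.Println(isNull("", false, "", false)) // true: empty column -> NULL
	fmt.Println(isNull("", true, "", false))  // false: "" is an empty string
	fmt.Println(isNull("", true, "", true))   // true: quoted NULLs allowed
}
```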

85773: sql/schemachanger/scgraph, scplan: fixed a bug when drawing dep graph r=Xiang-Gu a=Xiang-Gu

Previously, we defined all the statuses an element can be in under the
declarative schema changer in the scpb package. We removed one status
(TXN_DROPPED) from that list and left its enum number reserved. However,
some logic in scgraph incorrectly assumed that all enum numbers are
active and simply iterated from 0 to len(enum_list)-1 to cover every
possible status when drawing the dep graph. This is problematic because,
as we continue to add statuses to that enum list, that style of
iteration draws the dep graph incorrectly. This PR fixes that.
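
The safe pattern is to iterate over the enum's generated value map rather than assuming the numbers are contiguous. A self-contained sketch with a made-up `Status` enum that mimics the reserved gap:

```go
package main

import "fmt"

// Status mimics a protobuf enum with a removed, reserved value (2).
type Status int32

// Status_name mirrors the map protoc generates for an enum; note the
// gap at 2, where a status like TXN_DROPPED used to live.
var Status_name = map[int32]string{
	0: "UNKNOWN",
	1: "ABSENT",
	3: "PUBLIC",
}

func main() {
	// Buggy: assumes enum numbers run 0..len-1, so this visits the
	// reserved number 2 and silently skips 3.
	for i := 0; i < len(Status_name); i++ {
		_ = Status(i)
	}

	// Fixed: range over the generated map, so only numbers that are
	// actually defined are visited.
	for v, name := range Status_name {
		fmt.Printf("status %d = %s\n", v, name)
	}
}
```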

This PR also spots and fixes a panic-recovery bug where we forgot to
correctly update the return error, so that if a panic happened and the
recover caught it, we would return a nil error.
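
This is the classic Go pitfall of a deferred recover that never writes to the function's return value; a minimal sketch of the bug and the fix (names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// buggy recovers from the panic but never assigns the error, so the
// caller sees err == nil even though the work did not finish.
func buggy() error {
	defer func() {
		_ = recover() // swallowed: the return value is never set
	}()
	panic(errors.New("boom"))
}

// fixed uses a named return so the deferred recover can update it.
func fixed() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	panic(errors.New("boom"))
}

func main() {
	fmt.Println(buggy()) // <nil> -- the bug described above
	fmt.Println(fixed()) // recovered: boom
}
```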

Release note (bug fix): Fixed a bug internal to drawing the dependency
graph of a DDL statement under the declarative schema changer.

85781: sql/schemachanger/scexec: fixed a bug in executing validation operations r=Xiang-Gu a=Xiang-Gu

Previously, when a stage in the declarative schema changer contained
validation operations, we incorrectly performed only the first
validation operation and skipped the rest. This is problematic because
it's quite possible for a stage to have more than one validation
operation. This PR fixes that.

In a future PR, if the number of validation operations starts to grow
significantly, we should employ the same 'visitor' pattern we used for
the mutation operations. Currently, we simply have a 'switch' statement
over the two validation operations we support (validateUniqueIndex and
validateCheckConstraint).
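
A minimal sketch of that 'switch' shape, making sure every operation in the stage runs; the types here are stand-ins, not the actual scexec definitions:

```go
package main

import "fmt"

// op stands in for a validation operation within a stage.
type op interface{ name() string }

type validateUniqueIndex struct{}
type validateCheckConstraint struct{}

func (validateUniqueIndex) name() string     { return "validateUniqueIndex" }
func (validateCheckConstraint) name() string { return "validateCheckConstraint" }

// executeValidationOps must run every operation in the stage; the bug
// was effectively returning after the first one.
func executeValidationOps(ops []op) error {
	for _, o := range ops {
		switch o.(type) {
		case validateUniqueIndex:
			fmt.Println("validating unique index")
		case validateCheckConstraint:
			fmt.Println("validating check constraint")
		default:
			return fmt.Errorf("unknown validation op %q", o.name())
		}
	}
	return nil
}

func main() {
	_ = executeValidationOps([]op{validateUniqueIndex{}, validateCheckConstraint{}})
}
```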

Release note (bug fix): Fixed a bug where we incorrectly handled only
the first validation operation and skipped the rest in a stage of
validation operations in the declarative schema changer.

85843: opt: respect NO_INDEX_JOIN flag r=rytaft a=rytaft

Prior to this commit, it was possible that the optimizer could
produce a plan with an index join even if the user hinted that
index joins should be avoided by using the `NO_INDEX_JOIN` hint. This
commit fixes that oversight, and we no longer plan an index join
in this case. This commit also adds assertions that an index join
is not planned if `NO_INDEX_JOIN` is used to prevent this bug from
recurring.

Fixes cockroachdb#85841

Release note (bug fix): Fixed an issue where the `NO_INDEX_JOIN`
hint could be ignored by the optimizer in some cases, causing it
to create a query plan with an index join.
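
For context, the hint is written as a table annotation. A hedged sketch of exercising it from a Go client, assuming a locally running insecure cluster and a made-up `kv` table with a secondary index:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks pgwire
)

func main() {
	// Connection string assumes an insecure local single-node cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// NO_INDEX_JOIN forbids plans that join a secondary index back to
	// the primary index; after this fix the optimizer respects it.
	rows, err := db.Query(`SELECT * FROM kv@{NO_INDEX_JOIN} WHERE v > 10`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
}
```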

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
4 people committed Aug 10, 2022
5 parents 9be7cf5 + c40f381 + caa3c6c + 6e4feec + c3da919 commit 5f331ae
Showing 23 changed files with 532 additions and 154 deletions.
2 changes: 2 additions & 0 deletions pkg/roachpb/io-formats.proto
@@ -65,6 +65,8 @@ message CSVOptions {
// Indicates the number of rows to import per CSV file.
// Must be a non-zero positive number.
optional int64 row_limit = 6 [(gogoproto.nullable) = false];
// allow_quoted_null causes quoted values matching nullif to also be treated as NULL.
optional bool allow_quoted_null = 7 [(gogoproto.nullable) = false];
}

// MySQLOutfileOptions describe the format of mysql's outfile.
12 changes: 7 additions & 5 deletions pkg/sql/copy.go
@@ -498,7 +498,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
record, err := c.csvReader.Read()
// Look for end of data before checking for errors, since a field count
// error will still return record data.
if len(record) == 1 && record[0] == endOfData && c.buf.Len() == 0 {
if len(record) == 1 && !record[0].Quoted && record[0].Val == endOfData && c.buf.Len() == 0 {
return true, nil
}
if err != nil {
@@ -509,7 +509,7 @@ func (c *copyMachine) readCSVData(ctx context.Context, final bool) (brk bool, er
return false, err
}

func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []csv.Record) []csv.Record {
if len(c.expectedHiddenColumnIdxs) == 0 {
return in
}
@@ -523,19 +523,21 @@ func (c *copyMachine) maybeIgnoreHiddenColumnsStr(in []string) []string {
return ret
}

func (c *copyMachine) readCSVTuple(ctx context.Context, record []string) error {
func (c *copyMachine) readCSVTuple(ctx context.Context, record []csv.Record) error {
if expected := len(c.resultColumns) + len(c.expectedHiddenColumnIdxs); expected != len(record) {
return pgerror.Newf(pgcode.BadCopyFileFormat,
"expected %d values, got %d", expected, len(record))
}
record = c.maybeIgnoreHiddenColumnsStr(record)
exprs := make(tree.Exprs, len(record))
for i, s := range record {
if s == c.null {
// NB: When we implement FORCE_NULL, quoted values will also be allowed
// to be treated as NULL.
if !s.Quoted && s.Val == c.null {
exprs[i] = tree.DNull
continue
}
d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s, c.parsingEvalCtx)
d, _, err := tree.ParseAndRequireString(c.resultColumns[i].Typ, s.Val, c.parsingEvalCtx)
if err != nil {
return err
}
1 change: 1 addition & 0 deletions pkg/sql/importer/BUILD.bazel
@@ -212,6 +212,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding/csv",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/ioctx",
32 changes: 19 additions & 13 deletions pkg/sql/importer/import_planning.go
@@ -56,12 +56,13 @@ import (
)

const (
csvDelimiter = "delimiter"
csvComment = "comment"
csvNullIf = "nullif"
csvSkip = "skip"
csvRowLimit = "row_limit"
csvStrictQuotes = "strict_quotes"
csvDelimiter = "delimiter"
csvComment = "comment"
csvNullIf = "nullif"
csvSkip = "skip"
csvRowLimit = "row_limit"
csvStrictQuotes = "strict_quotes"
csvAllowQuotedNulls = "allow_quoted_null"

mysqlOutfileRowSep = "rows_terminated_by"
mysqlOutfileFieldSep = "fields_terminated_by"
@@ -105,12 +106,13 @@
)

var importOptionExpectValues = map[string]sql.KVStringOptValidate{
csvDelimiter: sql.KVStringOptRequireValue,
csvComment: sql.KVStringOptRequireValue,
csvNullIf: sql.KVStringOptRequireValue,
csvSkip: sql.KVStringOptRequireValue,
csvRowLimit: sql.KVStringOptRequireValue,
csvStrictQuotes: sql.KVStringOptRequireNoValue,
csvDelimiter: sql.KVStringOptRequireValue,
csvComment: sql.KVStringOptRequireValue,
csvNullIf: sql.KVStringOptRequireValue,
csvSkip: sql.KVStringOptRequireValue,
csvRowLimit: sql.KVStringOptRequireValue,
csvStrictQuotes: sql.KVStringOptRequireNoValue,
csvAllowQuotedNulls: sql.KVStringOptRequireNoValue,

mysqlOutfileRowSep: sql.KVStringOptRequireValue,
mysqlOutfileFieldSep: sql.KVStringOptRequireValue,
@@ -169,7 +171,7 @@ var avroAllowedOptions = makeStringSet(
)

var csvAllowedOptions = makeStringSet(
csvDelimiter, csvComment, csvNullIf, csvSkip, csvStrictQuotes, csvRowLimit,
csvDelimiter, csvComment, csvNullIf, csvSkip, csvStrictQuotes, csvRowLimit, csvAllowQuotedNulls,
)

var mysqlOutAllowedOptions = makeStringSet(
@@ -543,6 +545,10 @@ func importPlanHook(
format.Csv.NullEncoding = &override
}

if _, ok := opts[csvAllowQuotedNulls]; ok {
format.Csv.AllowQuotedNull = true
}

if override, ok := opts[csvSkip]; ok {
skip, err := strconv.Atoi(override)
if err != nil {
121 changes: 113 additions & 8 deletions pkg/sql/importer/import_stmt_test.go
@@ -64,6 +64,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -639,6 +640,68 @@ ORDER BY table_name
`SELECT * from t`: {{"NULL", "foop"}},
},
},
{
name: "zero string is the default for nullif with CSV",
create: `
i int primary key,
s string
`,
typ: "CSV",
data: `1,
2,""`,
query: map[string][][]string{
`SELECT i, s from t`: {
{"1", "NULL"},
{"2", ""},
},
},
},
{
name: "zero string in not null",
create: `
i int primary key,
s string,
s2 string not null
`,
typ: "CSV",
data: `1,,
2,"",""`,
err: "null value in column \"s2\" violates not-null constraint",
},
{
name: "quoted nullif is treated as a string",
create: `
i int primary key,
s string
`,
with: `WITH nullif = 'foo'`,
typ: "CSV",
data: `1,foo
2,"foo"`,
query: map[string][][]string{
`SELECT i, s from t`: {
{"1", "NULL"},
{"2", "foo"},
},
},
},
{
name: "quoted nullif is treated as a null if allow_quoted_null is used",
create: `
i int primary key,
s string
`,
with: `WITH nullif = 'foo', allow_quoted_null`,
typ: "CSV",
data: `1,foo
2,"foo"`,
query: map[string][][]string{
`SELECT i, s from t`: {
{"1", "NULL"},
{"2", "NULL"},
},
},
},

// PG COPY
{
@@ -2378,22 +2441,42 @@ func TestImportCSVStmt(t *testing.T) {
f STRING DEFAULT 's',
PRIMARY KEY (a, b, c)
)`
query = `IMPORT INTO t CSV DATA ($1)`
nullif = ` WITH nullif=''`
query = `IMPORT INTO t CSV DATA ($1)`
nullif = ` WITH nullif=''`
allowQuotedNulls = `, allow_quoted_null`
)

sqlDB.Exec(t, create)

data = ",5,e,7,,"
t.Run(data, func(t *testing.T) {
sqlDB.ExpectErr(
t, `row 1: parse "a" as INT8: could not parse ""`,
t, `row 1: generate insert row: null value in column "a" violates not-null constraint`,
query, srv.URL,
)
sqlDB.ExpectErr(
t, `row 1: generate insert row: null value in column "a" violates not-null constraint`,
query+nullif, srv.URL,
)
sqlDB.ExpectErr(
t, `row 1: generate insert row: null value in column "a" violates not-null constraint`,
query+nullif+allowQuotedNulls, srv.URL,
)
})
data = "\"\",5,e,7,,"
t.Run(data, func(t *testing.T) {
sqlDB.ExpectErr(
t, `row 1: parse "a" as INT8: could not parse ""`,
query, srv.URL,
)
sqlDB.ExpectErr(
t, `row 1: parse "a" as INT8: could not parse ""`,
query+nullif, srv.URL,
)
sqlDB.ExpectErr(
t, `row 1: generate insert row: null value in column "a" violates not-null constraint`,
query+nullif+allowQuotedNulls, srv.URL,
)
})
data = "2,5,e,,,"
t.Run(data, func(t *testing.T) {
@@ -3737,7 +3820,7 @@ func BenchmarkUserfileImport(b *testing.B) {
type csvBenchmarkStream struct {
n int
pos int
data [][]string
data [][]csv.Record
}

func (s *csvBenchmarkStream) Progress() float32 {
@@ -3769,11 +3852,33 @@ func (s *csvBenchmarkStream) Read(buf []byte) (int, error) {
if err != nil {
return 0, err
}
return copy(buf, strings.Join(r.([]string), "\t")+"\n"), nil
row := r.([]csv.Record)
if len(row) == 0 {
return copy(buf, "\n"), nil
}
var b strings.Builder
b.WriteString(row[0].String())
for _, v := range row[1:] {
b.WriteString("\t")
b.WriteString(v.String())
}
return copy(buf, b.String()+"\n"), nil
}
return 0, io.EOF
}

func toRecords(input [][]string) [][]csv.Record {
records := make([][]csv.Record, len(input))
for i := range input {
row := make([]csv.Record, len(input[i]))
for j := range input[i] {
row[j] = csv.Record{Quoted: false, Val: input[i][j]}
}
records[i] = row
}
return records
}

var _ importRowProducer = &csvBenchmarkStream{}

// BenchmarkConvertRecord-16 1000000 2107 ns/op 56.94 MB/s 3600 B/op 101 allocs/op
@@ -3865,7 +3970,7 @@ func BenchmarkCSVConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}
consumer := &csvRowConsumer{importCtx: importCtx, opts: &roachpb.CSVOptions{}}
b.ResetTimer()
@@ -4815,7 +4920,7 @@ func BenchmarkDelimitedConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}

delimited := &fileReader{Reader: producer}
@@ -4919,7 +5024,7 @@ func BenchmarkPgCopyConvertRecord(b *testing.B) {
producer := &csvBenchmarkStream{
n: b.N,
pos: 0,
data: tpchLineItemDataRows,
data: toRecords(tpchLineItemDataRows),
}

pgCopyInput := &fileReader{Reader: producer}
7 changes: 6 additions & 1 deletion pkg/sql/importer/read_import_base.go
@@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding/csv"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -724,7 +725,11 @@ func (p *parallelImporter) importWorker(

rowIndex := int64(timestamp) + rowNum
if err := conv.Row(ctx, conv.KvBatch.Source, rowIndex); err != nil {
return newImportRowError(err, fmt.Sprintf("%v", record), rowNum)
s := fmt.Sprintf("%v", record)
if r, ok := record.([]csv.Record); ok {
s = strRecord(r, ',')
}
return newImportRowError(err, s, rowNum)
}
}
}