diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 66149c13f95a..4e17d2e80961 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -60,6 +60,6 @@ trace.debug.enablebooleanfalseif set, traces for recent requests can be seen in the /debug page trace.lightstep.tokenstringif set, traces go to Lightstep using this token trace.zipkin.collectorstringif set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set. -versioncustom validation2.0-3set the active cluster version in the format '.'. +versioncustom validation2.0-4set the active cluster version in the format '.'. diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index 4bc19a113102..8b87504c78c8 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -327,6 +327,7 @@ explain_option_list ::= import_data_format ::= 'CSV' + | 'MYSQLOUTFILE' privileges ::= 'ALL' @@ -719,6 +720,7 @@ unreserved_keyword ::= | 'MATCH' | 'MINUTE' | 'MONTH' + | 'MYSQLOUTFILE' | 'NAMES' | 'NAN' | 'NAME' diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 88914ef44217..7eecf38ae607 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -37,20 +37,32 @@ import ( ) const ( - importOptionDelimiter = "delimiter" - importOptionComment = "comment" - importOptionNullIf = "nullif" + csvDelimiter = "delimiter" + csvComment = "comment" + csvNullIf = "nullif" + csvSkip = "skip" + + mysqlOutfileRowSep = "rows_terminated_by" + mysqlOutfileFieldSep = "fields_terminated_by" + mysqlOutfileEnclose = "fields_enclosed_by" + mysqlOutfileEscape = "fields_escaped_by" + importOptionTransform = "transform" - importOptionSkip = "skip" importOptionSSTSize = "sstsize" ) var importOptionExpectValues = map[string]bool{ - importOptionDelimiter: true, - importOptionComment: true, - 
importOptionNullIf: true, + csvDelimiter: true, + csvComment: true, + csvNullIf: true, + csvSkip: true, + + mysqlOutfileRowSep: true, + mysqlOutfileFieldSep: true, + mysqlOutfileEnclose: true, + mysqlOutfileEscape: true, + importOptionTransform: true, - importOptionSkip: true, importOptionSSTSize: true, } @@ -240,11 +252,6 @@ func importPlanHook( } } - if importStmt.FileFormat != "CSV" { - // not possible with current parser rules. - return nil, nil, nil, errors.Errorf("unsupported import format: %q", importStmt.FileFormat) - } - optsFn, err := p.TypeAsStringOpts(importStmt.Options, importOptionExpectValues) if err != nil { return nil, nil, nil, err @@ -299,42 +306,95 @@ func importPlanHook( parentID = descI.(*sqlbase.DatabaseDescriptor).ID } - format := roachpb.IOFileFormat{Format: roachpb.IOFileFormat_CSV} - if override, ok := opts[importOptionDelimiter]; ok { - comma, err := util.GetSingleRune(override) - if err != nil { - return errors.Wrap(err, "invalid comma value") + format := roachpb.IOFileFormat{} + switch importStmt.FileFormat { + case "CSV": + format.Format = roachpb.IOFileFormat_CSV + if override, ok := opts[csvDelimiter]; ok { + comma, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrap(err, "invalid comma value") + } + format.Csv.Comma = comma } - format.Csv.Comma = comma - } - if override, ok := opts[importOptionComment]; ok { - comment, err := util.GetSingleRune(override) - if err != nil { - return errors.Wrap(err, "invalid comment value") + if override, ok := opts[csvComment]; ok { + comment, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrap(err, "invalid comment value") + } + format.Csv.Comment = comment } - format.Csv.Comment = comment - } - if override, ok := opts[importOptionNullIf]; ok { - format.Csv.NullEncoding = &override - } + if override, ok := opts[csvNullIf]; ok { + format.Csv.NullEncoding = &override + } - if override, ok := opts[importOptionSkip]; ok { - skip, err := 
strconv.Atoi(override) - if err != nil { - return errors.Wrapf(err, "invalid %s value", importOptionSkip) + if override, ok := opts[csvSkip]; ok { + skip, err := strconv.Atoi(override) + if err != nil { + return errors.Wrapf(err, "invalid %s value", csvSkip) + } + if skip < 0 { + return errors.Errorf("%s must be >= 0", csvSkip) + } + // We need to handle the case where the user wants to skip records and the node + // interpreting the statement might be newer than other nodes in the cluster. + if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) { + return errors.Errorf("Using %s requires all nodes to be upgraded to %s", + csvSkip, cluster.VersionByKey(cluster.VersionImportSkipRecords)) + } + format.Csv.Skip = uint32(skip) + } + case "MYSQLOUTFILE": + format.Format = roachpb.IOFileFormat_MysqlOutfile + format.MysqlOut = roachpb.MySQLOutfileOptions{ + RowSeparator: '\n', + FieldSeparator: '\t', + } + if override, ok := opts[mysqlOutfileRowSep]; ok { + c, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrapf(err, "invalid %q value", mysqlOutfileRowSep) + } + format.MysqlOut.RowSeparator = c + } + + if override, ok := opts[mysqlOutfileFieldSep]; ok { + c, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrapf(err, "invalid %q value", mysqlOutfileFieldSep) + } + format.MysqlOut.FieldSeparator = c + } + + if override, ok := opts[mysqlOutfileEnclose]; ok { + c, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrapf(err, "invalid %q value", mysqlOutfileEnclose) + } + format.MysqlOut.Enclose = roachpb.MySQLOutfileOptions_Always + format.MysqlOut.Encloser = c } - if skip < 0 { - return errors.Errorf("%s must be >= 0", importOptionSkip) + + if override, ok := opts[mysqlOutfileEscape]; ok { + c, err := util.GetSingleRune(override) + if err != nil { + return errors.Wrapf(err, "invalid %q value", mysqlOutfileEscape) + } + format.MysqlOut.HasEscape = true + format.MysqlOut.Escape = c } - 
// We need to handle the case where the user wants to skip records and the node - // interpreting the statement might be newer than other nodes in the cluster. - if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) { + + default: + return errors.Errorf("unsupported import format: %q", importStmt.FileFormat) + } + + if format.Format != roachpb.IOFileFormat_CSV { + if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportFormats) { return errors.Errorf("Using %s requires all nodes to be upgraded to %s", - importOptionSkip, cluster.VersionByKey(cluster.VersionImportSkipRecords)) + importStmt.FileFormat, cluster.VersionByKey(cluster.VersionImportFormats)) } - format.Csv.Skip = uint32(skip) + } // sstSize, if 0, will be set to an appropriate default by the specific diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 3516e787b716..2e8e0445ddaf 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "golang.org/x/sync/errgroup" @@ -245,7 +246,7 @@ func makeCSVData( return files, filesWithOpts, filesWithDups } -func TestImportStmt(t *testing.T) { +func TestImportCSVStmt(t *testing.T) { defer leaktest.AfterTest(t)() const ( @@ -327,7 +328,7 @@ func TestImportStmt(t *testing.T) { `IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2'`, nil, filesWithOpts, - `WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`, + ` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`, "", }, { @@ -336,7 +337,7 @@ func TestImportStmt(t *testing.T) { `IMPORT TABLE t CREATE USING $1 CSV DATA (%s) WITH sstsize = '10K'`, schema, files, - `WITH sstsize = '10K'`, + ` WITH sstsize = '10K'`, 
"", }, { @@ -344,7 +345,7 @@ func TestImportStmt(t *testing.T) { `IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2', transform = $1`, nil, filesWithOpts, - `WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`, + ` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`, "", }, { @@ -447,7 +448,7 @@ func TestImportStmt(t *testing.T) { } else { jobPrefix += `""."".` } - jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) ` + jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)` if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobs.TypeImport, jobs.Record{ Username: security.RootUser, @@ -1118,3 +1119,68 @@ func TestImportMVCCChecksums(t *testing.T) { ) CSV DATA ($1)`, srv.URL) sqlDB.Exec(t, `UPDATE d.t SET c = 2 WHERE a = 1`) } + +func TestImportMysqlOutfile(t *testing.T) { + defer leaktest.AfterTest(t)() + + const ( + nodes = 3 + ) + ctx := context.Background() + baseDir := filepath.Join("testdata", "mysqlout") + args := base.TestServerArgs{ExternalIODir: baseDir} + tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args}) + defer tc.Stopper().Stop(ctx) + conn := tc.Conns[0] + sqlDB := sqlutils.MakeSQLRunner(conn) + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.import.batch_size = '10KB'`) + sqlDB.Exec(t, `CREATE DATABASE foo; SET DATABASE = foo`) + + testRows, configs := getMysqlOutfileTestdata(t) + + for i, cfg := range configs { + t.Run(cfg.name, func(t *testing.T) { + var opts []interface{} + + cmd := fmt.Sprintf(`IMPORT TABLE test%d (i INT PRIMARY KEY, s text, b bytea) MYSQLOUTFILE DATA ($1)`, i) + opts = append(opts, fmt.Sprintf("nodelocal://%s", strings.TrimPrefix(cfg.filename, baseDir))) + + var flags []string + if cfg.opts.RowSeparator != '\n' { + opts = append(opts, 
string(cfg.opts.RowSeparator)) + flags = append(flags, fmt.Sprintf("rows_terminated_by = $%d", len(opts))) + } + if cfg.opts.FieldSeparator != '\t' { + opts = append(opts, string(cfg.opts.FieldSeparator)) + flags = append(flags, fmt.Sprintf("fields_terminated_by = $%d", len(opts))) + } + if cfg.opts.Enclose == roachpb.MySQLOutfileOptions_Always { + opts = append(opts, string(cfg.opts.Encloser)) + flags = append(flags, fmt.Sprintf("fields_enclosed_by = $%d", len(opts))) + } + if cfg.opts.HasEscape { + opts = append(opts, string(cfg.opts.Escape)) + flags = append(flags, fmt.Sprintf("fields_escaped_by = $%d", len(opts))) + } + if len(flags) > 0 { + cmd += " WITH " + strings.Join(flags, ", ") + } + sqlDB.Exec(t, cmd, opts...) + for idx, row := range sqlDB.QueryStr(t, fmt.Sprintf("SELECT * FROM test%d ORDER BY i", i)) { + expected, actual := testRows[idx].s, row[1] + if expected == injectNull { + expected = "NULL" + } + // TODO(dt): known limitation: even escaped `\N` becomes null. + if expected == `\N` { + expected = "NULL" + } + + if expected != actual { + t.Fatalf("expected rowi=%s string to be %q, got %q", row[0], expected, actual) + } + } + }) + } +} diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index 7ba2c03fe536..232503e37832 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -64,7 +64,9 @@ func (c *csvInputReader) inputFinished() { close(c.recordCh) } -func (c *csvInputReader) flushBatch(ctx context.Context, finished bool, progFn func(finished bool) error) error { +func (c *csvInputReader) flushBatch( + ctx context.Context, finished bool, progFn func(finished bool) error, +) error { // if the batch isn't empty, we need to flush it. 
if len(c.batch.r) > 0 { select { diff --git a/pkg/ccl/importccl/read_import_mysqlout.go b/pkg/ccl/importccl/read_import_mysqlout.go index 8f86addf121d..a5e67191faa3 100644 --- a/pkg/ccl/importccl/read_import_mysqlout.go +++ b/pkg/ccl/importccl/read_import_mysqlout.go @@ -33,8 +33,13 @@ func newMysqloutfileReader( tableDesc *sqlbase.TableDescriptor, expectedCols int, ) *mysqloutfileReader { + null := "NULL" + if opts.HasEscape { + null = `\N` + } + csvOpts := roachpb.CSVOptions{NullEncoding: &null} return &mysqloutfileReader{ - csvInputReader: *newCSVInputReader(kvCh, roachpb.CSVOptions{}, tableDesc, expectedCols), + csvInputReader: *newCSVInputReader(kvCh, csvOpts, tableDesc, expectedCols), opts: opts, } } diff --git a/pkg/ccl/importccl/read_import_mysqlout_test.go b/pkg/ccl/importccl/read_import_mysqlout_test.go index 62320a9f7bfe..d278a2a0e0d1 100644 --- a/pkg/ccl/importccl/read_import_mysqlout_test.go +++ b/pkg/ccl/importccl/read_import_mysqlout_test.go @@ -208,7 +208,7 @@ func TestMysqlOutfileReader(t *testing.T) { ctx := context.TODO() for _, config := range configs { t.Run(config.name, func(t *testing.T) { - converter := newMysqloutfileReader(ctx, nil, config.opts, nil, 3) + converter := newMysqloutfileReader(nil, config.opts, nil, 3) // unblock batch chan sends converter.csvInputReader.recordCh = make(chan csvRecord, 4) converter.csvInputReader.batchSize = 10 diff --git a/pkg/settings/cluster/cockroach_versions.go b/pkg/settings/cluster/cockroach_versions.go index 03e4f1d78475..fce5592a9602 100644 --- a/pkg/settings/cluster/cockroach_versions.go +++ b/pkg/settings/cluster/cockroach_versions.go @@ -51,6 +51,7 @@ const ( VersionImportSkipRecords VersionProposedTSLeaseRequest VersionRangeAppliedStateKey + VersionImportFormats // Add new versions here (step one of two). 
@@ -206,6 +207,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{ Key: VersionRangeAppliedStateKey, Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 3}, }, + { + // VersionImportFormats is https://github.com/cockroachdb/cockroach/pull/25615. + Key: VersionImportFormats, + Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 4}, + }, // Add new versions here (step two of two). diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index a20ae861bb62..25ba7793d780 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -968,6 +968,7 @@ func TestParse(t *testing.T) { {`BACKUP TABLE foo TO 'bar' WITH key1, key2 = 'value'`}, {`RESTORE TABLE foo FROM 'bar' WITH key1, key2 = 'value'`}, {`IMPORT TABLE foo CREATE USING 'nodelocal:///some/file' CSV DATA ('path/to/some/file', $1) WITH temp = 'path/to/temp'`}, + {`IMPORT TABLE foo CREATE USING 'nodelocal:///some/file' MYSQLOUTFILE DATA ('path/to/some/file', $1)`}, {`IMPORT TABLE foo (id INT PRIMARY KEY, email STRING, age INT) CSV DATA ('path/to/some/file', $1) WITH temp = 'path/to/temp'`}, {`IMPORT TABLE foo (id INT, email STRING, age INT) CSV DATA ('path/to/some/file', $1) WITH comma = ',', "nullif" = 'n/a', temp = $2`}, {`EXPORT INTO CSV 'a' FROM TABLE a`}, @@ -991,7 +992,7 @@ func TestParse(t *testing.T) { } s := stmts.String() if d.sql != s { - t.Errorf("expected \n%s\n, but found \n%s", d.sql, s) + t.Errorf("expected \n%q\n, but found \n%q", d.sql, s) } } } diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 6e3f36effcd7..7c76fe4f3620 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -497,7 +497,7 @@ func newNameFromStr(s string) *tree.Name { %token LEADING LEAST LEFT LESS LEVEL LIKE LIMIT LIST LOCAL %token LOCALTIME LOCALTIMESTAMP LOW LSHIFT -%token MATCH MINVALUE MAXVALUE MINUTE MONTH +%token MATCH MINVALUE MAXVALUE MINUTE MONTH MYSQLOUTFILE %token NAN NAME NAMES NATURAL NEXT NO NO_INDEX_JOIN NORMAL %token NOT NOTHING NOTNULL NULL NULLIF 
@@ -1597,6 +1597,10 @@ import_data_format: { $$ = "CSV" } +| MYSQLOUTFILE + { + $$ = "MYSQLOUTFILE" + } // %Help: IMPORT - load data from file in a distributed manner // %Category: CCL @@ -1609,6 +1613,7 @@ import_data_format: // // Formats: // CSV +// MYSQLOUTFILE // // Options: // distributed = '...' @@ -7946,6 +7951,7 @@ unreserved_keyword: | MATCH | MINUTE | MONTH +| MYSQLOUTFILE | NAMES | NAN | NAME diff --git a/pkg/sql/sem/tree/import.go b/pkg/sql/sem/tree/import.go index 70c15642aa87..2b671a098120 100644 --- a/pkg/sql/sem/tree/import.go +++ b/pkg/sql/sem/tree/import.go @@ -44,10 +44,10 @@ func (node *Import) Format(ctx *FmtCtx) { ctx.WriteString(node.FileFormat) ctx.WriteString(" DATA (") ctx.FormatNode(&node.Files) - ctx.WriteString(") ") + ctx.WriteString(")") if node.Options != nil { - ctx.WriteString("WITH ") + ctx.WriteString(" WITH ") ctx.FormatNode(&node.Options) } } diff --git a/pkg/util/strings.go b/pkg/util/strings.go index e9b6083a7114..4f68cace399b 100644 --- a/pkg/util/strings.go +++ b/pkg/util/strings.go @@ -25,12 +25,12 @@ func GetSingleRune(s string) (rune, error) { if s == "" { return 0, nil } - r, sz := utf8.DecodeRuneInString(s) + r, sz := utf8.DecodeRuneInString(s) if r == utf8.RuneError { return 0, errors.Errorf("invalid character: %s", s) } - if sz != len(s) { - return r, errors.New("must be only one character") + if sz != len(s) { + return r, errors.Errorf("must be only one character, got %d bytes in %q", len(s), s) } return r, nil }