Skip to content

Commit

Permalink
Merge #25615
Browse files Browse the repository at this point in the history
25615: importccl: Support mysql's OUTFILE format in IMPORT r=mjibson a=dt

This adds support for MySQL's OUTFILE (as created by `mysqldump --tab`) to IMPORT.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed May 21, 2018
2 parents 95aa0c2 + b24d85e commit 272ab17
Show file tree
Hide file tree
Showing 23 changed files with 5,129 additions and 122 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-3</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-4</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ explain_option_list ::=

import_data_format ::=
'CSV'
| 'MYSQLOUTFILE'

privileges ::=
'ALL'
Expand Down Expand Up @@ -719,6 +720,7 @@ unreserved_keyword ::=
| 'MATCH'
| 'MINUTE'
| 'MONTH'
| 'MYSQLOUTFILE'
| 'NAMES'
| 'NAN'
| 'NAME'
Expand Down
142 changes: 101 additions & 41 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,32 @@ import (
)

const (
importOptionDelimiter = "delimiter"
importOptionComment = "comment"
importOptionNullIf = "nullif"
csvDelimiter = "delimiter"
csvComment = "comment"
csvNullIf = "nullif"
csvSkip = "skip"

mysqlOutfileRowSep = "rows_terminated_by"
mysqlOutfileFieldSep = "fields_terminated_by"
mysqlOutfileEnclose = "fields_enclosed_by"
mysqlOutfileEscape = "fields_escaped_by"

importOptionTransform = "transform"
importOptionSkip = "skip"
importOptionSSTSize = "sstsize"
)

var importOptionExpectValues = map[string]bool{
importOptionDelimiter: true,
importOptionComment: true,
importOptionNullIf: true,
csvDelimiter: true,
csvComment: true,
csvNullIf: true,
csvSkip: true,

mysqlOutfileRowSep: true,
mysqlOutfileFieldSep: true,
mysqlOutfileEnclose: true,
mysqlOutfileEscape: true,

importOptionTransform: true,
importOptionSkip: true,
importOptionSSTSize: true,
}

Expand Down Expand Up @@ -240,11 +252,6 @@ func importPlanHook(
}
}

if importStmt.FileFormat != "CSV" {
// not possible with current parser rules.
return nil, nil, nil, errors.Errorf("unsupported import format: %q", importStmt.FileFormat)
}

optsFn, err := p.TypeAsStringOpts(importStmt.Options, importOptionExpectValues)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -299,42 +306,95 @@ func importPlanHook(
parentID = descI.(*sqlbase.DatabaseDescriptor).ID
}

format := roachpb.IOFileFormat{Format: roachpb.IOFileFormat_CSV}
if override, ok := opts[importOptionDelimiter]; ok {
comma, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comma value")
format := roachpb.IOFileFormat{}
switch importStmt.FileFormat {
case "CSV":
format.Format = roachpb.IOFileFormat_CSV
if override, ok := opts[csvDelimiter]; ok {
comma, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comma value")
}
format.Csv.Comma = comma
}
format.Csv.Comma = comma
}

if override, ok := opts[importOptionComment]; ok {
comment, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comment value")
if override, ok := opts[csvComment]; ok {
comment, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comment value")
}
format.Csv.Comment = comment
}
format.Csv.Comment = comment
}

if override, ok := opts[importOptionNullIf]; ok {
format.Csv.NullEncoding = &override
}
if override, ok := opts[csvNullIf]; ok {
format.Csv.NullEncoding = &override
}

if override, ok := opts[importOptionSkip]; ok {
skip, err := strconv.Atoi(override)
if err != nil {
return errors.Wrapf(err, "invalid %s value", importOptionSkip)
if override, ok := opts[csvSkip]; ok {
skip, err := strconv.Atoi(override)
if err != nil {
return errors.Wrapf(err, "invalid %s value", csvSkip)
}
if skip < 0 {
return errors.Errorf("%s must be >= 0", csvSkip)
}
// We need to handle the case where the user wants to skip records and the node
// interpreting the statement might be newer than other nodes in the cluster.
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) {
return errors.Errorf("Using non-CSV import format requires all nodes to be upgraded to %s",
cluster.VersionByKey(cluster.VersionImportSkipRecords))
}
format.Csv.Skip = uint32(skip)
}
if skip < 0 {
return errors.Errorf("%s must be >= 0", importOptionSkip)
case "MYSQLOUTFILE":
format.Format = roachpb.IOFileFormat_MysqlOutfile
format.MysqlOut = roachpb.MySQLOutfileOptions{
RowSeparator: '\n',
FieldSeparator: '\t',
}
// We need to handle the case where the user wants to skip records and the node
// interpreting the statement might be newer than other nodes in the cluster.
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) {
if override, ok := opts[mysqlOutfileRowSep]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileRowSep)
}
format.MysqlOut.RowSeparator = c
}

if override, ok := opts[mysqlOutfileFieldSep]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileFieldSep)
}
format.MysqlOut.FieldSeparator = c
}

if override, ok := opts[mysqlOutfileEnclose]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileRowSep)
}
format.MysqlOut.Enclose = roachpb.MySQLOutfileOptions_Always
format.MysqlOut.Encloser = c
}

if override, ok := opts[mysqlOutfileEscape]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileRowSep)
}
format.MysqlOut.HasEscape = true
format.MysqlOut.Escape = c
}

default:
return errors.Errorf("unsupported import format: %q", importStmt.FileFormat)
}

if format.Format != roachpb.IOFileFormat_CSV {
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportFormats) {
return errors.Errorf("Using %s requires all nodes to be upgraded to %s",
importOptionSkip, cluster.VersionByKey(cluster.VersionImportSkipRecords))
csvSkip, cluster.VersionByKey(cluster.VersionImportFormats))
}
format.Csv.Skip = uint32(skip)
}

// sstSize, if 0, will be set to an appropriate default by the specific
Expand Down Expand Up @@ -418,7 +478,7 @@ func importPlanHook(
Username: p.User(),
Details: jobs.ImportDetails{
Tables: []jobs.ImportDetails_Table{{

Format: format,
Desc: tableDesc,
URIs: files,
BackupPath: transform,
Expand Down
78 changes: 72 additions & 6 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -245,7 +246,7 @@ func makeCSVData(
return files, filesWithOpts, filesWithDups
}

func TestImportStmt(t *testing.T) {
func TestImportCSVStmt(t *testing.T) {
defer leaktest.AfterTest(t)()

const (
Expand Down Expand Up @@ -327,7 +328,7 @@ func TestImportStmt(t *testing.T) {
`IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2'`,
nil,
filesWithOpts,
`WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`,
` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`,
"",
},
{
Expand All @@ -336,15 +337,15 @@ func TestImportStmt(t *testing.T) {
`IMPORT TABLE t CREATE USING $1 CSV DATA (%s) WITH sstsize = '10K'`,
schema,
files,
`WITH sstsize = '10K'`,
` WITH sstsize = '10K'`,
"",
},
{
"schema-in-query-transform-only",
`IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2', transform = $1`,
nil,
filesWithOpts,
`WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`,
` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`,
"",
},
{
Expand Down Expand Up @@ -447,7 +448,7 @@ func TestImportStmt(t *testing.T) {
} else {
jobPrefix += `""."".`
}
jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) `
jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)`

if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobs.TypeImport, jobs.Record{
Username: security.RootUser,
Expand Down Expand Up @@ -709,7 +710,7 @@ func BenchmarkConvertRecord(b *testing.B) {
// start up workers.
for i := 0; i < runtime.NumCPU(); i++ {
group.Go(func() error {
return c.convertRecord(ctx, kvCh)
return c.convertRecord(ctx)
})
}
const batchSize = 500
Expand Down Expand Up @@ -1118,3 +1119,68 @@ func TestImportMVCCChecksums(t *testing.T) {
) CSV DATA ($1)`, srv.URL)
sqlDB.Exec(t, `UPDATE d.t SET c = 2 WHERE a = 1`)
}

// TestImportMysqlOutfile starts a three-node test cluster and, for every
// fixture configuration returned by getMysqlOutfileTestdata, runs an
// IMPORT TABLE ... MYSQLOUTFILE statement into a dedicated table and compares
// the string column of each imported row against the expected test rows.
func TestImportMysqlOutfile(t *testing.T) {
	defer leaktest.AfterTest(t)()

	const (
		nodes = 3
	)
	ctx := context.Background()
	baseDir := filepath.Join("testdata", "mysqlout")
	args := base.TestServerArgs{ExternalIODir: baseDir}
	tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args})
	defer tc.Stopper().Stop(ctx)
	conn := tc.Conns[0]
	sqlDB := sqlutils.MakeSQLRunner(conn)

	sqlDB.Exec(t, `SET CLUSTER SETTING kv.import.batch_size = '10KB'`)
	sqlDB.Exec(t, `CREATE DATABASE foo; SET DATABASE = foo`)

	testRows, configs := getMysqlOutfileTestdata(t)

	for i, cfg := range configs {
		t.Run(cfg.name, func(t *testing.T) {
			// opts holds the positional placeholder values; $1 is always the
			// file URI, later placeholders are appended per option below.
			var opts []interface{}

			cmd := fmt.Sprintf(`IMPORT TABLE test%d (i INT PRIMARY KEY, s text, b bytea) MYSQLOUTFILE DATA ($1)`, i)
			opts = append(opts, fmt.Sprintf("nodelocal://%s", strings.TrimPrefix(cfg.filename, baseDir)))

			// Only pass an option when the fixture deviates from '\n' rows and
			// '\t' fields, or actually uses enclosing/escaping. Each flag
			// refers to the placeholder index of the value just appended.
			var flags []string
			if cfg.opts.RowSeparator != '\n' {
				opts = append(opts, string(cfg.opts.RowSeparator))
				flags = append(flags, fmt.Sprintf("rows_terminated_by = $%d", len(opts)))
			}
			if cfg.opts.FieldSeparator != '\t' {
				opts = append(opts, string(cfg.opts.FieldSeparator))
				flags = append(flags, fmt.Sprintf("fields_terminated_by = $%d", len(opts)))
			}
			if cfg.opts.Enclose == roachpb.MySQLOutfileOptions_Always {
				opts = append(opts, string(cfg.opts.Encloser))
				flags = append(flags, fmt.Sprintf("fields_enclosed_by = $%d", len(opts)))
			}
			if cfg.opts.HasEscape {
				opts = append(opts, string(cfg.opts.Escape))
				flags = append(flags, fmt.Sprintf("fields_escaped_by = $%d", len(opts)))
			}
			if len(flags) > 0 {
				cmd += " WITH " + strings.Join(flags, ", ")
			}
			sqlDB.Exec(t, cmd, opts...)

			rows := sqlDB.QueryStr(t, fmt.Sprintf("SELECT * FROM test%d ORDER BY i", i))
			for idx, row := range rows {
				// Fail cleanly instead of panicking with an index-out-of-range
				// if the import produced more rows than the fixture defines.
				if idx >= len(testRows) {
					t.Fatalf("imported %d rows, expected at most %d", len(rows), len(testRows))
				}
				expected, actual := testRows[idx].s, row[1]
				// Rows whose expected value is injectNull must read back as NULL.
				if expected == injectNull {
					expected = "NULL"
				}
				// TODO(dt): known limitation: even escaped `\N` becomes null.
				if expected == `\N` {
					expected = "NULL"
				}

				if expected != actual {
					t.Fatalf("expected row i=%s string to be %q, got %q", row[0], expected, actual)
				}
			}
		})
	}
}
Loading

0 comments on commit 272ab17

Please sign in to comment.