importccl: hook up MYSQLOUTFILE as a valid IMPORT format
Release note (sql change): Add support for MySQL's tabbed OUTFILE format to IMPORT.
dt committed May 17, 2018
1 parent 9c3fa62 commit e88994b
Showing 12 changed files with 204 additions and 56 deletions.
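The release note is brief, so a rough usage sketch of the new syntax follows. This is a hypothetical example, not taken from this commit: the table, columns, and file path are illustrative, while the MYSQLOUTFILE keyword and the four new WITH options (rows_terminated_by, fields_terminated_by, fields_enclosed_by, fields_escaped_by) are the ones wired up in the diff below.

    -- Hypothetical example; table, columns, and file path are illustrative only.
    -- Row and field separators default to '\n' and '\t' (MySQL's tabbed OUTFILE framing),
    -- so only the enclosing and escape characters are overridden here.
    IMPORT TABLE users (id INT PRIMARY KEY, name STRING, notes BYTES)
        MYSQLOUTFILE DATA ('nodelocal:///outfile/users.txt')
        WITH fields_enclosed_by = '"', fields_escaped_by = e'\\';

As the import_stmt.go hunk below shows, any non-CSV format is gated on the new VersionImportFormats cluster version, so the statement fails unless all nodes have been upgraded.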
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -60,6 +60,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-3</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>2.0-4</code></td><td>set the active cluster version in the format '<major>.<minor>'.</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -327,6 +327,7 @@ explain_option_list ::=

import_data_format ::=
'CSV'
| 'MYSQLOUTFILE'

privileges ::=
'ALL'
@@ -719,6 +720,7 @@ unreserved_keyword ::=
| 'MATCH'
| 'MINUTE'
| 'MONTH'
| 'MYSQLOUTFILE'
| 'NAMES'
| 'NAN'
| 'NAME'
140 changes: 100 additions & 40 deletions pkg/ccl/importccl/import_stmt.go
@@ -37,20 +37,32 @@ import (
)

const (
importOptionDelimiter = "delimiter"
importOptionComment = "comment"
importOptionNullIf = "nullif"
csvDelimiter = "delimiter"
csvComment = "comment"
csvNullIf = "nullif"
csvSkip = "skip"

mysqlOutfileRowSep = "rows_terminated_by"
mysqlOutfileFieldSep = "fields_terminated_by"
mysqlOutfileEnclose = "fields_enclosed_by"
mysqlOutfileEscape = "fields_escaped_by"

importOptionTransform = "transform"
importOptionSkip = "skip"
importOptionSSTSize = "sstsize"
)

var importOptionExpectValues = map[string]bool{
importOptionDelimiter: true,
importOptionComment: true,
importOptionNullIf: true,
csvDelimiter: true,
csvComment: true,
csvNullIf: true,
csvSkip: true,

mysqlOutfileRowSep: true,
mysqlOutfileFieldSep: true,
mysqlOutfileEnclose: true,
mysqlOutfileEscape: true,

importOptionTransform: true,
importOptionSkip: true,
importOptionSSTSize: true,
}

@@ -240,11 +252,6 @@ func importPlanHook(
}
}

if importStmt.FileFormat != "CSV" {
// not possible with current parser rules.
return nil, nil, nil, errors.Errorf("unsupported import format: %q", importStmt.FileFormat)
}

optsFn, err := p.TypeAsStringOpts(importStmt.Options, importOptionExpectValues)
if err != nil {
return nil, nil, nil, err
@@ -299,42 +306,95 @@ func importPlanHook(
parentID = descI.(*sqlbase.DatabaseDescriptor).ID
}

format := roachpb.IOFileFormat{Format: roachpb.IOFileFormat_CSV}
if override, ok := opts[importOptionDelimiter]; ok {
comma, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comma value")
format := roachpb.IOFileFormat{}
switch importStmt.FileFormat {
case "CSV":
format.Format = roachpb.IOFileFormat_CSV
if override, ok := opts[csvDelimiter]; ok {
comma, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comma value")
}
format.Csv.Comma = comma
}
format.Csv.Comma = comma
}

if override, ok := opts[importOptionComment]; ok {
comment, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comment value")
if override, ok := opts[csvComment]; ok {
comment, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrap(err, "invalid comment value")
}
format.Csv.Comment = comment
}
format.Csv.Comment = comment
}

if override, ok := opts[importOptionNullIf]; ok {
format.Csv.NullEncoding = &override
}
if override, ok := opts[csvNullIf]; ok {
format.Csv.NullEncoding = &override
}

if override, ok := opts[importOptionSkip]; ok {
skip, err := strconv.Atoi(override)
if err != nil {
return errors.Wrapf(err, "invalid %s value", importOptionSkip)
if override, ok := opts[csvSkip]; ok {
skip, err := strconv.Atoi(override)
if err != nil {
return errors.Wrapf(err, "invalid %s value", csvSkip)
}
if skip < 0 {
return errors.Errorf("%s must be >= 0", csvSkip)
}
// We need to handle the case where the user wants to skip records and the node
// interpreting the statement might be newer than other nodes in the cluster.
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) {
return errors.Errorf("Using %s requires all nodes to be upgraded to %s",
csvSkip, cluster.VersionByKey(cluster.VersionImportSkipRecords))
}
format.Csv.Skip = uint32(skip)
}
case "MYSQLOUTFILE":
format.Format = roachpb.IOFileFormat_MysqlOutfile
format.MysqlOut = roachpb.MySQLOutfileOptions{
RowSeparator: '\n',
FieldSeparator: '\t',
}
if override, ok := opts[mysqlOutfileRowSep]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileRowSep)
}
format.MysqlOut.RowSeparator = c
}

if override, ok := opts[mysqlOutfileFieldSep]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileFieldSep)
}
format.MysqlOut.FieldSeparator = c
}

if override, ok := opts[mysqlOutfileEnclose]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileEnclose)
}
format.MysqlOut.Enclose = roachpb.MySQLOutfileOptions_Always
format.MysqlOut.Encloser = c
}
if skip < 0 {
return errors.Errorf("%s must be >= 0", importOptionSkip)

if override, ok := opts[mysqlOutfileEscape]; ok {
c, err := util.GetSingleRune(override)
if err != nil {
return errors.Wrapf(err, "invalid %q value", mysqlOutfileEscape)
}
format.MysqlOut.HasEscape = true
format.MysqlOut.Escape = c
}
// We need to handle the case where the user wants to skip records and the node
// interpreting the statement might be newer than other nodes in the cluster.
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportSkipRecords) {

default:
return errors.Errorf("unsupported import format: %q", importStmt.FileFormat)
}

if format.Format != roachpb.IOFileFormat_CSV {
if !p.ExecCfg().Settings.Version.IsMinSupported(cluster.VersionImportFormats) {
return errors.Errorf("Using %s requires all nodes to be upgraded to %s",
importOptionSkip, cluster.VersionByKey(cluster.VersionImportSkipRecords))
csvSkip, cluster.VersionByKey(cluster.VersionImportFormats))
}
format.Csv.Skip = uint32(skip)
}

// sstSize, if 0, will be set to an appropriate default by the specific
76 changes: 71 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"golang.org/x/sync/errgroup"
@@ -245,7 +246,7 @@ func makeCSVData(
return files, filesWithOpts, filesWithDups
}

func TestImportStmt(t *testing.T) {
func TestImportCSVStmt(t *testing.T) {
defer leaktest.AfterTest(t)()

const (
@@ -327,7 +328,7 @@ func TestImportStmt(t *testing.T) {
`IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2'`,
nil,
filesWithOpts,
`WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`,
` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2'`,
"",
},
{
@@ -336,15 +337,15 @@ func TestImportStmt(t *testing.T) {
`IMPORT TABLE t CREATE USING $1 CSV DATA (%s) WITH sstsize = '10K'`,
schema,
files,
`WITH sstsize = '10K'`,
` WITH sstsize = '10K'`,
"",
},
{
"schema-in-query-transform-only",
`IMPORT TABLE t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) WITH delimiter = '|', comment = '#', nullif='', skip = '2', transform = $1`,
nil,
filesWithOpts,
`WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`,
` WITH comment = '#', delimiter = '|', "nullif" = '', skip = '2', transform = 'nodelocal:///5'`,
"",
},
{
@@ -447,7 +448,7 @@ func TestImportStmt(t *testing.T) {
} else {
jobPrefix += `""."".`
}
jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s) `
jobPrefix += `t (a INT PRIMARY KEY, b STRING, INDEX (b), INDEX (a, b)) CSV DATA (%s)`

if err := jobutils.VerifySystemJob(t, sqlDB, baseNumJobs+testNum, jobs.TypeImport, jobs.Record{
Username: security.RootUser,
@@ -1118,3 +1119,68 @@ func TestImportMVCCChecksums(t *testing.T) {
) CSV DATA ($1)`, srv.URL)
sqlDB.Exec(t, `UPDATE d.t SET c = 2 WHERE a = 1`)
}

func TestImportMysqlOutfile(t *testing.T) {
defer leaktest.AfterTest(t)()

const (
nodes = 3
)
ctx := context.Background()
baseDir := filepath.Join("testdata", "mysqlout")
args := base.TestServerArgs{ExternalIODir: baseDir}
tc := testcluster.StartTestCluster(t, nodes, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

sqlDB.Exec(t, `SET CLUSTER SETTING kv.import.batch_size = '10KB'`)
sqlDB.Exec(t, `CREATE DATABASE foo; SET DATABASE = foo`)

testRows, configs := getMysqlOutfileTestdata(t)

for i, cfg := range configs {
t.Run(cfg.name, func(t *testing.T) {
var opts []interface{}

cmd := fmt.Sprintf(`IMPORT TABLE test%d (i INT PRIMARY KEY, s text, b bytea) MYSQLOUTFILE DATA ($1)`, i)
opts = append(opts, fmt.Sprintf("nodelocal://%s", strings.TrimPrefix(cfg.filename, baseDir)))

var flags []string
if cfg.opts.RowSeparator != '\n' {
opts = append(opts, string(cfg.opts.RowSeparator))
flags = append(flags, fmt.Sprintf("rows_terminated_by = $%d", len(opts)))
}
if cfg.opts.FieldSeparator != '\t' {
opts = append(opts, string(cfg.opts.FieldSeparator))
flags = append(flags, fmt.Sprintf("fields_terminated_by = $%d", len(opts)))
}
if cfg.opts.Enclose == roachpb.MySQLOutfileOptions_Always {
opts = append(opts, string(cfg.opts.Encloser))
flags = append(flags, fmt.Sprintf("fields_enclosed_by = $%d", len(opts)))
}
if cfg.opts.HasEscape {
opts = append(opts, string(cfg.opts.Escape))
flags = append(flags, fmt.Sprintf("fields_escaped_by = $%d", len(opts)))
}
if len(flags) > 0 {
cmd += " WITH " + strings.Join(flags, ", ")
}
sqlDB.Exec(t, cmd, opts...)
for idx, row := range sqlDB.QueryStr(t, fmt.Sprintf("SELECT * FROM test%d ORDER BY i", i)) {
expected, actual := testRows[idx].s, row[1]
if expected == injectNull {
expected = "NULL"
}
// TODO(dt): known limitation: even escaped `\N` becomes null.
if expected == `\N` {
expected = "NULL"
}

if expected != actual {
t.Fatalf("expected rowi=%s string to be %q, got %q", row[0], expected, actual)
}
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/ccl/importccl/read_import_csv.go
@@ -64,7 +64,9 @@ func (c *csvInputReader) inputFinished() {
close(c.recordCh)
}

func (c *csvInputReader) flushBatch(ctx context.Context, finished bool, progFn func(finished bool) error) error {
func (c *csvInputReader) flushBatch(
ctx context.Context, finished bool, progFn func(finished bool) error,
) error {
// if the batch isn't empty, we need to flush it.
if len(c.batch.r) > 0 {
select {
7 changes: 6 additions & 1 deletion pkg/ccl/importccl/read_import_mysqlout.go
@@ -33,8 +33,13 @@ func newMysqloutfileReader(
tableDesc *sqlbase.TableDescriptor,
expectedCols int,
) *mysqloutfileReader {
null := "NULL"
if opts.HasEscape {
null = `\N`
}
csvOpts := roachpb.CSVOptions{NullEncoding: &null}
return &mysqloutfileReader{
csvInputReader: *newCSVInputReader(kvCh, roachpb.CSVOptions{}, tableDesc, expectedCols),
csvInputReader: *newCSVInputReader(kvCh, csvOpts, tableDesc, expectedCols),
opts: opts,
}
}
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysqlout_test.go
@@ -208,7 +208,7 @@ func TestMysqlOutfileReader(t *testing.T) {
ctx := context.TODO()
for _, config := range configs {
t.Run(config.name, func(t *testing.T) {
converter := newMysqloutfileReader(ctx, nil, config.opts, nil, 3)
converter := newMysqloutfileReader(nil, config.opts, nil, 3)
// unblock batch chan sends
converter.csvInputReader.recordCh = make(chan csvRecord, 4)
converter.csvInputReader.batchSize = 10
6 changes: 6 additions & 0 deletions pkg/settings/cluster/cockroach_versions.go
@@ -51,6 +51,7 @@
VersionImportSkipRecords
VersionProposedTSLeaseRequest
VersionRangeAppliedStateKey
VersionImportFormats

// Add new versions here (step one of two).

@@ -206,6 +207,11 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: VersionRangeAppliedStateKey,
Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 3},
},
{
// VersionImportFormats is https://github.com/cockroachdb/cockroach/pull/25615.
Key: VersionImportFormats,
Version: roachpb.Version{Major: 2, Minor: 0, Unstable: 4},
},

// Add new versions here (step two of two).

3 changes: 2 additions & 1 deletion pkg/sql/parser/parse_test.go
@@ -968,6 +968,7 @@ func TestParse(t *testing.T) {
{`BACKUP TABLE foo TO 'bar' WITH key1, key2 = 'value'`},
{`RESTORE TABLE foo FROM 'bar' WITH key1, key2 = 'value'`},
{`IMPORT TABLE foo CREATE USING 'nodelocal:///some/file' CSV DATA ('path/to/some/file', $1) WITH temp = 'path/to/temp'`},
{`IMPORT TABLE foo CREATE USING 'nodelocal:///some/file' MYSQLOUTFILE DATA ('path/to/some/file', $1)`},
{`IMPORT TABLE foo (id INT PRIMARY KEY, email STRING, age INT) CSV DATA ('path/to/some/file', $1) WITH temp = 'path/to/temp'`},
{`IMPORT TABLE foo (id INT, email STRING, age INT) CSV DATA ('path/to/some/file', $1) WITH comma = ',', "nullif" = 'n/a', temp = $2`},
{`EXPORT INTO CSV 'a' FROM TABLE a`},
@@ -991,7 +992,7 @@
}
s := stmts.String()
if d.sql != s {
t.Errorf("expected \n%s\n, but found \n%s", d.sql, s)
t.Errorf("expected \n%q\n, but found \n%q", d.sql, s)
}
}
}
(diffs for the remaining 3 changed files were not loaded)
