Skip to content

Commit

Permalink
Merge #46165
Browse files Browse the repository at this point in the history
46165: importccl: Reject invalid options when importing data. r=miretskiy a=miretskiy

Fixes #46090

Add validation logic to the options specified for the
IMPORT statement.  Report an error if the user specifies
an option not supported by the import data format.

Release notes (bug fix): Better error reporting when
importing data.

Release justification: low risk bug fix

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
craig[bot] and Yevgeniy Miretskiy committed Mar 18, 2020
2 parents 7cfd35c + b507648 commit aaa6040
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 0 deletions.
61 changes: 61 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,49 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{
avroJSONRecords: sql.KVStringOptRequireNoValue,
}

// makeStringSet builds a set whose members are the strings in opts.
// Repeated entries collapse into a single member, and calling it with no
// arguments yields an empty, non-nil set.
func makeStringSet(opts ...string) map[string]struct{} {
	set := make(map[string]struct{}, len(opts))
	for i := 0; i < len(opts); i++ {
		set[opts[i]] = struct{}{}
	}
	return set
}

// Sets of option names consulted by validateFormatOptions when checking the
// options supplied to an IMPORT statement.
var (
	// allowedCommonOptions lists the options common to all formats.
	allowedCommonOptions = makeStringSet(
		importOptionSSTSize,
		importOptionDecompress,
		importOptionOversample,
		importOptionSaveRejected,
		importOptionDisableGlobMatch,
	)

	// Format specific allowed options follow, one set per import format.

	// avroAllowedOptions is checked for the AVRO format.
	avroAllowedOptions = makeStringSet(
		avroStrict,
		avroBinRecords,
		avroJSONRecords,
		avroRecordsSeparatedBy,
		avroSchema,
		avroSchemaURI,
		optMaxRowSize,
	)

	// csvAllowedOptions is checked for the CSV format.
	csvAllowedOptions = makeStringSet(
		csvDelimiter,
		csvComment,
		csvNullIf,
		csvSkip,
		csvStrictQuotes,
	)

	// mysqlOutAllowedOptions is checked for the DELIMITED format; it shares
	// csvNullIf and csvSkip with the CSV set.
	mysqlOutAllowedOptions = makeStringSet(
		mysqlOutfileRowSep,
		mysqlOutfileFieldSep,
		mysqlOutfileEnclose,
		mysqlOutfileEscape,
		csvNullIf,
		csvSkip,
	)

	// mysqlDumpAllowedOptions is checked for the MYSQLDUMP format.
	mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs)

	// pgCopyAllowedOptions is checked for the PGCOPY format.
	pgCopyAllowedOptions = makeStringSet(
		pgCopyDelimiter,
		pgCopyNull,
		optMaxRowSize,
	)

	// pgDumpAllowedOptions is checked for the PGDUMP format.
	pgDumpAllowedOptions = makeStringSet(
		optMaxRowSize,
		importOptionSkipFKs,
	)
)

// validateFormatOptions checks each option name in specified against the
// options permitted for the given import format. An option is accepted when
// it is a member of formatAllowed or of allowedCommonOptions; option values
// are not inspected.
//
// It returns an error naming the first rejected option and the format, or nil
// when every option is accepted. Because map iteration order is unspecified,
// which option gets reported is not deterministic when several are invalid.
func validateFormatOptions(
	format string, specified map[string]string, formatAllowed map[string]struct{},
) error {
	for name := range specified {
		if _, formatSpecific := formatAllowed[name]; formatSpecific {
			continue
		}
		if _, common := allowedCommonOptions[name]; common {
			continue
		}
		return errors.Errorf(
			"invalid option %q specified for %s import format", name, format)
	}
	return nil
}

func importJobDescription(
p sql.PlanHookState,
orig *tree.Import,
Expand Down Expand Up @@ -272,6 +315,9 @@ func importPlanHook(
format := roachpb.IOFileFormat{}
switch importStmt.FileFormat {
case "CSV":
if err = validateFormatOptions(importStmt.FileFormat, opts, csvAllowedOptions); err != nil {
return err
}
telemetry.Count("import.format.csv")
format.Format = roachpb.IOFileFormat_CSV
// Set the default CSV separator for the cases when it is not overwritten.
Expand Down Expand Up @@ -313,6 +359,9 @@ func importPlanHook(
format.SaveRejected = true
}
case "DELIMITED":
if err = validateFormatOptions(importStmt.FileFormat, opts, mysqlOutAllowedOptions); err != nil {
return err
}
telemetry.Count("import.format.mysqlout")
format.Format = roachpb.IOFileFormat_MysqlOutfile
format.MysqlOut = roachpb.MySQLOutfileOptions{
Expand Down Expand Up @@ -370,9 +419,15 @@ func importPlanHook(
format.SaveRejected = true
}
case "MYSQLDUMP":
if err = validateFormatOptions(importStmt.FileFormat, opts, mysqlDumpAllowedOptions); err != nil {
return err
}
telemetry.Count("import.format.mysqldump")
format.Format = roachpb.IOFileFormat_Mysqldump
case "PGCOPY":
if err = validateFormatOptions(importStmt.FileFormat, opts, pgCopyAllowedOptions); err != nil {
return err
}
telemetry.Count("import.format.pgcopy")
format.Format = roachpb.IOFileFormat_PgCopy
format.PgCopy = roachpb.PgCopyOptions{
Expand Down Expand Up @@ -402,6 +457,9 @@ func importPlanHook(
}
format.PgCopy.MaxRowSize = maxRowSize
case "PGDUMP":
if err = validateFormatOptions(importStmt.FileFormat, opts, pgDumpAllowedOptions); err != nil {
return err
}
telemetry.Count("import.format.pgdump")
format.Format = roachpb.IOFileFormat_PgDump
maxRowSize := int32(defaultScanBuffer)
Expand All @@ -417,6 +475,9 @@ func importPlanHook(
}
format.PgDump.MaxRowSize = maxRowSize
case "AVRO":
if err = validateFormatOptions(importStmt.FileFormat, opts, avroAllowedOptions); err != nil {
return err
}
err := parseAvroOptions(ctx, opts, p, &format)
if err != nil {
return err
Expand Down
89 changes: 89 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1332,6 +1333,14 @@ func TestImportCSVStmt(t *testing.T) {
` WITH decompress = 'gzip'`,
"gzip: invalid header",
},
{
"csv-with-invalid-delimited-option",
`IMPORT TABLE t CREATE USING $1 CSV DATA (%s) WITH fields_delimited_by = '|'`,
schema,
testFiles.files,
``,
"invalid option",
},
} {
t.Run(tc.name, func(t *testing.T) {
if strings.Contains(tc.name, "bzip") && len(testFiles.bzipFiles) == 0 {
Expand Down Expand Up @@ -3876,3 +3885,83 @@ func TestImportClientDisconnect(t *testing.T) {
return nil
})
}

// TestDisallowsInvalidFormatOptions exercises validateFormatOptions with
// randomly generated option maps. For every import format it builds several
// option maps of increasing size and checks that validation fails exactly
// when the map contains an option that is neither format-specific nor common.
func TestDisallowsInvalidFormatOptions(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// allOpts is the union of the common options and every format's options.
	allOpts := make(map[string]struct{})
	addOpts := func(opts map[string]struct{}) {
		for opt := range opts {
			allOpts[opt] = struct{}{}
		}
	}
	addOpts(allowedCommonOptions)
	addOpts(avroAllowedOptions)
	addOpts(csvAllowedOptions)
	addOpts(mysqlDumpAllowedOptions)
	addOpts(mysqlOutAllowedOptions)
	addOpts(pgDumpAllowedOptions)
	addOpts(pgCopyAllowedOptions)

	// pickOpts draws num options (with replacement, so the resulting map may
	// hold fewer than num entries) either from the union of all options or
	// from the format's allowed set only. It returns the generated options
	// plus a flag indicating whether any of them is disallowed for the format.
	pickOpts := func(num int, allowed map[string]struct{}) (map[string]string, bool) {
		opts := make(map[string]string, num)
		haveDisallowed := false
		var picks []string
		// rand.Intn(10) > 5 holds for 4 of the 10 possible values, so the
		// candidate pool is the full union in roughly 40% of the calls.
		if rand.Intn(10) > 5 {
			for opt := range allOpts {
				picks = append(picks, opt)
			}
		} else {
			for opt := range allowed {
				picks = append(picks, opt)
			}
		}
		// rand.Intn panics on a zero argument, so the pool must be non-empty,
		// not merely non-nil.
		require.NotEmpty(t, picks)

		for i := 0; i < num; i++ {
			pick := picks[rand.Intn(len(picks))]
			// Mirror the acceptance rule of validateFormatOptions: the option
			// is fine if it is format-specific or common.
			_, isAllowed := allowed[pick]
			if !isAllowed {
				_, isAllowed = allowedCommonOptions[pick]
			}
			if isAllowed {
				opts[pick] = "ok"
			} else {
				opts[pick] = "bad"
				haveDisallowed = true
			}
		}

		return opts, haveDisallowed
	}

	tests := []struct {
		format  string
		allowed map[string]struct{}
	}{
		{"avro", avroAllowedOptions},
		{"csv", csvAllowedOptions},
		{"mysqlout", mysqlOutAllowedOptions},
		{"mysqldump", mysqlDumpAllowedOptions},
		{"pgdump", pgDumpAllowedOptions},
		{"pgcopy", pgCopyAllowedOptions},
	}

	for _, tc := range tests {
		// i doubles as the number of options to generate, so the first
		// iteration validates an empty option map.
		for i := 0; i < 5; i++ {
			opts, haveBadOptions := pickOpts(i, tc.allowed)
			t.Run(fmt.Sprintf("validate-%s-%d/badOpts=%t", tc.format, i, haveBadOptions),
				func(t *testing.T) {
					err := validateFormatOptions(tc.format, opts, tc.allowed)
					if haveBadOptions {
						require.Error(t, err, opts)
					} else {
						require.NoError(t, err, opts)
					}
				})
		}
	}
}

0 comments on commit aaa6040

Please sign in to comment.