From b507648b1b01567b38dd36d16cdd1666bc029e24 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 16 Mar 2020 16:25:16 -0400 Subject: [PATCH] importccl: Reject invalid options when importing data. Fixes #46090 Add validation logic to the options specified for the IMPORT statement. Report an error if the user specifies an option not supported by the import data format. Release notes (bug fix): Better error reporting when importing data. Release justification: low risk bug fix --- pkg/ccl/importccl/import_stmt.go | 75 ++++++++++++++++++++++ pkg/ccl/importccl/import_stmt_test.go | 89 +++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index c99daa202ab0..af01f385c353 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -113,6 +113,63 @@ var importOptionExpectValues = map[string]sql.KVStringOptValidate{ avroJSONRecords: sql.KVStringOptRequireNoValue, } +// makeStringSet returns a set (a map with empty struct values) whose keys are +// the given opts. +func makeStringSet(opts ...string) map[string]struct{} { + res := make(map[string]struct{}, len(opts)) + for _, opt := range opts { + res[opt] = struct{}{} + } + return res +} + +// Options common to all formats. +var allowedCommonOptions = makeStringSet( + importOptionSSTSize, importOptionDecompress, importOptionOversample, + importOptionSaveRejected, importOptionDisableGlobMatch) + +// Format specific allowed options. 
+var avroAllowedOptions = makeStringSet( + avroStrict, avroBinRecords, avroJSONRecords, + avroRecordsSeparatedBy, avroSchema, avroSchemaURI, optMaxRowSize, +) +var csvAllowedOptions = makeStringSet( + csvDelimiter, csvComment, csvNullIf, csvSkip, csvStrictQuotes, +) +var mysqlOutAllowedOptions = makeStringSet( + mysqlOutfileRowSep, mysqlOutfileFieldSep, mysqlOutfileEnclose, + mysqlOutfileEscape, csvNullIf, csvSkip, +) +var mysqlDumpAllowedOptions = makeStringSet(importOptionSkipFKs) +var pgCopyAllowedOptions = makeStringSet(pgCopyDelimiter, pgCopyNull, optMaxRowSize) +var pgDumpAllowedOptions = makeStringSet(optMaxRowSize, importOptionSkipFKs) + +// validateFormatOptions returns an error if specified contains an option that +// is neither in formatAllowed nor in allowedCommonOptions; format is only used +// in the error message. If several options are invalid, the lexicographically +// smallest is reported so the error does not depend on map iteration order. +func validateFormatOptions( + format string, specified map[string]string, formatAllowed map[string]struct{}, +) error { + invalid, found := "", false + for opt := range specified { + if _, ok := formatAllowed[opt]; ok { + continue + } + if _, ok := allowedCommonOptions[opt]; ok { + continue + } + if !found || opt < invalid { + invalid, found = opt, true + } + } + if !found { + return nil + } + return errors.Errorf( + "invalid option %q specified for %s import format", invalid, format) +} + func importJobDescription( p sql.PlanHookState, orig *tree.Import, @@ -272,6 +329,9 @@ func importPlanHook( format := roachpb.IOFileFormat{} switch importStmt.FileFormat { case "CSV": + if err = validateFormatOptions(importStmt.FileFormat, opts, csvAllowedOptions); err != nil { + return err + } telemetry.Count("import.format.csv") format.Format = roachpb.IOFileFormat_CSV // Set the default CSV separator for the cases when it is not overwritten. 
@@ -313,6 +373,9 @@ func importPlanHook( format.SaveRejected = true } case "DELIMITED": + if err = validateFormatOptions(importStmt.FileFormat, opts, mysqlOutAllowedOptions); err != nil { + return err + } telemetry.Count("import.format.mysqlout") format.Format = roachpb.IOFileFormat_MysqlOutfile format.MysqlOut = roachpb.MySQLOutfileOptions{ @@ -370,9 +433,15 @@ func importPlanHook( format.SaveRejected = true } case "MYSQLDUMP": + if err = validateFormatOptions(importStmt.FileFormat, opts, mysqlDumpAllowedOptions); err != nil { + return err + } telemetry.Count("import.format.mysqldump") format.Format = roachpb.IOFileFormat_Mysqldump case "PGCOPY": + if err = validateFormatOptions(importStmt.FileFormat, opts, pgCopyAllowedOptions); err != nil { + return err + } telemetry.Count("import.format.pgcopy") format.Format = roachpb.IOFileFormat_PgCopy format.PgCopy = roachpb.PgCopyOptions{ @@ -402,6 +471,9 @@ func importPlanHook( } format.PgCopy.MaxRowSize = maxRowSize case "PGDUMP": + if err = validateFormatOptions(importStmt.FileFormat, opts, pgDumpAllowedOptions); err != nil { + return err + } telemetry.Count("import.format.pgdump") format.Format = roachpb.IOFileFormat_PgDump maxRowSize := int32(defaultScanBuffer) @@ -417,6 +489,9 @@ func importPlanHook( } format.PgDump.MaxRowSize = maxRowSize case "AVRO": + if err = validateFormatOptions(importStmt.FileFormat, opts, avroAllowedOptions); err != nil { + return err + } err := parseAvroOptions(ctx, opts, p, &format) if err != nil { return err diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index a85b410edd60..3681e763a7c6 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -15,6 +15,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "net/http" "net/http/httptest" "net/url" @@ -1333,6 +1334,14 @@ func TestImportCSVStmt(t *testing.T) { ` WITH decompress = 'gzip'`, "gzip: invalid header", }, + { + "csv-with-invalid-delimited-option", 
+ `IMPORT TABLE t CREATE USING $1 CSV DATA (%s) WITH fields_delimited_by = '|'`, + schema, + testFiles.files, + ``, + "invalid option", + }, } { t.Run(tc.name, func(t *testing.T) { if strings.Contains(tc.name, "bzip") && len(testFiles.bzipFiles) == 0 { @@ -3877,3 +3886,83 @@ func TestImportClientDisconnect(t *testing.T) { return nil }) } + +func TestDisallowsInvalidFormatOptions(t *testing.T) { + defer leaktest.AfterTest(t)() + + allOpts := make(map[string]struct{}) + addOpts := func(opts map[string]struct{}) { + for opt := range opts { + allOpts[opt] = struct{}{} + } + } + addOpts(allowedCommonOptions) + addOpts(avroAllowedOptions) + addOpts(csvAllowedOptions) + addOpts(mysqlDumpAllowedOptions) + addOpts(mysqlOutAllowedOptions) + addOpts(pgDumpAllowedOptions) + addOpts(pgCopyAllowedOptions) + + // pickOpts draws num options, with replacement, either from all known options + // or only from the given format-specific allowed set. It returns the generated + // options and whether any of them is disallowed for that format. 
+ pickOpts := func(num int, allowed map[string]struct{}) (map[string]string, bool) { + opts := make(map[string]string, num) + haveDisallowed := false + var picks []string + if rand.Intn(10) > 5 { + for opt := range allOpts { + picks = append(picks, opt) + } + } else { + for opt := range allowed { + picks = append(picks, opt) + } + } + require.NotEmpty(t, picks) + + for i := 0; i < num; i++ { + pick := picks[rand.Intn(len(picks))] + _, ok := allowed[pick] + if !ok { + _, ok = allowedCommonOptions[pick] + } + if ok { + opts[pick] = "ok" + } else { + opts[pick] = "bad" + haveDisallowed = true + } + } + + return opts, haveDisallowed + } + + tests := []struct { + format string + allowed map[string]struct{} + }{ + {"avro", avroAllowedOptions}, + {"csv", csvAllowedOptions}, + {"mysqlout", mysqlOutAllowedOptions}, + {"mysqldump", mysqlDumpAllowedOptions}, + {"pgdump", pgDumpAllowedOptions}, + {"pgcopy", pgCopyAllowedOptions}, + } + + for _, tc := range tests { + for i := 0; i < 5; i++ { + opts, haveBadOptions := pickOpts(i, tc.allowed) + t.Run(fmt.Sprintf("validate-%s-%d/badOpts=%t", tc.format, i, haveBadOptions), + func(t *testing.T) { + err := validateFormatOptions(tc.format, opts, tc.allowed) + if haveBadOptions { + require.Error(t, err, opts) + } else { + require.NoError(t, err, opts) + } + }) + } + } +}